import json
import time
import requests
import datetime
import urllib.parse
from pathlib import Path
from typing import Union, List, Dict, Any
import asyncio
from fastapi import FastAPI, Response, Request, Cookie, Form
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from starlette.concurrency import run_in_threadpool
# Parent of the directory that contains this (resolved) file.
BASE_DIR = Path(__file__).resolve().parent.parent
# Jinja2 template loader pointed at <BASE_DIR>/templates.
templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
class APITimeoutError(Exception):
    """Signals that an upstream API could not supply a usable response."""
def getRandomUserAgent():
    """Return the request headers carrying the browser-like User-Agent.

    Despite the name, the value is fixed: every call returns a new dict
    holding the same User-Agent string.
    """
    # The product token must be "Mozilla/5.0"; the previous "Mozilla/50"
    # was a malformed version that does not match any real browser.
    return {
        'User-Agent': (
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
            'AppleWebKit/537.36 (KHTML, like Gecko) '
            'Chrome/94.0.4606.61 Safari/537.36'
        )
    }
def isJSON(json_str):
    """Return True when *json_str* can be decoded as JSON, else False.

    Non-text input (for example ``None``) is reported as "not JSON"
    instead of raising.
    """
    try:
        json.loads(json_str)
    except (ValueError, TypeError):
        # json.JSONDecodeError is a ValueError; TypeError covers inputs
        # that are not str/bytes/bytearray.
        return False
    return True
# Overall time budget (seconds) for one requestAPI() call; the instance loop
# there stops once `max_time - 1` seconds have elapsed.
max_time = 10.0
# Passed as `timeout=` to requests.get(); requests reads a 2-tuple as
# (connect timeout, read timeout) in seconds.
max_api_wait_time = (3.0, 8.0)
# Placeholder text substituted for fields missing from an API payload.
failed = "Load Failed"
# NOTE(review): MAX_RETRIES and RETRY_DELAY are not referenced anywhere in the
# visible part of this file — confirm whether they are still needed.
MAX_RETRIES = 10
RETRY_DELAY = 3.0
# Base URLs of the external services; an id is appended directly to each.
EDU_STREAM_API_BASE_URL = "https://siawaseok.duckdns.org/api/stream/"
SHORT_STREAM_API_BASE_URL = "https://yt-dl-kappa.vercel.app/short/"
# Invidious instance base URLs, grouped by the API category they are used for.
# Each URL ends with "/" because callers append "api/v1..." directly.
# Order is the order in which instances are tried. Every instance appears at
# most once per list: a duplicate only re-tries a host that has just failed
# and burns the shared time budget.
invidious_api_data = {
    'video': [
        'https://invidious.lunivers.trade/',
        'https://yt.omada.cafe/',
        'https://inv.perditum.com/',
        'https://iv.melmac.space/',
        'https://invidious.nikkosphere.com/',
        'https://iv.duti.dev/',
        'https://youtube.alt.tyil.nl/',
        'https://inv.antopie.org/',
        'https://lekker.gay/',
    ],
    'playlist': [
        'https://invidious.lunivers.trade/',
        'https://invidious.ducks.party/',
        'https://super8.absturztau.be/',
        'https://invidious.nikkosphere.com/',
        'https://yt.omada.cafe/',
        'https://iv.melmac.space/',
        'https://iv.duti.dev/',
    ],
    'search': [
        'https://invidious.lunivers.trade/',
        'https://inv.vern.cc/',
        'https://yt.thechangebook.org/',
        'https://invidious.vern.cc/',
        'https://invidious.materialio.us/',
        'https://invid-api.poketube.fun/',
        'https://invidious.ducks.party/',
        'https://super8.absturztau.be/',
        'https://invidious.nikkosphere.com/',
        'https://yt.omada.cafe/',
        'https://iv.melmac.space/',
        'https://iv.duti.dev/',
    ],
    'channel': [
        'https://invidious.lunivers.trade/',
        'https://invid-api.poketube.fun/',
        'https://invidious.ducks.party/',
        'https://super8.absturztau.be/',
        'https://invidious.nikkosphere.com/',
        'https://yt.omada.cafe/',
        'https://iv.melmac.space/',
        'https://iv.duti.dev/',
    ],
    'comments': [
        'https://invidious.lunivers.trade/',
        'https://invidious.ducks.party/',
        'https://super8.absturztau.be/',
        'https://invidious.nikkosphere.com/',
        'https://yt.omada.cafe/',
        'https://iv.duti.dev/',
        'https://iv.melmac.space/',
    ],
}
class InvidiousAPI:
    """Holds the Invidious instance lists, one attribute per API category."""

    def __init__(self):
        # `all` is the shared module-level table itself (not a copy).
        self.all = invidious_api_data
        # Each category attribute gets its own list copy, so changing one
        # does not touch the shared table.
        for category in ('video', 'playlist', 'search', 'channel', 'comments'):
            setattr(self, category, [*self.all[category]])
        self.check_video = False
def requestAPI(path, api_urls):
    """Fetch ``api/v1`` + *path* from the first instance that answers well.

    Instances in *api_urls* are tried in order. A reply is accepted when its
    status is 200 and its body parses as JSON; the body text is returned.
    Request errors and unusable replies move on to the next instance.

    Raises:
        APITimeoutError: no instance produced an acceptable reply, or the
            time budget ran out before one did.
    """
    began = time.time()
    for base_url in api_urls:
        # Stop starting new attempts one second before the overall budget.
        if time.time() - began >= max_time - 1:
            break
        try:
            reply = requests.get(
                base_url + 'api/v1' + path,
                headers=getRandomUserAgent(),
                timeout=max_api_wait_time,
            )
            usable = reply.status_code == requests.codes.ok and isJSON(reply.text)
        except requests.exceptions.RequestException:
            continue
        if usable:
            return reply.text
    raise APITimeoutError("All available API instances failed to respond.")
def getEduKey():
    """Fetch the ``key`` field from the Kahoot media-API key endpoint.

    Returns:
        The value stored under ``"key"`` in the JSON reply, or ``None`` when
        the request fails, the status is an error, the body is not JSON, the
        body is not a JSON object, or the field is absent.
    """
    api_url = "https://apis.kahoot.it/media-api/youtube/key"
    try:
        res = requests.get(api_url, headers=getRandomUserAgent(), timeout=max_api_wait_time)
        res.raise_for_status()
        # Parse once; json.JSONDecodeError is a ValueError.
        data = json.loads(res.text)
    except (requests.exceptions.RequestException, ValueError):
        return None
    # A JSON array/scalar has no .get(); treat it as "no key".
    if not isinstance(data, dict):
        return None
    return data.get("key")
def formatSearchData(data_dict, failed="Load Failed"):
    """Convert one search-result entry into the dict shape the templates use.

    Args:
        data_dict: One result entry; its ``"type"`` selects the layout.
        failed: Placeholder used for any missing field.

    Returns:
        A dict for ``"video"``, ``"playlist"`` or ``"channel"`` entries.
        Any other (or missing) type yields
        ``{"type": "unknown", "data": data_dict}``.
    """
    kind = data_dict.get("type")

    if kind == "video":
        return {
            "type": "video",
            "title": data_dict.get("title", failed),
            "id": data_dict.get("videoId", failed),
            "author": data_dict.get("author", failed),
            "published": data_dict.get("publishedText", failed),
            # `or 0` also covers an explicit null length.
            "length": str(datetime.timedelta(seconds=data_dict.get("lengthSeconds") or 0)),
            "view_count_text": data_dict.get("viewCountText", failed),
        }

    if kind == "playlist":
        return {
            "type": "playlist",
            "title": data_dict.get("title", failed),
            "id": data_dict.get('playlistId', failed),
            "thumbnail": data_dict.get("playlistThumbnail", failed),
            "count": data_dict.get("videoCount", failed),
        }

    if kind == "channel":
        # An empty or null thumbnail list falls back to a single empty entry.
        thumbnails = data_dict.get('authorThumbnails') or [{}]
        raw_url = thumbnails[-1].get('url')
        if not raw_url:
            thumbnail = failed
        elif raw_url.startswith("https"):
            thumbnail = raw_url
        else:
            # Remove a real scheme prefix. str.lstrip() takes a *set of
            # characters*, so lstrip("http://") would also eat leading
            # h/t/p/:// characters of the host name itself.
            for prefix in ("http://", "//"):
                if raw_url.startswith(prefix):
                    raw_url = raw_url[len(prefix):]
                    break
            thumbnail = "https://" + raw_url
        return {
            "type": "channel",
            "author": data_dict.get("author", failed),
            "id": data_dict.get("authorId", failed),
            "thumbnail": thumbnail,
        }

    return {"type": "unknown", "data": data_dict}
async def getVideoData(videoid):
    """Load one video's metadata and its recommendations from Invidious.

    Returns:
        A two-element list: ``[video_info, recommended]`` where
        ``video_info`` is a dict of display fields and ``recommended`` is a
        list of dicts, one per recommended video.

    Raises:
        APITimeoutError: propagated from requestAPI when no instance answers.
        KeyError: when the payload lacks one of the required fields.
    """
    t_text = await run_in_threadpool(requestAPI, f"/videos/{urllib.parse.quote(videoid)}", invidious_api.video)
    t = json.loads(t_text)

    # Both spellings of the recommendations key are accepted.
    recommended_videos = t.get('recommendedvideo') or t.get('recommendedVideos') or []
    # Last two entries of formatStreams, last one first.
    fallback_videourls = [stream["url"] for stream in reversed(t["formatStreams"])][:2]

    video_info = {
        'video_urls': fallback_videourls,
        # NOTE(review): the replacement literal had degraded into a raw line
        # break inside the quotes (a syntax error). Restored as "<br>" on the
        # assumption that the tag was lost in transit — confirm.
        'description_html': t["descriptionHtml"].replace("\n", "<br>"),
        'title': t["title"],
        'length_text': str(datetime.timedelta(seconds=t["lengthSeconds"])),
        'author_id': t["authorId"],
        'author': t["author"],
        'author_thumbnails_url': t["authorThumbnails"][-1]["url"],
        'view_count': t["viewCount"],
        'like_count': t["likeCount"],
        'subscribers_count': t["subCountText"],
    }
    recommended = [
        {
            "video_id": i["videoId"],
            "title": i["title"],
            "author_id": i["authorId"],
            "author": i["author"],
            "length_text": str(datetime.timedelta(seconds=i["lengthSeconds"])),
            "view_count_text": i["viewCountText"],
        }
        for i in recommended_videos
    ]
    return [video_info, recommended]
async def getSearchData(q, page):
    """Run a search for *q* on page *page* and return the formatted entries.

    Each entry of the JSON reply is passed through formatSearchData.
    """
    query_path = f"/search?q={urllib.parse.quote(q)}&page={page}&hl=jp"
    raw_reply = await run_in_threadpool(requestAPI, query_path, invidious_api.search)
    formatted = []
    for entry in json.loads(raw_reply):
        formatted.append(formatSearchData(entry))
    return formatted
async def getTrendingData(region: str):
    """Fetch the trending list for *region* and return its video entries.

    Only entries whose ``"type"`` is ``"video"`` are kept; each is passed
    through formatSearchData. The request uses the ``search`` instance list.
    """
    # URL-encode the region like the other query builders do, so unexpected
    # characters cannot alter the query string.
    path = f"/trending?region={urllib.parse.quote(region)}&hl=jp"
    datas_text = await run_in_threadpool(requestAPI, path, invidious_api.search)
    datas_dict = json.loads(datas_text)
    return [formatSearchData(data_dict) for data_dict in datas_dict if data_dict.get("type") == "video"]
async def getChannelData(channelid):
    """Load a channel's latest videos and profile, falling back to placeholders.

    Any failure while fetching or decoding (and a payload without latest
    videos) results in placeholder values rather than an exception.

    Returns:
        A two-element list: ``[latest_videos, channel_info]`` where
        ``latest_videos`` is a list of video dicts and ``channel_info`` is a
        dict of channel display fields.
    """
    t = {}
    try:
        t_text = await run_in_threadpool(requestAPI, f"/channels/{urllib.parse.quote(channelid)}", invidious_api.channel)
        parsed = json.loads(t_text)
        # Adopt the payload only when it is an object that actually carries
        # latest videos; a JSON array/scalar must never reach the .get()
        # calls below.
        if isinstance(parsed, dict) and (parsed.get('latestvideo') or parsed.get('latestVideos')):
            t = parsed
    except Exception:
        # Deliberate best effort: the page is rendered with placeholders.
        pass

    latest_videos = t.get('latestvideo') or t.get('latestVideos') or []

    author_thumbnails = t.get("authorThumbnails", [])
    author_icon_url = author_thumbnails[-1].get("url", failed) if author_thumbnails else failed

    author_banner_url = ''
    author_banners = t.get('authorBanners', [])
    if author_banners and author_banners[0].get("url"):
        author_banner_url = urllib.parse.quote(author_banners[0]["url"], safe="-_.~/:")

    videos = [
        {
            "type": "video",
            "title": i.get("title", failed),
            "id": i.get("videoId", failed),
            "author": t.get("author", failed),
            "published": i.get("publishedText", failed),
            "view_count_text": i.get('viewCountText', failed),
            "length_str": str(datetime.timedelta(seconds=i.get("lengthSeconds", 0))),
        }
        for i in latest_videos
    ]
    channel_info = {
        "channel_name": t.get("author", "チャンネル情報取得失敗"),
        "channel_icon": author_icon_url,
        "channel_profile": t.get("descriptionHtml", "このチャンネルのプロフィール情報は見つかりませんでした。"),
        "author_banner": author_banner_url,
        "subscribers_count": t.get("subCount", failed),
        "tags": t.get("tags", []),
    }
    return [videos, channel_info]
async def getPlaylistData(listid, page):
    """Return the videos of playlist *listid* for the requested *page*.

    Each item of the reply's ``"videos"`` array becomes a dict with
    ``title``, ``id``, ``authorId``, ``author`` and ``type`` ("video").
    """
    encoded_id = urllib.parse.quote(listid)
    encoded_page = urllib.parse.quote(str(page))
    raw_reply = await run_in_threadpool(
        requestAPI,
        f"/playlists/{encoded_id}?page={encoded_page}",
        invidious_api.playlist,
    )
    entries = []
    for item in json.loads(raw_reply)["videos"]:
        entries.append({
            "title": item["title"],
            "id": item["videoId"],
            "authorId": item["authorId"],
            "author": item["author"],
            "type": "video",
        })
    return entries
async def getCommentsData(videoid):
    """Return the comments of *videoid* as a list of display dicts.

    Each dict has ``author``, ``authoricon`` (last thumbnail URL),
    ``authorid`` and ``body`` (comment HTML).

    Raises:
        APITimeoutError: propagated from requestAPI when no instance answers.
        KeyError: when the payload lacks one of the required fields.
    """
    t_text = await run_in_threadpool(requestAPI, f"/comments/{urllib.parse.quote(videoid)}", invidious_api.comments)
    t = json.loads(t_text)["comments"]
    return [
        {
            "author": i["author"],
            "authoricon": i["authorThumbnails"][-1]["url"],
            "authorid": i["authorId"],
            # NOTE(review): the replacement literal had degraded into a raw
            # line break inside the quotes (a syntax error). Restored as
            # "<br>" on the assumption that the tag was lost — confirm.
            "body": i["contentHtml"].replace("\n", "<br>"),
        }
        for i in t
    ]
def get_fallback_hls_url(videoid: str) -> str:
    """Ask the fallback service for an HLS manifest URL for *videoid*.

    The reply body (whitespace-trimmed) is returned only when it begins with
    the googlevideo HLS-variant manifest prefix.

    Raises:
        APITimeoutError: on an HTTP error status, any other request failure,
            or a body that is not such a manifest URL.
    """
    FALLBACK_API_URL = f"https://test-live-tau.vercel.app/get/url/{videoid}"
    manifest_prefix = "https://manifest.googlevideo.com/api/manifest/hls_variant"
    try:
        reply = requests.get(FALLBACK_API_URL, headers=getRandomUserAgent(), timeout=max_api_wait_time)
        reply.raise_for_status()
        candidate = reply.text.strip()
        # An empty body cannot start with the prefix, so one check suffices.
        if candidate.startswith(manifest_prefix):
            return candidate
        raise ValueError("Fallback API response is not a valid HLS URL.")
    except requests.exceptions.HTTPError as http_err:
        raise APITimeoutError(f"Fallback HLS API returned HTTP error: {http_err.response.status_code}") from http_err
    except (requests.exceptions.RequestException, ValueError) as err:
        raise APITimeoutError(f"Error processing fallback HLS API response: {err}") from err
def get_360p_single_url(videoid: str) -> str:
    """Return a single-file stream URL for *videoid*, with an HLS fallback.

    The primary service's ``formats`` list is scanned for the first entry
    whose ``itag`` equals the string ``"18"`` and that has a ``videoUrl``.
    If the primary lookup fails for any handled reason, the fallback HLS
    service is consulted instead.

    Raises:
        APITimeoutError: both the primary and the fallback lookup failed.
        ValueError: the fallback failed with an unexpected exception type.
    """
    YTDL_API_URL = f"https://server-thxk.onrender.com/stream/{videoid}"
    try:
        reply = requests.get(YTDL_API_URL, headers=getRandomUserAgent(), timeout=max_api_wait_time)
        reply.raise_for_status()
        payload = reply.json()
        formats: List[Dict[str, Any]] = payload.get("formats", [])
        if not formats:
            raise ValueError("External API response is missing video formats.")
        for candidate in formats:
            if candidate.get("itag") == "18" and candidate.get("videoUrl"):
                return candidate["videoUrl"]
        raise ValueError("Could not find a single 360p stream with audio (itag 18) in the main API response.")
    except (requests.exceptions.RequestException, ValueError, json.JSONDecodeError) as primary_err:
        # Primary path failed: try the HLS fallback before giving up.
        try:
            return get_fallback_hls_url(videoid)
        except APITimeoutError as fallback_e:
            raise APITimeoutError(f"Both primary and fallback stream APIs failed to respond. Primary error: {primary_err}. Fallback error: {fallback_e}") from fallback_e
        except Exception as fallback_e:
            raise ValueError(f"Fallback HLS API failed: {fallback_e}") from fallback_e
def fetch_high_quality_streams(videoid: str) -> dict:
    """Fetch the high-quality HLS stream descriptor for *videoid*.

    Returns:
        A dict with ``video_url`` (the reply's ``m3u8Url``), an empty
        ``audio_url``, and a ``title`` built from the reply's ``resolution``
        (default "High Quality") and the video id.

    Raises:
        APITimeoutError: on an HTTP error status, any other request failure,
            an undecodable body, or a reply without ``m3u8Url``.
    """
    YTDL_API_URL = f"https://server-thxk.onrender.com/high/{videoid}"
    try:
        res = requests.get(
            YTDL_API_URL,
            headers=getRandomUserAgent(),
            # Without a timeout requests waits indefinitely, which would pin
            # a worker thread on a stalled upstream. Uses the same setting as
            # every other outbound call in this file.
            # NOTE(review): raise the read timeout if this endpoint is
            # legitimately slower than the others.
            timeout=max_api_wait_time
        )
        res.raise_for_status()
        data = res.json()
        hls_url = data.get("m3u8Url")
        if not hls_url:
            raise ValueError("Could not find m3u8Url in the external high-quality stream API response.")
        return {
            "video_url": hls_url,
            "audio_url": "",
            "title": f"{data.get('resolution', 'High Quality')} Stream for {videoid}"
        }
    except requests.exceptions.HTTPError as e:
        raise APITimeoutError(f"External stream API returned HTTP error: {e.response.status_code}") from e
    except (requests.exceptions.RequestException, ValueError, json.JSONDecodeError) as e:
        raise APITimeoutError(f"Error processing external stream API response: {e}") from e
async def fetch_embed_url_from_external_api(videoid: str) -> str:
    """Fetch the embed URL for *videoid* from the EDU stream service.

    The blocking HTTP call runs in the thread pool.

    Returns:
        The ``url`` field of the JSON reply.

    Raises:
        requests.exceptions.RequestException: on request or HTTP failure.
        ValueError: when the reply is not a JSON object or lacks ``url``.
    """
    # Encode the id before appending it to the base URL, as the Shorts
    # fetcher does for its channel id.
    target_url = f"{EDU_STREAM_API_BASE_URL}{urllib.parse.quote(videoid)}"

    def sync_fetch():
        res = requests.get(
            target_url,
            headers=getRandomUserAgent(),
            timeout=max_api_wait_time
        )
        res.raise_for_status()
        data = res.json()
        # A JSON array/scalar has no .get(); report it as a format error so
        # callers that handle ValueError see it.
        if not isinstance(data, dict):
            raise ValueError("External API response is missing the 'url' field.")
        embed_url = data.get("url")
        if not embed_url:
            raise ValueError("External API response is missing the 'url' field.")
        return embed_url

    return await run_in_threadpool(sync_fetch)
async def fetch_short_data_from_external_api(channelid: str) -> Dict[str, Any]:
    """Fetch the Shorts payload for *channelid* and return the decoded JSON.

    The blocking HTTP call runs in the thread pool; request, HTTP-status and
    decoding errors propagate to the caller.
    """
    target_url = SHORT_STREAM_API_BASE_URL + urllib.parse.quote(channelid)

    def sync_fetch():
        reply = requests.get(target_url, headers=getRandomUserAgent(), timeout=max_api_wait_time)
        reply.raise_for_status()
        return reply.json()

    return await run_in_threadpool(sync_fetch)
# The ASGI application; every route below is registered on it.
app = FastAPI()
# Shared holder of the per-category Invidious instance lists.
invidious_api = InvidiousAPI()
# Serve files from <BASE_DIR>/static under the /static URL prefix.
app.mount(
"/static",
StaticFiles(directory=str(BASE_DIR / "static")),
name="static"
)
@app.get("/api/edu")
async def get_edu_key_route():
    """Return ``{"key": ...}`` from getEduKey, or a 500 JSON error body."""
    key = await run_in_threadpool(getEduKey)
    if not key:
        return Response(
            content='{"error": "Failed to retrieve key from Kahoot API"}',
            media_type="application/json",
            status_code=500,
        )
    return {"key": key}
@app.get('/api/stream_high/{videoid}', response_class=HTMLResponse)
async def embed_high_quality_video(request: Request, videoid: str, proxy: Union[str] = Cookie(None)):
    """Render the high-quality embed page for *videoid*.

    Responds 503 when the stream lookup raises APITimeoutError and 500 on
    any other exception.
    """
    try:
        stream_data = await run_in_threadpool(fetch_high_quality_streams, videoid)
    except APITimeoutError:
        return Response("Failed to retrieve high-quality stream URL", status_code=503)
    except Exception:
        return Response("An unexpected error occurred while retrieving stream data.", status_code=500)

    context = {
        "request": request,
        "video_url": stream_data["video_url"],
        "audio_url": stream_data["audio_url"],
        "video_title": stream_data["title"],
        "videoid": videoid,
        "proxy": proxy,
    }
    return templates.TemplateResponse('embed_high.html', context)
@app.get("/api/stream_360p_url/{videoid}")
async def get_360p_stream_url_route(videoid: str):
    """Return ``{"stream_url": ...}`` for *videoid*.

    Responds with a JSON error body and status 503 when the lookup raises
    APITimeoutError, or 500 on any other exception.
    """
    try:
        url = await run_in_threadpool(get_360p_single_url, videoid)
        return {"stream_url": url}
    except APITimeoutError as e:
        # json.dumps escapes quotes/backslashes in the exception text; pasting
        # it into a hand-written JSON string could yield an invalid body.
        body = json.dumps({"error": f"Failed to get stream URL after multiple attempts: {e}"})
        return Response(content=body, media_type="application/json", status_code=503)
    except Exception as e:
        body = json.dumps({"error": f"An unexpected error occurred: {e}"})
        return Response(content=body, media_type="application/json", status_code=500)
@app.get('/api/edu/{videoid}', response_class=HTMLResponse)
async def embed_edu_video(request: Request, videoid: str, proxy: Union[str] = Cookie(None)):
    """Render the embed page for *videoid* using the external embed URL.

    Responds 404 when the external service answers 404, and 503 for other
    HTTP errors, connection failures or malformed replies.
    """
    try:
        embed_url = await fetch_embed_url_from_external_api(videoid)
    except requests.exceptions.HTTPError as http_err:
        if http_err.response.status_code == 404:
            return Response(f"Stream URL for videoid '{videoid}' not found.", status_code=404)
        return Response("Failed to retrieve stream URL from external service (HTTP Error).", status_code=503)
    except (requests.exceptions.RequestException, ValueError, json.JSONDecodeError):
        return Response("Failed to retrieve stream URL from external service (Connection/Format Error).", status_code=503)

    context = {
        "request": request,
        "embed_url": embed_url,
        "videoid": videoid,
        "proxy": proxy,
    }
    return templates.TemplateResponse('embed.html', context)
@app.get("/api/short/{channelid}")
async def get_short_data_route(channelid: str):
    """Return the Shorts payload for *channelid*, or a 503 JSON error body."""
    try:
        data = await fetch_short_data_from_external_api(channelid)
        return data
    except Exception as e:
        # repr() of an exception routinely contains quote characters, so the
        # body is serialized with json.dumps rather than string formatting.
        body = json.dumps({"error": f"Failed to retrieve Shorts data from external service: {e!r}"})
        return Response(
            content=body,
            media_type="application/json",
            status_code=503
        )
@app.get('/', response_class=HTMLResponse)
async def home(request: Request, yuzu_access_granted: Union[str] = Cookie(None), proxy: Union[str] = Cookie(None)):
    """Render the index page with trending videos, behind the access gate.

    Visitors whose ``yuzu_access_granted`` cookie is not "True" are
    redirected to /gate. A failed trending lookup yields an empty list.
    """
    if yuzu_access_granted != "True":
        return RedirectResponse(url="/gate", status_code=302)

    try:
        trending_videos = await getTrendingData("jp")
    except Exception:
        # Best effort: the page still renders without trending results.
        trending_videos = []

    context = {
        "request": request,
        "proxy": proxy,
        "results": trending_videos,
        "word": "",
    }
    return templates.TemplateResponse("index.html", context)
@app.get('/gate', response_class=HTMLResponse)
async def access_gate_get(request: Request):
    """Render the access-code form with its default prompt message."""
    context = {
        "request": request,
        "message": "アクセスコードを入力してください。",
    }
    return templates.TemplateResponse("access_gate.html", context)
@app.post('/gate', response_class=RedirectResponse)
async def access_gate_post(request: Request, access_code: str = Form(...)):
    """Check the submitted access code.

    A matching code redirects to "/" and sets the ``yuzu_access_granted``
    cookie with an expiry one day ahead; any other code re-renders the form
    with an error message and status 401.
    """
    CORRECT_CODE = "yuzu"

    if access_code != CORRECT_CODE:
        rejection_context = {
            "request": request,
            "message": "無効なアクセスコードです。もう一度入力してください。",
            "error": True,
        }
        return templates.TemplateResponse("access_gate.html", rejection_context, status_code=401)

    response = RedirectResponse(url="/", status_code=302)
    one_day_ahead = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=1)
    response.set_cookie(
        key="yuzu_access_granted",
        value="True",
        expires=one_day_ahead.strftime("%a, %d-%b-%Y %H:%M:%S GMT"),
        httponly=True,
    )
    return response
@app.get('/watch', response_class=HTMLResponse)
async def video(v:str, request: Request, proxy: Union[str] = Cookie(None)):
    """Render the watch page for video *v*."""
    video_data = await getVideoData(v)
    info = video_data[0]
    recommended = video_data[1]
    # The high-quality URL is always passed to the template as empty here.
    high_quality_url = ""
    context = {
        "request": request,
        "videoid": v,
        "videourls": info['video_urls'],
        "high_quality_url": high_quality_url,
        "description": info['description_html'],
        "video_title": info['title'],
        "author_id": info['author_id'],
        "author_icon": info['author_thumbnails_url'],
        "author": info['author'],
        "length_text": info['length_text'],
        "view_count": info['view_count'],
        "like_count": info['like_count'],
        "subscribers_count": info['subscribers_count'],
        "recommended_videos": recommended,
        "proxy": proxy,
    }
    return templates.TemplateResponse('video.html', context)
@app.get("/search", response_class=HTMLResponse)
async def search(q:str, request: Request, page:Union[int, None]=1, proxy: Union[str] = Cookie(None)):
    """Render the search results for *q* on *page*, with a next-page link."""
    # The annotation allows None; treat it as the first page so that
    # `page + 1` below cannot fail.
    if page is None:
        page = 1
    search_results = await getSearchData(q, page)
    # URL-encode the query in the link: a raw "&", "#" or "+" in q would
    # otherwise change what the next page searches for.
    next_url = f"/search?q={urllib.parse.quote(q)}&page={page + 1}"
    return templates.TemplateResponse("search.html", {
        "request": request,
        "results": search_results,
        "word": q,
        "next": next_url,
        "proxy": proxy,
    })
@app.get("/hashtag/{tag}")
async def hashtag_search(tag:str):
    """Redirect a hashtag URL to the equivalent search page."""
    # Encode the tag so characters such as "&" or "#" stay part of the
    # query value instead of splitting the redirect URL.
    return RedirectResponse(f"/search?q={urllib.parse.quote(tag)}", status_code=302)
@app.get("/channel/{channelid}", response_class=HTMLResponse)
async def channel(channelid:str, request: Request, proxy: Union[str] = Cookie(None)):
    """Render the channel page: latest videos, Shorts and channel profile.

    The Shorts lookup is best effort; any failure leaves the list empty.
    """
    channel_data = await getChannelData(channelid)
    latest_videos = channel_data[0]
    channel_info = channel_data[1]

    shorts_videos = []
    try:
        shorts_payload = await fetch_short_data_from_external_api(channelid)
        # The service may answer with a bare list or an object with "videos".
        if isinstance(shorts_payload, list):
            shorts_videos = shorts_payload
        elif isinstance(shorts_payload, dict) and "videos" in shorts_payload:
            shorts_videos = shorts_payload["videos"]
    except Exception:
        shorts_videos = []

    context = {
        "request": request,
        "results": latest_videos,
        "shorts": shorts_videos,
        "channel_name": channel_info["channel_name"],
        "channel_icon": channel_info["channel_icon"],
        "channel_profile": channel_info["channel_profile"],
        "cover_img_url": channel_info["author_banner"],
        "subscribers_count": channel_info["subscribers_count"],
        "tags": channel_info["tags"],
        "proxy": proxy,
    }
    return templates.TemplateResponse("channel.html", context)
@app.get("/playlist", response_class=HTMLResponse)
async def playlist(list:str, request: Request, page:Union[int, None]=1, proxy: Union[str] = Cookie(None)):
    """Render one page of playlist *list*, with a next-page link.

    The parameter is named ``list`` because that is the query-string key;
    it shadows the builtin inside this function only.
    """
    # The annotation allows None; treat it as the first page so that
    # `page + 1` below cannot fail.
    if page is None:
        page = 1
    playlist_data = await getPlaylistData(list, str(page))
    # URL-encode the id so it cannot alter the link's query string.
    next_url = f"/playlist?list={urllib.parse.quote(list)}&page={page + 1}"
    return templates.TemplateResponse("search.html", {
        "request": request,
        "results": playlist_data,
        "word": "",
        "next": next_url,
        "proxy": proxy,
    })
@app.get("/comments", response_class=HTMLResponse)
async def comments(request: Request, v:str):
    """Render the comments fragment for video *v*."""
    context = {
        "request": request,
        "comments": await getCommentsData(v),
    }
    return templates.TemplateResponse("comments.html", context)
@app.get("/thumbnail")
def thumbnail(v:str):
    """Proxy the ``0.jpg`` thumbnail of video *v* from img.youtube.com.

    The upstream body is returned as ``image/jpeg`` regardless of the
    upstream status code.
    """
    # Encode the id fully (including "/") so it stays a single path segment.
    upstream_url = f"https://img.youtube.com/vi/{urllib.parse.quote(v, safe='')}/0.jpg"
    # A timeout keeps a stalled upstream from blocking the worker forever.
    upstream = requests.get(upstream_url, timeout=max_api_wait_time)
    return Response(content=upstream.content, media_type="image/jpeg")
@app.get("/suggest")
def suggest(keyword:str):
    """Return search suggestions for *keyword* as a list.

    The upstream reply is sliced, decoded as JSON, and the first element of
    every entry in the decoded value's index-1 item is collected.
    """
    res_text = requests.get(
        "http://www.google.com/complete/search?client=youtube&hl=ja&ds=yt&q=" + urllib.parse.quote(keyword),
        headers=getRandomUserAgent(),
        # A timeout keeps a stalled upstream from blocking the worker forever.
        timeout=max_api_wait_time,
    ).text
    # Drop the first 19 characters and the last one before decoding —
    # presumably a JSONP wrapper such as `window.google.ac.h(` ... `)`;
    # TODO confirm against a live reply.
    payload = json.loads(res_text[19:-1])
    return [entry[0] for entry in payload[1]]