"""FastAPI front-end that proxies Invidious API instances and a remote BBS.

Importing this module performs network requests (instance list, BBS base URL)
and changes the mode of ``./yukiverify``.
"""

import ast
import datetime
import json
import os
import random
import subprocess
import time
import urllib.parse
from typing import Union

import requests
from fastapi import Cookie, Depends, FastAPI, Form, Request, Response
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import HTMLResponse, PlainTextResponse
from fastapi.responses import RedirectResponse as redirect
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel

from cache import cache

# `timeout=` value handed to requests for every Invidious API call.
# Value history: 3 => (3.0, 1.5) => (1.5, 1)
max_api_wait_time = (1.5, 1)
# Overall budget (seconds) for one requestAPI() call; also used as the timeout
# of the other outbound requests in this module.
# Value history: 10 => 10
max_time = 10

user_agents = [
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Safari/605.1.15',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3864.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.61 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:62.0) Gecko/20100101 Firefox/62.0',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:67.0) Gecko/20100101 Firefox/67.0',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:68.0) Gecko/20100101 Firefox/68.0',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:61.0) Gecko/20100101 Firefox/61.0',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:92.0) Gecko/20100101 Firefox/92.0',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36 Edge/17.17134',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.61 Safari/537.36 Edg/94.0.992.31',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0 Safari/605.1.15',
    'Mozilla/5.0 (iPhone; CPU iPhone OS 12_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0 Mobile/15E148 Safari/604.1',
]


def getRandomUserAgent():
    """Return a headers dict carrying one randomly chosen User-Agent (also printed)."""
    user_agent = random.choice(user_agents)
    print(user_agent)
    return {'User-Agent': user_agent}


INSTANCE_LIST_URL = 'https://raw.githubusercontent.com/yuto1106110/invidious-instance-dieu-eviter/refs/heads/main/data/valid.json'


class InvidiousAPI:
    """Per-endpoint lists of Invidious instance base URLs, downloaded on construction.

    Attributes:
        all: The whole downloaded mapping.
        video, playlist, search, channel, comments: The instance lists taken
            from ``all``; requestAPI() rotates them in place.
        check_video: When True, requestAPI() additionally probes the first
            stream URL of a video response.
    """

    def __init__(self):
        text = requests.get(INSTANCE_LIST_URL, headers=getRandomUserAgent(), timeout=(1.0, 0.5)).text
        try:
            self.all = json.loads(text)
        except json.JSONDecodeError:
            # Kept for backward compatibility: the document used to be parsed as
            # a Python literal, which rejects JSON's true/false/null.
            self.all = ast.literal_eval(text)

        self.video = self.all['video']
        self.playlist = self.all['playlist']
        self.search = self.all['search']
        self.channel = self.all['channel']
        self.comments = self.all['comments']

        self.check_video = False

    def info(self):
        """Return the instance mapping together with the check_video flag."""
        return {
            'API': self.all,
            'checkVideo': self.check_video
        }


invidious_api = InvidiousAPI()

# Base URL prefixed to every "bbs..." request below.
# NOTE(review): this downloads the same valid.json as InvidiousAPI, while the
# /reset handler reloads `url` from a different file ('.../APItati') — confirm
# which source is intended at startup.
url = requests.get(INSTANCE_LIST_URL, headers=getRandomUserAgent(), timeout=max_time).text.rstrip()

version = "1.0"
new_instance_version = "1.3.2"

# Best effort, like the former `chmod 777` shell call: a missing file must not
# stop the application from starting.
# NOTE(review): 0o777 leaves the binary world-writable; 0o755 may be enough.
try:
    os.chmod("./yukiverify", 0o777)
except OSError as e:
    print(f"chmod ./yukiverify failed: {e}")


class APITimeoutError(Exception):
    """Raised by requestAPI() when no instance produced a usable response."""


class UnallowedBot(Exception):
    """Raised by write_bbs() for requests identified as a Google Apps Script bot."""


def isJSON(json_str):
    """Return True when ``json_str`` parses as JSON, False on a decode error."""
    try:
        json.loads(json_str)
    except json.JSONDecodeError:
        return False
    return True


def updateList(list, str):
    """Move the first occurrence of ``str`` to the end of ``list`` in place.

    Parameter names shadow builtins but are kept for caller compatibility.
    If ``str`` is not in ``list`` the list ends up unchanged.

    Returns:
        The same (mutated) list object.
    """
    list.append(str)
    list.remove(str)
    return list


def requestAPI(path, api_urls):
    """Try each instance in ``api_urls`` until one returns usable JSON for ``path``.

    Instances that fail are moved to the end of ``api_urls`` (in place) so that
    later calls start with a different instance.

    Args:
        path: API path appended after ``<instance>api/v1``.
        api_urls: Mutable list of instance base URLs.

    Returns:
        The raw response text of the first successful instance.

    Raises:
        APITimeoutError: No instance succeeded, or the ``max_time`` budget ran out.
    """
    starttime = time.time()

    # Iterate over a snapshot: updateList() reorders api_urls, and reordering a
    # list while looping over it makes the loop skip entries.
    for api in list(api_urls):
        if time.time() - starttime >= max_time - 1:
            break

        try:
            print(api + 'api/v1' + path)
            res = requests.get(api + 'api/v1' + path, headers=getRandomUserAgent(), timeout=max_api_wait_time)
            is_json = isJSON(res.text)

            if res.status_code == requests.codes.ok and is_json:
                data = json.loads(res.text)

                # Callers in this file build '/videos/<id>' and '/channels/<id>'
                # paths, so those are the prefixes that have to be matched.
                if invidious_api.check_video and path.startswith('/videos/'):
                    # When video checking is enabled: probe the first stream URL.
                    # stream=True fetches the headers without downloading the body.
                    with requests.get(data['formatStreams'][0]['url'], headers=getRandomUserAgent(), timeout=(3.0, 0.5), stream=True) as video_res:
                        content_type = video_res.headers.get('Content-Type', '')
                    if 'video' not in content_type:
                        print(f"No Video(True)({content_type}): {api}")
                        updateList(api_urls, api)
                        continue

                if path.startswith('/channels/') and data.get("latestvideo", data.get("latestVideos")) == []:
                    print(f"No Channel: {api}")
                    updateList(api_urls, api)
                    continue

                print(f"Success({invidious_api.check_video})({path.split('/')[1].split('?')[0]}): {api}")
                return res.text

            elif is_json:
                # Status code is not 200 and the body is JSON.
                error_text = str(json.loads(res.text).get('error', '')).replace('error', 'err0r')
                print(f"Returned Err0r(JSON): {api} ('{error_text}')")
                updateList(api_urls, api)

            else:
                # Status code is not 200 and the body is not JSON.
                print(f"Returned Err0r: {api} ('{res.text[:100]}')")
                updateList(api_urls, api)

        except Exception:
            # Network failure, unexpected payload shape, and so on.
            print(f"Err0r: {api}")
            updateList(api_urls, api)

    raise APITimeoutError("APIがタイムアウトしました")


def getInfo(request):
    """Return a JSON string describing this server and the incoming request."""
    return json.dumps([version, os.environ.get('RENDER_EXTERNAL_URL'), str(request.scope["headers"]), str(request.scope['router'])[39:-2]])


failed = "Load Failed"


def _recommendedVideos(t):
    """Return the recommended-video list of a video payload (either key spelling)."""
    if 'recommendedvideo' in t:
        return t["recommendedvideo"]
    if 'recommendedVideos' in t:
        return t["recommendedVideos"]
    # Must be a list of dicts: the caller iterates it and indexes every entry.
    return [{
        "videoId": failed,
        "title": failed,
        "authorId": failed,
        "author": failed,
        "lengthSeconds": 0,
        "viewCountText": "Load Failed"
    }]


def _firstStreamUrl(streams, **criteria):
    """Return the ``url`` of the first stream whose fields equal ``criteria``, else None."""
    for stream in streams:
        if all(stream.get(key) == value for key, value in criteria.items()):
            return stream.get("url")
    return None


def _buildVideoData(t):
    """Convert a video payload into the ``[info, recommended]`` pair used by the templates."""
    # NOTE(review): the replacement literal of the "\n" replace calls in this
    # file was lost in the source text; "<br>" is reconstructed from the
    # *_html context — confirm.
    info = {
        # formatStreams URLs in reverse order, first two only.
        'video_urls': list(reversed([i["url"] for i in t["formatStreams"]]))[:2],
        'description_html': t["descriptionHtml"].replace("\n", "<br>"),
        'title': t["title"],
        'length_text': str(datetime.timedelta(seconds=t["lengthSeconds"])),
        'author_id': t["authorId"],
        'author': t["author"],
        'author_thumbnails_url': t["authorThumbnails"][-1]["url"],
        'view_count': t["viewCount"],
        'like_count': t["likeCount"],
        'subscribers_count': t["subCountText"],
        # Every webm adaptive stream that declares a resolution.
        'streamUrls': [
            {'url': stream['url'], 'resolution': stream['resolution']}
            for stream in t.get('adaptiveFormats', [])
            if stream.get('container') == 'webm' and stream.get('resolution')
        ]
    }
    recommended = [
        {
            "video_id": i["videoId"],
            "title": i["title"],
            "author_id": i["authorId"],
            "author": i["author"],
            "length_text": str(datetime.timedelta(seconds=i["lengthSeconds"])),
            "view_count_text": i["viewCountText"]
        } for i in _recommendedVideos(t)
    ]
    return [info, recommended]


def gettingVideoData(videoid):
    """Fetch a video and return ``[info, recommended_videos]`` (with ``streamUrls``).

    Raises:
        APITimeoutError: Propagated from requestAPI().
    """
    t = json.loads(requestAPI(f"/videos/{urllib.parse.quote(videoid)}", invidious_api.video))
    return _buildVideoData(t)


def getVideoData(videoid):
    """Like gettingVideoData(), plus ``highstream_url`` and ``audio_url`` in the info dict.

    ``highstream_url`` is the first webm adaptive stream at 1080p, else at 720p;
    ``audio_url`` is the first m4a stream with audioQuality AUDIO_QUALITY_MEDIUM.
    Either is None when no stream matches.

    Raises:
        APITimeoutError: Propagated from requestAPI().
    """
    t = json.loads(requestAPI(f"/videos/{urllib.parse.quote(videoid)}", invidious_api.video))
    video_data = _buildVideoData(t)

    adaptive_formats = t.get("adaptiveFormats", [])
    video_data[0]['highstream_url'] = (
        _firstStreamUrl(adaptive_formats, container="webm", resolution="1080p")
        or _firstStreamUrl(adaptive_formats, container="webm", resolution="720p")
    )
    video_data[0]['audio_url'] = _firstStreamUrl(adaptive_formats, container="m4a", audioQuality="AUDIO_QUALITY_MEDIUM")
    return video_data


def getSearchData(q, page):
    """Search for ``q`` and return one formatted dict per result.

    Raises:
        APITimeoutError: Propagated from requestAPI().
    """

    def formatSearchData(data_dict):
        kind = data_dict.get("type")

        if kind == "video":
            return {
                "type": "video",
                "title": data_dict.get("title", failed),
                "id": data_dict.get("videoId", failed),
                "authorId": data_dict.get("authorId", failed),
                "author": data_dict.get("author", failed),
                "published": data_dict.get("publishedText", failed),
                "length": str(datetime.timedelta(seconds=data_dict.get("lengthSeconds", 0))),
                "view_count_text": data_dict.get("viewCountText", failed)
            }

        if kind == "playlist":
            return {
                "type": "playlist",
                "title": data_dict.get("title", failed),
                "id": data_dict.get("playlistId", failed),
                "thumbnail": data_dict.get("playlistThumbnail", failed),
                "count": data_dict.get("videoCount", failed)
            }

        # Every other result is presented as a channel.
        thumbnails = data_dict.get("authorThumbnails") or []
        thumbnail_url = thumbnails[-1].get("url") if thumbnails else None
        if not thumbnail_url:
            thumbnail = failed
        elif thumbnail_url.startswith("https"):
            thumbnail = thumbnail_url
        elif thumbnail_url.startswith("//"):
            # Protocol-relative URL: only the scheme is missing.
            thumbnail = "https:" + thumbnail_url
        else:
            thumbnail = "https://" + thumbnail_url

        return {
            "type": "channel",
            "author": data_dict.get("author", failed),
            "id": data_dict.get("authorId", failed),
            "thumbnail": thumbnail
        }

    # "datas" may look odd, but making it explicit that the value holds
    # several items is the bigger benefit when writing the code.
    datas_dict = json.loads(requestAPI(f"/search?q={urllib.parse.quote(q)}&page={page}&hl=jp", invidious_api.search))
    return [formatSearchData(data_dict) for data_dict in datas_dict]


def getChannelData(channelid):
    """Fetch a channel and return ``[latest_videos, channel_info]``.

    Raises:
        APITimeoutError: Propagated from requestAPI().
    """
    t = json.loads(requestAPI(f"/channels/{urllib.parse.quote(channelid)}", invidious_api.channel))

    if 'latestvideo' in t:
        latest_videos = t['latestvideo']
    elif 'latestVideos' in t:
        latest_videos = t['latestVideos']
    else:
        # A list with one placeholder entry; lengthSeconds has to be a number
        # because it is fed to datetime.timedelta below.
        latest_videos = [{
            "title": failed,
            "videoId": failed,
            "authorId": failed,
            "author": failed,
            "publishedText": failed,
            "viewCountText": "0",
            "lengthSeconds": 0
        }]

    return [
        [
            # Latest videos
            {
                "type": "video",
                "title": i["title"],
                "id": i["videoId"],
                "authorId": t["authorId"],
                "author": t["author"],
                "published": i["publishedText"],
                "view_count_text": i['viewCountText'],
                "length_str": str(datetime.timedelta(seconds=i["lengthSeconds"]))
            } for i in latest_videos
        ], {
            # Channel information
            "channel_name": t["author"],
            "channel_icon": t["authorThumbnails"][-1]["url"],
            "channel_profile": t["descriptionHtml"],
            "author_banner": urllib.parse.quote(t["authorBanners"][0]["url"], safe="-_.~/:") if 'authorBanners' in t and len(t['authorBanners']) else '',
            "subscribers_count": t["subCount"],
            "tags": t["tags"]
        }
    ]


def getPlaylistData(listid, page):
    """Return the videos of one playlist page as search-result style dicts.

    Raises:
        APITimeoutError: Propagated from requestAPI().
    """
    t = json.loads(requestAPI(f"/playlists/{urllib.parse.quote(listid)}?page={urllib.parse.quote(str(page))}", invidious_api.playlist))["videos"]
    return [{"title": i["title"], "id": i["videoId"], "authorId": i["authorId"], "author": i["author"], "type": "video"} for i in t]


def getCommentsData(videoid):
    """Return the comments of a video as dicts with author, icon, id and HTML body.

    Raises:
        APITimeoutError: Propagated from requestAPI().
    """
    t = json.loads(requestAPI(f"/comments/{urllib.parse.quote(videoid)}?hl=jp", invidious_api.comments))["comments"]
    return [{"author": i["author"], "authoricon": i["authorThumbnails"][-1]["url"], "authorid": i["authorId"], "body": i["contentHtml"].replace("\n", "<br>")} for i in t]


def checkCookie(cookie):
    """Return True only when the cookie value is exactly the string "True"."""
    return cookie == "True"


def getVerifyCode():
    """Run ``./yukiverify`` and return its stripped stdout, or None on failure."""
    try:
        # check=True is what makes a non-zero exit raise CalledProcessError.
        result = subprocess.run(["./yukiverify"], encoding='utf-8', stdout=subprocess.PIPE, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"getVerifyCode__Error: {e}")
        return None
    return result.stdout.strip()


app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)
app.mount("/js", StaticFiles(directory="./statics/js"), name="static")
app.mount("/css", StaticFiles(directory="./statics/css"), name="static")
app.mount("/img", StaticFiles(directory="./statics/img"), name="static")
app.mount("/genesis", StaticFiles(directory="./blog", html=True), name="static")
app.add_middleware(GZipMiddleware, minimum_size=1000)

template = Jinja2Templates(directory='templates').TemplateResponse

no_robot_meta_tag = ''

YUKI_COOKIE_MAX_AGE = 60 * 60 * 24 * 7


def _withYukiCookie(res):
    """Refresh the one-week "yuki" cookie on ``res`` and return it.

    The cookie is set on the response object that is actually returned:
    cookies set on FastAPI's injected ``Response`` parameter are not applied
    when a handler returns a Response instance directly.
    """
    res.set_cookie("yuki", "True", max_age=YUKI_COOKIE_MAX_AGE)
    return res


def _renderVideoPage(template_name, v, request, proxy, video_data, extra_keys=()):
    """Render a video template from ``video_data`` (``[info, recommended]``).

    ``extra_keys`` names additional entries of the info dict that are passed to
    the template under the same name.
    """
    info, recommended_videos = video_data
    context = {
        "request": request,
        "videoid": v,
        "videourls": info['video_urls'],
        "description": info['description_html'],
        "video_title": info['title'],
        "author_id": info['author_id'],
        "author_icon": info['author_thumbnails_url'],
        "author": info['author'],
        "length_text": info['length_text'],
        "view_count": info['view_count'],
        "like_count": info['like_count'],
        "subscribers_count": info['subscribers_count'],
        "recommended_videos": recommended_videos,
        "proxy": proxy
    }
    for key in extra_keys:
        context[key] = info[key]
    return _withYukiCookie(template(template_name, context))


def _patchBBSHTML(html):
    """Swap the BBS page's link helper call and append the local 'bbs' source."""
    return no_robot_meta_tag + html.replace('AutoLink(xhr.responseText);', 'urlConvertToLink(xhr.responseText);') + getSource('bbs')


@app.get("/", response_class=HTMLResponse)
def home(response: Response, request: Request, yuki: Union[str, None] = Cookie(None)):
    if checkCookie(yuki):
        return _withYukiCookie(template("home.html", {"request": request}))
    return redirect("/genesis")


@app.get('/watch', response_class=HTMLResponse)
def video(v: str, response: Response, request: Request, yuki: Union[str, None] = Cookie(None), proxy: Union[str, None] = Cookie(None)):
    if not checkCookie(yuki):
        return redirect("/")
    # When embedded playback is switched on, hand over to /ume.
    if request.cookies.get("ume_toggle", "false") == "true":
        return redirect(f"/ume?v={urllib.parse.quote(v)}")
    return _renderVideoPage('video.html', v, request, proxy, getVideoData(v))


@app.get('/w', response_class=HTMLResponse)
def video(v: str, response: Response, request: Request, yuki: Union[str, None] = Cookie(None), proxy: Union[str, None] = Cookie(None)):
    # v: video_id
    if not checkCookie(yuki):
        return redirect("/")
    return _renderVideoPage('hiquo.html', v, request, proxy, getVideoData(v), extra_keys=("highstream_url", "audio_url"))


@app.get('/ume', response_class=HTMLResponse)
def ume_video(v: str, response: Response, request: Request, yuki: Union[str, None] = Cookie(None), proxy: Union[str, None] = Cookie(None)):
    if not checkCookie(yuki):
        return redirect("/")
    return _renderVideoPage('ume.html', v, request, proxy, getVideoData(v))


@app.get('/ww', response_class=HTMLResponse)
def video(v: str, response: Response, request: Request, yuki: Union[str, None] = Cookie(None), proxy: Union[str, None] = Cookie(None)):
    # v: video_id
    if not checkCookie(yuki):
        return redirect("/")
    # watch.html has to be provided: normal playback, ideally with a quality
    # selector. Each streamUrls entry carries `resolution` and `url`.
    return _renderVideoPage('watch.html', v, request, proxy, gettingVideoData(v), extra_keys=("streamUrls",))


@app.get("/search", response_class=HTMLResponse)
def search(q: str, response: Response, request: Request, page: Union[int, None] = 1, yuki: Union[str, None] = Cookie(None), proxy: Union[str, None] = Cookie(None)):
    if not checkCookie(yuki):
        return redirect("/")
    page = 1 if page is None else page
    return _withYukiCookie(template("search.html", {"request": request, "results": getSearchData(q, page), "word": q, "next": f"/search?q={urllib.parse.quote(q)}&page={page + 1}", "proxy": proxy}))


@app.get("/hashtag/{tag}")
def search(tag: str, response: Response, request: Request, page: Union[int, None] = 1, yuki: Union[str, None] = Cookie(None)):
    if not checkCookie(yuki):
        return redirect("/")
    return redirect(f"/search?q={urllib.parse.quote(tag)}")


@app.get("/channel/{channelid}", response_class=HTMLResponse)
def channel(channelid: str, response: Response, request: Request, yuki: Union[str, None] = Cookie(None), proxy: Union[str, None] = Cookie(None)):
    if not checkCookie(yuki):
        return redirect("/")
    t = getChannelData(channelid)
    return _withYukiCookie(template("channel.html", {"request": request, "results": t[0], "channel_name": t[1]["channel_name"], "channel_icon": t[1]["channel_icon"], "channel_profile": t[1]["channel_profile"], "cover_img_url": t[1]["author_banner"], "subscribers_count": t[1]["subscribers_count"], "proxy": proxy}))


@app.get("/playlist", response_class=HTMLResponse)
def playlist(list: str, response: Response, request: Request, page: Union[int, None] = 1, yuki: Union[str, None] = Cookie(None), proxy: Union[str, None] = Cookie(None)):
    # `list` shadows the builtin but is the public query-parameter name.
    if not checkCookie(yuki):
        return redirect("/")
    page = 1 if page is None else page
    return _withYukiCookie(template("search.html", {"request": request, "results": getPlaylistData(list, str(page)), "word": "", "next": f"/playlist?list={urllib.parse.quote(list)}&page={page + 1}", "proxy": proxy}))


@app.get("/comments")
def comments(request: Request, v: str):
    return template("comments.html", {"request": request, "comments": getCommentsData(v)})


@app.get("/thumbnail")
def thumbnail(v: str):
    # Quote the id so it cannot change the path of the upstream request.
    image = requests.get(f"https://img.youtube.com/vi/{urllib.parse.quote(v, safe='')}/0.jpg", timeout=max_time).content
    return Response(content=image, media_type=r"image/jpeg")


@app.get("/suggest")
def suggest(keyword: str):
    text = requests.get("http://www.google.com/complete/search?client=youtube&hl=ja&ds=yt&q=" + urllib.parse.quote(keyword), headers=getRandomUserAgent(), timeout=max_time).text
    # The first 19 and the last character are cut off before JSON parsing
    # (presumably a JSONP wrapper around the payload — verify).
    return [i[0] for i in json.loads(text[19:-1])[1]]


@cache(seconds=120)
def getSource(name):
    return requests.get(f'https://raw.githubusercontent.com/LunaKamituki/yuki-source/refs/heads/main/{name}.html', headers=getRandomUserAgent(), timeout=max_time).text


@app.get("/bbs", response_class=HTMLResponse)
def bbs(request: Request, name: Union[str, None] = "", seed: Union[str, None] = "", channel: Union[str, None] = "main", verify: Union[str, None] = "false", yuki: Union[str, None] = Cookie(None)):
    if not checkCookie(yuki):
        return redirect("/")
    page = requests.get(f"{url}bbs?name={urllib.parse.quote(name)}&seed={urllib.parse.quote(seed)}&channel={urllib.parse.quote(channel)}&verify={urllib.parse.quote(verify)}", cookies={"yuki": "True"}, timeout=max_time).text
    return HTMLResponse(_patchBBSHTML(page))


@cache(seconds=5)
def getCachedBBSAPI(verify, channel):
    return requests.get(f"{url}bbs/api?t={urllib.parse.quote(str(int(time.time()*1000)))}&verify={urllib.parse.quote(verify)}&channel={urllib.parse.quote(channel)}", cookies={"yuki": "True"}, timeout=max_time).text


@app.get("/bbs/api", response_class=HTMLResponse)
def bbsAPI(request: Request, t: str, channel: Union[str, None] = "main", verify: Union[str, None] = "false"):
    return getCachedBBSAPI(verify, channel)


@app.get("/bbs/result")
def write_bbs(request: Request, name: str = "", message: str = "", seed: Union[str, None] = "", channel: Union[str, None] = "main", verify: Union[str, None] = "false", yuki: Union[str, None] = Cookie(None)):
    if not checkCookie(yuki):
        return redirect("/")
    # Look the header up by name; its position in the raw header list is not fixed.
    if 'Google-Apps-Script' in request.headers.get("user-agent", ""):
        raise UnallowedBot("GASのBotは許可されていません")

    params = {
        'name': urllib.parse.quote(name),
        'message': urllib.parse.quote(message),
        'seed': urllib.parse.quote(seed),
        'channel': urllib.parse.quote(channel),
        'verify': urllib.parse.quote(verify),
        'info': urllib.parse.quote(getInfo(request)),
        'serververify': getVerifyCode()
    }
    url_querys = '?' + '&'.join(f'{key}={value}' for key, value in params.items())

    t = requests.get(f"{url}bbs/result" + url_querys, cookies={"yuki": "True"}, allow_redirects=False, timeout=max_time)
    if t.status_code != 307:
        return HTMLResponse(_patchBBSHTML(t.text))

    return redirect(f"/bbs?name={urllib.parse.quote(name)}&seed={urllib.parse.quote(seed)}&channel={urllib.parse.quote(channel)}&verify={urllib.parse.quote(verify)}")


@cache(seconds=120)
def getCachedBBSHow():
    return requests.get(f"{url}bbs/how", timeout=max_time).text


@app.get("/bbs/how", response_class=PlainTextResponse)
def view_commonds(request: Request, yuki: Union[str, None] = Cookie(None)):
    if not checkCookie(yuki):
        return redirect("/")
    return getCachedBBSHow()


@app.get("/info", response_class=HTMLResponse)
def viewlist(response: Response, request: Request, yuki: Union[str, None] = Cookie(None)):
    if not checkCookie(yuki):
        return redirect("/")
    return _withYukiCookie(template("info.html", {"request": request, "Youtube_API": invidious_api.video[0], "Channel_API": invidious_api.channel[0], "comments": invidious_api.comments[0]}))


@app.get("/reset", response_class=PlainTextResponse)
def home():
    global url, invidious_api
    url = requests.get('https://raw.githubusercontent.com/yuto1106110/yuto-yuki-youtube-1/main/APItati', headers=getRandomUserAgent(), timeout=max_time).text.rstrip()
    invidious_api = InvidiousAPI()
    return 'Success'


@app.get("/version", response_class=PlainTextResponse)
def displayVersion():
    return str({'version': version, 'new_instance_version': new_instance_version})


# Names of the per-endpoint instance lists held by InvidiousAPI.
_API_NAMES = ('video', 'search', 'channel', 'comments', 'playlist')


# Registered before "/api/{api_name}" so that "update" is not taken for a name.
@app.get("/api/update", response_class=PlainTextResponse)
def updateAllAPI():
    global invidious_api
    invidious_api = InvidiousAPI()
    return str(invidious_api.info())


@app.get("/api/{api_name}", response_class=PlainTextResponse)
def displayAPI(api_name: str):
    if api_name == 'all':
        api_value = invidious_api.info()
    elif api_name in _API_NAMES:
        api_value = getattr(invidious_api, api_name)
    else:
        api_value = f'API Name Error: {api_name}'
    return str(api_value)


@app.get("/api/{api_name}/next", response_class=PlainTextResponse)
def rotateAPI(api_name: str):
    if api_name not in _API_NAMES:
        return f'API Name Error: {api_name}'
    api_urls = getattr(invidious_api, api_name)
    if api_urls:
        # Move the current head to the end so the next instance is tried first.
        updateList(api_urls, api_urls[0])
    return 'Finish'


@app.get("/api/video/check", response_class=PlainTextResponse)
def displayCheckVideo():
    return str(invidious_api.check_video)


@app.get("/api/video/check/toggle", response_class=PlainTextResponse)
def toggleVideoCheck():
    invidious_api.check_video = not invidious_api.check_video
    return f'{not invidious_api.check_video} to {invidious_api.check_video}'


# (path, template) pairs for pages that only render a template, in
# registration order.
STATIC_PAGES = (
    ("/proxy", "proxy.html"),
    ("/rammer", "rammerhead.html"),
    ("/shadow", "shadow.html"),
    ("/inbox", "inbox.html"),
    ("/help", "help.html"),
    ("/proxypage", "proxy.html"),
    ("/url", "url.html"),
    ("/light", "light.html"),
    ("/sitsumon", "otoiawase.html"),
    ("/news", "news.html"),
    ("/space", "space.html"),
    ("/update", "update.html"),
    ("/others", "others.html"),
    ("/qanda", "Q&A.html"),
    ("/1v1lol", "1v1-lol.html"),
    ("/drive", "drive.html"),
    ("/paper", "paper.html"),
    ("/snow", "snow.html"),
    ("/2048", "block.html"),
    ("/ose", "ose.html"),
    ("/game", "game.html"),
    ("/and", "android.html"),
    ("/cone", "cone.html"),
    ("/usa", "usa.html"),
    ("/chat", "chat.html"),
    ("/ball", "ball.html"),
    ("/history", "history.html"),
    ("/bj", "bj.html"),
    ("/tools", "tools.html"),
    ("/re", "re.html"),
    ("/setting", "settings.html"),
    ("/among", "among.html"),
    ("/among-1", "among-1.html"),
    ("/among-2", "among-2.html"),
    ("/interland", "interland.html"),
    ("/denki", "denki.html"),
    ("/dog", "dog.html"),
    ("/dash", "dash.html"),
    ("/dairan", "dairan.html"),
)


def _makeStaticPage(template_name):
    """Build a handler that renders ``template_name`` with only the request."""
    def list_page(response: Response, request: Request):
        return template(template_name, {"request": request})
    return list_page


for _path, _template_name in STATIC_PAGES:
    app.get(_path, response_class=HTMLResponse)(_makeStaticPage(_template_name))


@app.exception_handler(500)
def error500(request: Request, __):
    return template("error.html", {"request": request, "context": '500 Internal Server Error'}, status_code=500)


@app.exception_handler(404)
def error404(request: Request, __):
    return template("error.html", {"request": request, "context": '404 Error、あれれ'}, status_code=404)


@app.exception_handler(APITimeoutError)
def apiWait(request: Request, exception: APITimeoutError):
    return template("apiTimeout.html", {"request": request}, status_code=504)


@app.exception_handler(UnallowedBot)
def returnToUnallowedBot(request: Request, exception: UnallowedBot):
    return template("error.html", {"request": request, "context": '403 Forbidden'}, status_code=403)