import os import re import time import json import subprocess import requests from datetime import datetime, timezone from bs4 import BeautifulSoup # ===== Channel.io 設定 ===== GET_URL = "https://desk-api.channel.io/desk/channels/200605/groups/519217/messages" POST_URL = GET_URL PARAMS = { "sortOrder": "desc", "limit": 36, "logFolded": "false", } X_ACCOUNT = os.getenv("channeliotokenbot2") if not X_ACCOUNT: raise RuntimeError("環境変数 channeliotokenokenbot2 が設定されていません") HEADERS_GET = { "accept": "application/json", "accept-language": "ja", "x-account": X_ACCOUNT, } HEADERS_POST = { "accept": "application/json", "accept-language": "ja", "content-type": "application/json", "x-account": X_ACCOUNT, } # ===== ssyoutube ===== SSYOUTUBE_URL = "https://ssyoutube.online/yt-video-detail/" # ===== tfLink クライアント ===== from tflink import TFLinkClient tf_client = TFLinkClient() # 匿名アップロード # ===== Utils ===== def parse_updated_at(value): if isinstance(value, (int, float)): return datetime.fromtimestamp(value / 1000, tz=timezone.utc) elif isinstance(value, str): return datetime.fromisoformat(value.replace("Z", "+00:00")) return None def extract_youtube_id(text): patterns = [ r"v=([A-Za-z0-9_-]{11})", r"youtu\.be/([A-Za-z0-9_-]{11})", ] for p in patterns: m = re.search(p, text) if m: return m.group(1) return None # ===== ssyoutube HTML 解析 ===== def fetch_download_links(youtube_url): res = requests.post( SSYOUTUBE_URL, data={"videoURL": youtube_url}, timeout=30, headers={ "User-Agent": "Mozilla/5.0", "Referer": "https://ssyoutube.online/", } ) res.raise_for_status() soup = BeautifulSoup(res.text, "lxml") buttons = soup.select("button[data-url]") results = [] for btn in buttons: url = btn.get("data-url") quality = btn.get("data-quality") has_audio = btn.get("data-has-audio") if not url: continue results.append({ "url": url, "quality": quality, "has_audio": has_audio, }) return results # ===== 動画と音声の選別 ===== def choose_best_streams(items): video_only = [] audio_only = [] for item in items: url = item["url"] quality = item["quality"] or "" has_audio = item["has_audio"] if url.endswith(".m4a") or "audio" in quality.lower(): audio_only.append(item) else: video_only.append(item) if not video_only: video = None else: video = sorted(video_only, key=lambda x: int(re.sub(r"[^\d]", "", x["quality"] or "0")), reverse=True)[0] if not audio_only: audio = None else: audio = sorted(audio_only, key=lambda x: int(re.sub(r"[^\d]", "", x["quality"] or "0")), reverse=True)[0] return video, audio # ===== 結合 ===== def merge_video_audio(video_url, audio_url, out_file): cmd = [ "ffmpeg", "-y", "-i", video_url, "-i", audio_url, "-c", "copy", out_file, ] result = subprocess.run(cmd, capture_output=True) if result.returncode != 0: print("FFmpeg merge error:", result.stderr.decode()) return False return True # ===== tfLink アップロード ===== def upload_to_tflink(file_path): res = tf_client.upload(file_path) return res.download_link def build_links(items, upload_link): lines = [] for item in items: url = item["url"] quality = item["quality"] line = f' {quality}' lines.append(line) lines.append(f"結合ファイルダウンロード: {upload_link}") return "\n".join(lines) def send_to_channel(text): payload = { "requestId": f"desk-web-{int(time.time() * 1000)}", "blocks": [ { "type": "text", "value": text } ], "buttons": None, "form": None, "webPage": None, "files": None, "customPayload": None } res = requests.post( POST_URL, headers=HEADERS_POST, data=json.dumps(payload), timeout=30 ) res.raise_for_status() # ===== Main ===== def main(): while True: try: res = requests.get( GET_URL, headers=HEADERS_GET, params=PARAMS, timeout=30, ) res.raise_for_status() messages = res.json().get("messages", []) latest_msg = None latest_time = None for msg in messages: plain_text = msg.get("plainText") updated_at = msg.get("updatedAt") if not plain_text or updated_at is None: continue t = parse_updated_at(updated_at) if not t: continue if latest_time is None or t > latest_time: latest_time = t latest_msg = msg if not latest_msg: time.sleep(10) continue text = latest_msg["plainText"] youtube_id = extract_youtube_id(text) if not youtube_id: time.sleep(10) continue youtube_url = f"https://www.youtube.com/watch?v={youtube_id}" items = fetch_download_links(youtube_url) if not items: time.sleep(10) continue video_stream, audio_stream = choose_best_streams(items) if not video_stream or not audio_stream: print("映像または音声ストリームが足りません") time.sleep(10) continue # ダウンロードして結合 temp_video = "video.mp4" temp_audio = "audio.mp4" merged = "merged_output.mp4" # ダウンロード with requests.get(video_stream["url"], stream=True) as r: with open(temp_video, "wb") as f: for chunk in r.iter_content(chunk_size=8192): f.write(chunk) with requests.get(audio_stream["url"], stream=True) as r: with open(temp_audio, "wb") as f: for chunk in r.iter_content(chunk_size=8192): f.write(chunk) if not merge_video_audio(temp_video, temp_audio, merged): print("結合失敗") time.sleep(10) continue # tfLink アップロード upload_link = upload_to_tflink(merged) message_text = build_links(items, upload_link) send_to_channel(message_text) print("送信完了") except Exception as e: print("エラー:", e) time.sleep(15) if __name__ == "__main__": main()