import os
import re
import time
import json
import subprocess
import requests
from datetime import datetime, timezone
from bs4 import BeautifulSoup
# ===== Channel.io settings =====
GET_URL = "https://desk-api.channel.io/desk/channels/200605/groups/519217/messages"
POST_URL = GET_URL
# Query parameters for polling the group's message list (newest first).
PARAMS = {
    "sortOrder": "desc",
    "limit": 36,
    "logFolded": "false",
}
# Channel.io desk API token, supplied via environment variable.
X_ACCOUNT = os.getenv("channeliotokenbot2")
if not X_ACCOUNT:
    # Bug fix: the message previously named a non-existent variable
    # "channeliotokenokenbot2"; it must match the os.getenv() key above
    # so operators set the right variable.
    raise RuntimeError("環境変数 channeliotokenbot2 が設定されていません")
HEADERS_GET = {
    "accept": "application/json",
    "accept-language": "ja",
    "x-account": X_ACCOUNT,
}
HEADERS_POST = {
    "accept": "application/json",
    "accept-language": "ja",
    "content-type": "application/json",
    "x-account": X_ACCOUNT,
}
# ===== ssyoutube =====
SSYOUTUBE_URL = "https://ssyoutube.online/yt-video-detail/"
# ===== tfLink client =====
from tflink import TFLinkClient
tf_client = TFLinkClient()  # anonymous upload
# ===== Utils =====
def parse_updated_at(value):
    """Convert an ``updatedAt`` value to an aware UTC datetime.

    Numeric values are treated as epoch milliseconds; strings are parsed
    as ISO-8601, with a trailing ``Z`` normalized to ``+00:00``.
    Any other type yields None.
    """
    if isinstance(value, str):
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    if isinstance(value, (int, float)):
        epoch_seconds = value / 1000
        return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return None
def extract_youtube_id(text):
    """Return the 11-character YouTube video ID found in *text*, or None.

    Recognizes both ``watch?v=<id>`` and ``youtu.be/<id>`` forms.
    """
    candidate_patterns = (
        r"v=([A-Za-z0-9_-]{11})",
        r"youtu\.be/([A-Za-z0-9_-]{11})",
    )
    for pattern in candidate_patterns:
        match = re.search(pattern, text)
        if match is not None:
            return match.group(1)
    return None
# ===== ssyoutube HTML parsing =====
def fetch_download_links(youtube_url):
    """POST the video URL to ssyoutube and scrape download candidates.

    Returns a list of dicts with ``url``, ``quality`` and ``has_audio``
    taken from the ``data-*`` attributes of each ``button[data-url]``
    element; buttons with an empty data-url are skipped.
    """
    response = requests.post(
        SSYOUTUBE_URL,
        data={"videoURL": youtube_url},
        timeout=30,
        headers={
            "User-Agent": "Mozilla/5.0",
            "Referer": "https://ssyoutube.online/",
        },
    )
    response.raise_for_status()
    page = BeautifulSoup(response.text, "lxml")
    links = []
    for button in page.select("button[data-url]"):
        download_url = button.get("data-url")
        if not download_url:
            continue
        links.append({
            "url": download_url,
            "quality": button.get("data-quality"),
            "has_audio": button.get("data-has-audio"),
        })
    return links
# ===== Select best video and audio streams =====
def _quality_rank(item):
    """Numeric sort key extracted from an item's quality label.

    Bug fix: a label with no digits (e.g. "HD", "") used to reach
    int("") and raise ValueError; such labels now rank as 0.
    """
    digits = re.sub(r"[^\d]", "", item["quality"] or "")
    return int(digits) if digits else 0

def choose_best_streams(items):
    """Split *items* into video/audio candidates and pick the best of each.

    An item counts as audio when its URL ends in ".m4a" or its quality
    label contains "audio" (case-insensitive); everything else is video.
    Within each group, the highest numeric quality wins.

    Returns a ``(video, audio)`` tuple; either element is None when the
    corresponding group is empty.

    NOTE(review): item["has_audio"] is present in the scraped data but
    was never consulted here; classification relies on URL/quality only.
    """
    video_only = []
    audio_only = []
    for item in items:
        quality = item["quality"] or ""
        if item["url"].endswith(".m4a") or "audio" in quality.lower():
            audio_only.append(item)
        else:
            video_only.append(item)
    video = max(video_only, key=_quality_rank, default=None)
    audio = max(audio_only, key=_quality_rank, default=None)
    return video, audio
# ===== Merge =====
def merge_video_audio(video_url, audio_url, out_file):
    """Mux the video and audio inputs into *out_file* via ffmpeg stream copy.

    Returns True on success; on a non-zero ffmpeg exit status, prints
    the tool's stderr and returns False.
    """
    command = [
        "ffmpeg", "-y",
        "-i", video_url,
        "-i", audio_url,
        "-c", "copy",
        out_file,
    ]
    completed = subprocess.run(command, capture_output=True)
    if completed.returncode == 0:
        return True
    print("FFmpeg merge error:", completed.stderr.decode())
    return False
# ===== tfLink upload =====
def upload_to_tflink(file_path):
    """Upload *file_path* with the module-level tfLink client.

    Returns the download link reported by the service.
    """
    upload_result = tf_client.upload(file_path)
    return upload_result.download_link
def build_links(items, upload_link):
    """Build the reply text: one line per stream, then the merged-file link.

    Each stream line is "<url> <quality>". The merged file's download
    link is appended as the final line.
    """
    lines = []
    for item in items:
        # Bug fix: item["url"] was read into a local but omitted from the
        # f-string, so the reply listed only quality labels with no links.
        lines.append(f'{item["url"]} {item["quality"]}')
    lines.append(f"結合ファイルダウンロード: {upload_link}")
    return "\n".join(lines)
def send_to_channel(text):
    """Post *text* as a single text block to the Channel.io group.

    Raises requests.HTTPError on a non-2xx response.
    """
    request_id = f"desk-web-{int(time.time() * 1000)}"
    payload = {
        "requestId": request_id,
        "blocks": [{"type": "text", "value": text}],
        "buttons": None,
        "form": None,
        "webPage": None,
        "files": None,
        "customPayload": None,
    }
    response = requests.post(
        POST_URL,
        headers=HEADERS_POST,
        data=json.dumps(payload),
        timeout=30,
    )
    response.raise_for_status()
# ===== Main =====
def _find_latest_message(messages):
    """Return (message, parsed_time) for the newest message that has text.

    Messages without plainText or updatedAt, or whose timestamp cannot
    be parsed, are skipped. Returns (None, None) when nothing qualifies.
    """
    latest_msg = None
    latest_time = None
    for msg in messages:
        plain_text = msg.get("plainText")
        updated_at = msg.get("updatedAt")
        if not plain_text or updated_at is None:
            continue
        t = parse_updated_at(updated_at)
        if not t:
            continue
        if latest_time is None or t > latest_time:
            latest_time = t
            latest_msg = msg
    return latest_msg, latest_time

def _download_to_file(url, path):
    """Stream *url* to *path* on disk.

    Bug fix: the original streamed downloads never checked the HTTP
    status and had no timeout, so an error page could be silently saved
    as video/audio data.
    """
    with requests.get(url, stream=True, timeout=60) as r:
        r.raise_for_status()
        with open(path, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)

def main():
    """Poll Channel.io; for each new YouTube link, download, merge, upload, reply.

    Runs forever. Any exception in one iteration is printed and the loop
    resumes after a short back-off.
    """
    last_handled_time = None  # updatedAt of the last message we replied to
    while True:
        try:
            res = requests.get(
                GET_URL,
                headers=HEADERS_GET,
                params=PARAMS,
                timeout=30,
            )
            res.raise_for_status()
            messages = res.json().get("messages", [])
            latest_msg, latest_time = _find_latest_message(messages)
            # Bug fix: the original never recorded which message it had
            # handled and did not sleep on the success path, so one
            # successful send looped straight back and re-downloaded and
            # re-sent the same message forever.
            if latest_msg is None or latest_time == last_handled_time:
                time.sleep(10)
                continue
            youtube_id = extract_youtube_id(latest_msg["plainText"])
            if not youtube_id:
                time.sleep(10)
                continue
            youtube_url = f"https://www.youtube.com/watch?v={youtube_id}"
            items = fetch_download_links(youtube_url)
            if not items:
                time.sleep(10)
                continue
            video_stream, audio_stream = choose_best_streams(items)
            if not video_stream or not audio_stream:
                print("映像または音声ストリームが足りません")
                time.sleep(10)
                continue
            # Download both streams, then mux them with ffmpeg.
            temp_video = "video.mp4"
            temp_audio = "audio.mp4"
            merged = "merged_output.mp4"
            _download_to_file(video_stream["url"], temp_video)
            _download_to_file(audio_stream["url"], temp_audio)
            if not merge_video_audio(temp_video, temp_audio, merged):
                print("結合失敗")
                time.sleep(10)
                continue
            # Upload the merged file and reply with all links.
            upload_link = upload_to_tflink(merged)
            message_text = build_links(items, upload_link)
            send_to_channel(message_text)
            last_handled_time = latest_time
            print("送信完了")
            time.sleep(10)
        except Exception as e:
            print("エラー:", e)
            time.sleep(15)

if __name__ == "__main__":
    main()