Create riivivetube.py

This commit is contained in:
TheErrorExe 2025-07-03 14:45:59 +00:00 committed by GitHub
parent 7b0662e3ab
commit 3d582ba857
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

297
riivivetube.py Normal file
View File

@ -0,0 +1,297 @@
# work in progress
# videos and more things are not working
# search and trending is working
# swf files from yt2009wii
from flask import Flask, send_from_directory, send_file, request, Response, jsonify, stream_with_context
import os
import json
import re
import requests
import xml.etree.ElementTree as ET
from datetime import datetime
import subprocess
import time
import yt_dlp
import youtubei
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
app = Flask(__name__)
#stream_cache = {}
#CACHE_DURATION = 300
executor = ThreadPoolExecutor(max_workers=10)
CATEGORIES = ["trending", "music", "gaming", "sports", "news"]
DL_FOLDER = "./dl"
if not os.path.exists(DL_FOLDER):
os.makedirs(DL_FOLDER)
def get_first_video_id_from_route(category):
try:
url = f"http://127.0.0.1:5005/{category}"
response = requests.get(url)
if response.status_code != 200:
print(f"[{category}] HTTP ERROR : HTTP {response.status_code}")
return None
ns = {
'yt': 'http://www.youtube.com/xml/schemas/2015'
}
root = ET.fromstring(response.content)
entry = root.find('entry')
if entry is None:
print(f"[{category}] <entry> not found")
return None
videoid_el = entry.find('.//yt:videoid', ns)
if videoid_el is None:
print(f"[{category}] <yt:videoid> not found")
return None
return videoid_el.text.strip()
except Exception as e:
print(f"[{category}] xml parsing error: {e}")
return None
def download_thumbnail(video_id, category):
url = f"http://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
try:
response = requests.get(url, stream=True)
if response.status_code == 200:
os.makedirs(DL_FOLDER, exist_ok=True)
filepath = os.path.join(DL_FOLDER, f"{category}.jpg")
with open(filepath, 'wb') as f:
for chunk in response.iter_content(1024):
f.write(chunk)
print(f"[{category}] Thumbnail saved: {filepath}")
else:
print(f"[{category}] HTTP ERROR: HTTP {response.status_code}")
except Exception as e:
print(f"[{category}] Error: {e}")
def thumbnail_scheduler():
while True:
for category in CATEGORIES:
video_id = get_first_video_id_from_route(category)
if video_id:
download_thumbnail(video_id, category)
else:
print(f"[{category}] video id missing")
time.sleep(600)
class GetVideoInfo:
def build(self, videoId):
streamUrl = f"https://www.googleapis.com/youtubei/v1/player?videoId={videoId}"
headers = {
'Content-Type': 'application/json',
'User-Agent': 'Mozilla/5.0',
'Cookie': 'cookie',
'X-Goog-Visitor-Id': "id",
'X-Youtube-Bootstrap-Logged-In': 'false'
}
payload = {
"context": {
"client": {
"hl": "en",
"gl": "US",
"clientName": "WEB",
"clientVersion": "2.20231221"
}
},
"videoId": videoId,
"params": ""
}
response = requests.post(streamUrl, json=payload, headers=headers)
if response.status_code != 200:
return f"Error retrieving video info: {response.status_code}", response.status_code
try:
json_data = response.json()
# print(json_data)
title = json_data['videoDetails']['title']
length_seconds = json_data['videoDetails']['lengthSeconds']
author = json_data['videoDetails']['author']
except KeyError as e:
return f"Missing key: {e}", 400
fmtList = "43/854x480/9/0/115"
fmtStreamMap = f"43|"
fmtMap = "43/0/7/0/0"
thumbnailUrl = f"http://i.ytimg.com/vi/{videoId}/mqdefault.jpg"
response_str = (
f"status=ok&"
f"length_seconds={length_seconds}&"
f"keywords=a&"
f"vq=None&"
f"muted=0&"
f"avg_rating=5.0&"
f"thumbnailUrl={thumbnailUrl}&"
f"allow_ratings=1&"
f"hl=en&"
f"ftoken=&"
f"allow_embed=1&"
f"fmtMap={fmtMap}&"
f"fmt_url_map={fmtStreamMap}&"
f"token=null&"
f"plid=null&"
f"track_embed=0&"
f"author={author}&"
f"title={title}&"
f"videoId={videoId}&"
f"fmtList={fmtList}&"
f"fmtStreamMap={fmtStreamMap}"
)
return Response(response_str, content_type='text/plain')
@app.route('/get_video_info', methods=['GET'])
def get_video_info():
video_id = request.args.get('video_id')
if not video_id:
return jsonify({"error": "Missing video_id parameter"}), 400
video_info = GetVideoInfo().build(video_id)
return video_info
@app.route("/wiitv")
def wiitv():
return send_from_directory(".", "leanbacklite_wii.swf", mimetype='application/x-shockwave-flash')
@app.route("/<path:filename>")
def serve_video(filename):
file_path = os.path.join(filename)
if not os.path.exists(file_path):
return "404 Not found", 404
return send_file(file_path)
@app.route('/player_204')
def player():
return ""
#@app.route('/get_video', methods=['GET'])
#def get_video():
#work in progress
@app.route('/apiplayer-loader')
def loadapi():
return send_from_directory('.', 'loader.swf', mimetype='application/x-shockwave-flash')
@app.route('/videoplayback')
def playback():
return send_from_directory('.', 'apiplayer.swf', mimetype='application/x-shockwave-flash')
@app.route('/complete/search')
def completesearch():
return send_file('search.js')
class Invidious:
def generateXML(self, json_data):
xml_string = '<?xml version=\'1.0\' encoding=\'UTF-8\'?>'
xml_string += '<feed xmlns:openSearch=\'http://a9.com/-/spec/opensearch/1.1/\' xmlns:media=\'http://search.yahoo.com/mrss/\' xmlns:yt=\'http://www.youtube.com/xml/schemas/2015\'>'
xml_string += '<title type=\'text\'>Videos</title>'
xml_string += '<author><name>ReviveMii</name><uri>http://revivemii.xyz</uri></author>'
xml_string += '<generator ver=\'1.0\' uri=\'http://new.old.errexe.xyz/\'>RiiviveTube</generator>'
xml_string += f'<openSearch:totalResults>{len(json_data)}</openSearch:totalResults>'
xml_string += '<openSearch:startIndex>1</openSearch:startIndex>'
xml_string += '<openSearch:itemsPerPage>20</openSearch:itemsPerPage>'
for item in json_data:
xml_string += '<entry>'
xml_string += '<id>http://127.0.0.1/api/videos/' + self.escape_xml(item["videoId"]) + '</id>'
xml_string += '<published>' + self.escape_xml(item.get("publishedText", "")) + '</published>'
xml_string += '<title type="text">' + self.escape_xml(item.get("title", "")) + '</title>'
xml_string += '<link rel="http://127.0.0.1/api/videos/' + self.escape_xml(item["videoId"]) + '/related"/>'
xml_string += '<author><name>' + self.escape_xml(item.get("author", "")) + '</name>'
xml_string += '<uri>http://127.0.0.1/api/channels/' + self.escape_xml(item.get("authorId", "")) + '</uri></author>'
xml_string += '<media:group>'
xml_string += '<media:thumbnail yt:name="hqdefault" url="http://i.ytimg.com/vi/' + self.escape_xml(item["videoId"]) + '/hqdefault.jpg" height="240" width="320" time="00:00:00"/>'
xml_string += '<yt:duration seconds="' + self.escape_xml(str(item.get("lengthSeconds", 0))) + '"/>'
xml_string += '<yt:videoid id="' + self.escape_xml(item["videoId"]) + '">' + self.escape_xml(item["videoId"]) + '</yt:videoid>'
xml_string += '<media:credit role="uploader" name="' + self.escape_xml(item.get("author", "")) + '">' + self.escape_xml(item.get("author", "")) + '</media:credit>'
xml_string += '</media:group>'
xml_string += '<yt:statistics favoriteCount="' + str(item.get("viewCount", 0)) + '" viewCount="' + str(item.get("viewCount", 0)) + '"/>'
xml_string += '</entry>'
xml_string += '</feed>'
return xml_string
def search(self, query):
results = youtubei.innertube_search(query)
return Response(self.generateXML(results), mimetype='text/atom+xml')
def trends(self, type_param=None):
results = youtubei.innertube_trending(type_param)
return Response(self.generateXML(results), mimetype='text/atom+xml')
def music(self, type_param=None):
return self.search("music")
def gaming(self, type_param=None):
return self.search("gaming")
def sports(self, type_param=None):
return self.search("sports")
def news(self, type_param=None):
return self.search("news")
@staticmethod
def escape_xml(s):
if s is None:
return ''
return s.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')\
.replace('"', '&quot;').replace("'", '&apos;')
inv = Invidious()
@app.route('/feeds/api/videos')
def api_videos():
query = request.args.get('q')
if not query:
return jsonify({"error": "Missing 'q' parameter"}), 400
try:
return inv.search(query)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/trending')
def trending():
try:
return inv.trends()
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/music')
def trending_music():
try:
return inv.music()
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/gaming')
def trending_gaming():
try:
return inv.gaming()
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/sports')
def trending_sports():
try:
return inv.sports()
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/news')
def trending_news():
try:
return inv.news()
except Exception as e:
return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
threading.Thread(target=thumbnail_scheduler, daemon=True).start()
app.run(host="0.0.0.0", port=5005, debug=True)