riivivetube/youtubei.py
2025-08-29 21:00:46 +02:00

304 lines
10 KiB
Python

# work in progress
import requests
YOUTUBE_INNERTUBE_API_KEY = "key"
YOUTUBE_API_URL = f"https://www.youtube.com/youtubei/v1/search?key={YOUTUBE_INNERTUBE_API_KEY}&region=US"
YOUTUBE_API_BROWSE_URL = f"https://www.youtube.com/youtubei/v1/browse?key={YOUTUBE_INNERTUBE_API_KEY}"
CLIENT_CONTEXT = {
"client": {
"clientName": "WEB",
"clientVersion": "2.20230615.09.00",
"hl": "en",
"gl": "US",
}
}
TRENDING_PARAMS = {
"music": "4gINGgt5dG1hX2NoYXJ0cw%3D%3D",
"gaming": "4gIcGhpnYW1pbmdfY29ycHVzX21vc3RfcG9wdWxhcg%3D%3D",
"movies": "4gIKGgh0cmFpbGVycw%3D%3D"
}
def safe_int(value, default=0):
try:
return int(value)
except (ValueError, TypeError):
return default
def get_video_info(video_id):
url = "https://www.youtube.com/youtubei/v1/player"
headers = {
"User-Agent": "Mozilla/5.0",
"Accept-Language": "en-US,en;q=0.9",
"Content-Type": "application/json",
"X-YouTube-Client-Name": "1",
"X-YouTube-Client-Version": "2.20231221"
}
payload = {
"context": {
"client": {
"hl": "en",
"gl": "US",
"clientName": "WEB",
"clientVersion": "2.20231221"
}
},
"videoId": video_id
}
try:
response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()
data = response.json()
video_details = data.get('videoDetails', {})
microformat = data.get('microformat', {}).get('playerMicroformatRenderer', {})
engagement = data.get('engagement', {})
result = {
'videoId': video_id,
'title': video_details.get('title', ''),
'author': video_details.get('author', ''),
'authorId': video_details.get('channelId', ''),
'lengthSeconds': video_details.get('lengthSeconds', 0),
'viewCount': video_details.get('viewCount', 0),
'description': video_details.get('shortDescription', ''),
'publishedText': microformat.get('publishDate', ''),
'averageRating': engagement.get('averageRating', 0),
'likeCount': engagement.get('likeCount', 0),
'dislikeCount': engagement.get('dislikeCount', 0),
'keywords': video_details.get('keywords', []),
'isLive': video_details.get('isLive', False),
'thumbnail': {
'thumbnails': video_details.get('thumbnail', {}).get('thumbnails', [])
}
}
return result
except Exception as e:
print(f"Error fetching video info: {str(e)}")
return None
def extract_length_text_and_seconds(vd):
length_obj = vd.get("lengthText", {})
simple_text = length_obj.get("simpleText")
accessible_label = (
length_obj.get("accessibility", {})
.get("accessibilityData", {})
.get("label")
)
result = None
seconds = 0
if simple_text:
result = {
"accessibility": {
"accessibilityData": {
"label": accessible_label or ""
}
},
"simpleText": simple_text
}
try:
parts = list(map(int, simple_text.split(":")))
if len(parts) == 3:
seconds = parts[0] * 3600 + parts[1] * 60 + parts[2]
elif len(parts) == 2:
seconds = parts[0] * 60 + parts[1]
elif len(parts) == 1:
seconds = parts[0]
except:
seconds = 0
return result, seconds
def extract_videos_from_items(items):
videos = []
for item in items:
if "videoRenderer" in item:
videos.append(item["videoRenderer"])
elif "carouselShelfRenderer" in item or "richShelfRenderer" in item:
contents = item.get("carouselShelfRenderer", {}).get("contents", []) \
or item.get("richShelfRenderer", {}).get("contents", [])
videos.extend(extract_videos_from_items(contents))
elif "shelfRenderer" in item:
contents = item.get("shelfRenderer", {}).get("content", {}).get("expandedShelfContentsRenderer", {}).get("items", [])
videos.extend(extract_videos_from_items(contents))
return videos
def deduplicate_videos(videos):
seen = set()
unique = []
for v in videos:
vid = v.get("videoId")
if vid and vid not in seen:
seen.add(vid)
unique.append(v)
return unique
def parse_view_count(view_count_text):
text = (view_count_text or "").lower().replace("views", "").strip()
try:
if "k" in text:
return int(float(text.replace("k", "")) * 1_000)
if "m" in text:
return int(float(text.replace("m", "")) * 1_000_000)
return int("".join(filter(str.isdigit, text))) or 0
except:
return 0
def innertube_search(query, region="US", max_results=50):
payload = {"context": CLIENT_CONTEXT, "query": query, "params": ""}
headers = {"Content-Type": "application/json", "User-Agent": "Mozilla/5.0", "Accept-Language": "en-US"}
r = requests.post(YOUTUBE_API_URL, json=payload, headers=headers)
r.raise_for_status()
data = r.json()
videos = []
sections = (
data.get("contents", {})
.get("twoColumnSearchResultsRenderer", {})
.get("primaryContents", {})
.get("sectionListRenderer", {})
.get("contents", [])
)
for section in sections:
items = section.get("itemSectionRenderer", {}).get("contents", [])
for item in items:
vd = item.get("videoRenderer")
if not vd:
continue
title = vd["title"]["runs"][0]["text"]
vid = vd.get("videoId", "")
owner_runs = vd.get("ownerText", {}).get("runs", [{}])
author = owner_runs[0].get("text", "")
nav = owner_runs[0].get("navigationEndpoint", {}).get("browseEndpoint", {})
authorId = nav.get("browseId", "")
authorUrl = nav.get("canonicalBaseUrl", "")
authorVerified = any(
b.get("metadataBadgeRenderer", {}).get("style") == "BADGE_STYLE_TYPE_VERIFIED"
for b in vd.get("ownerBadges", [])
)
authorThumbnails = (
vd.get("channelThumbnailSupportedRenderers", {})
.get("channelThumbnailWithLinkRenderer", {})
.get("thumbnail", {}).get("thumbnails", [])
)
videoThumbnails = vd.get("thumbnail", {}).get("thumbnails", [])
desc = vd.get("descriptionSnippet", {}).get("runs", [{}])[0].get("text", "")
viewCountText = vd.get("viewCountText", {}).get("simpleText", "")
viewCount = parse_view_count(viewCountText)
publishedText = vd.get("publishedTimeText", {}).get("simpleText", "")
liveNow = any(
"LIVE" in b.get("metadataBadgeRenderer", {}).get("label", "").upper()
for b in vd.get("badges", [])
)
lengthText, lengthSeconds = extract_length_text_and_seconds(vd)
videos.append({
"type": "video",
"title": title,
"videoId": vid,
"author": author,
"authorId": authorId,
"authorUrl": authorUrl,
"authorVerified": authorVerified,
"authorThumbnails": authorThumbnails,
"videoThumbnails": videoThumbnails,
"description": desc,
"viewCount": viewCount,
"viewCountText": viewCountText,
"publishedText": publishedText,
"lengthSeconds": lengthSeconds,
"lengthText": lengthText,
"liveNow": liveNow
})
if len(videos) >= max_results:
break
if len(videos) >= max_results:
break
return videos
def innertube_trending(trending_type=None, region="US", max_results=50):
trending_type_key = trending_type.lower() if trending_type else ""
params = TRENDING_PARAMS.get(trending_type_key, "")
payload = {
"context": CLIENT_CONTEXT,
"browseId": "FEtrending",
}
if params:
payload["params"] = params
headers = {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0",
"Accept-Language": "en-US"
}
resp = requests.post(YOUTUBE_API_BROWSE_URL, json=payload, headers=headers)
resp.raise_for_status()
data = resp.json()
try:
section_list = data.get("contents", {}) \
.get("twoColumnBrowseResultsRenderer", {}) \
.get("tabs", [])[0] \
.get("tabRenderer", {}) \
.get("content", {}) \
.get("sectionListRenderer", {}) \
.get("contents", [])
all_items = []
for section in section_list:
items = section.get("itemSectionRenderer", {}).get("contents", [])
all_items.extend(items)
videos = extract_videos_from_items(all_items)
videos = deduplicate_videos(videos)
videos = videos[:max_results]
def parse_video(vd):
title = vd.get("title", {}).get("runs", [{}])[0].get("text", "")
vid = vd.get("videoId", "")
author = vd.get("ownerText", {}).get("runs", [{}])[0].get("text", "")
thumbnails = vd.get("thumbnail", {}).get("thumbnails", [])
view_count_text = vd.get("viewCountText", {}).get("simpleText", "")
view_count = parse_view_count(view_count_text)
published_text = vd.get("publishedTimeText", {}).get("simpleText", "")
length_text_obj, length_seconds = extract_length_text_and_seconds(vd)
return {
"title": title,
"videoId": vid,
"author": author,
"videoThumbnails": thumbnails,
"viewCount": view_count,
"publishedText": published_text,
"lengthText": length_text_obj,
"lengthSeconds": length_seconds
}
return [parse_video(v) for v in videos]
except Exception as e:
print("Trending Parsing Error:", e)
return []