# work in progress
"""Helpers for querying YouTube's InnerTube endpoints (player, search, browse).

Each public entry point POSTs a JSON payload and flattens the nested
"renderer" dictionaries of the response into plain dictionaries/lists.
The pure parsing helpers perform no I/O.
"""

import re

try:
    import requests
except ImportError:  # Keep the pure parsing helpers importable without it.
    requests = None

YOUTUBE_INNERTUBE_API_KEY = "key"
# NOTE: the separator must be a literal "&region"; it had been corrupted into
# the "®" character (HTML entity "&reg"), which broke the query string.
YOUTUBE_API_URL = f"https://www.youtube.com/youtubei/v1/search?key={YOUTUBE_INNERTUBE_API_KEY}&region=US"
YOUTUBE_API_BROWSE_URL = f"https://www.youtube.com/youtubei/v1/browse?key={YOUTUBE_INNERTUBE_API_KEY}"
YOUTUBE_API_PLAYER_URL = "https://www.youtube.com/youtubei/v1/player"

CLIENT_CONTEXT = {
    "client": {
        "clientName": "WEB",
        "clientVersion": "2.20230615.09.00",
        "hl": "en",
        "gl": "US",
    }
}

TRENDING_PARAMS = {
    "music": "4gINGgt5dG1hX2NoYXJ0cw%3D%3D",
    "gaming": "4gIcGhpnYW1pbmdfY29ycHVzX21vc3RfcG9wdWxhcg%3D%3D",
    "movies": "4gIKGgh0cmFpbGVycw%3D%3D",
}

# Without a timeout ``requests`` may block forever on a stalled connection.
_REQUEST_TIMEOUT_SECONDS = 10

# Headers shared by the search and browse (trending) requests.
_DEFAULT_HEADERS = {
    "Content-Type": "application/json",
    "User-Agent": "Mozilla/5.0",
    "Accept-Language": "en-US",
}

# Headers used by the player request.
_PLAYER_HEADERS = {
    "User-Agent": "Mozilla/5.0",
    "Accept-Language": "en-US,en;q=0.9",
    "Content-Type": "application/json",
    "X-YouTube-Client-Name": "1",
    "X-YouTube-Client-Version": "2.20231221",
}

# A number (optional thousands separators / decimal part) followed by an
# optional K/M/B suffix. The look-ahead stops a following word such as
# "members" from being mistaken for a suffix.
_VIEW_COUNT_RE = re.compile(r"(\d[\d,]*(?:\.\d+)?)\s*([kmb])?(?![a-z])")
_VIEW_COUNT_MULTIPLIERS = {"k": 1_000, "m": 1_000_000, "b": 1_000_000_000}


def safe_int(value, default=0):
    """Return ``int(value)``, or ``default`` if the conversion fails."""
    try:
        return int(value)
    except (ValueError, TypeError):
        return default


def _post_json(url, payload, headers):
    """POST ``payload`` as JSON to ``url`` and return the decoded JSON body.

    Raises:
        ImportError: if the ``requests`` package is not installed.
        requests.RequestException: on connection errors, timeouts or a
            non-success HTTP status (via ``raise_for_status``).
    """
    if requests is None:
        raise ImportError("The 'requests' package is required for network calls")
    response = requests.post(
        url, json=payload, headers=headers, timeout=_REQUEST_TIMEOUT_SECONDS
    )
    response.raise_for_status()
    return response.json()


def _client_context(region):
    """Return a copy of ``CLIENT_CONTEXT`` whose ``gl`` field is ``region``.

    A falsy ``region`` leaves the default ``gl`` value untouched. The shared
    module-level ``CLIENT_CONTEXT`` is never mutated.
    """
    client = dict(CLIENT_CONTEXT["client"])
    if region:
        client["gl"] = region
    return {"client": client}


def _first_run_text(container, key):
    """Return ``container[key]["runs"][0]["text"]``, or ``""`` if any part is missing."""
    runs = (container.get(key) or {}).get("runs") or [{}]
    return runs[0].get("text", "")


def _simple_text(container, key):
    """Return ``container[key]["simpleText"]``, or ``""`` if it is missing."""
    return (container.get(key) or {}).get("simpleText", "")


def _build_video_info(video_id, data):
    """Flatten a player response ``data`` into the dict returned by ``get_video_info``."""
    video_details = data.get("videoDetails", {})
    microformat = data.get("microformat", {}).get("playerMicroformatRenderer", {})
    engagement = data.get("engagement", {})
    return {
        "videoId": video_id,
        "title": video_details.get("title", ""),
        "author": video_details.get("author", ""),
        "authorId": video_details.get("channelId", ""),
        "lengthSeconds": video_details.get("lengthSeconds", 0),
        "viewCount": video_details.get("viewCount", 0),
        "description": video_details.get("shortDescription", ""),
        "publishedText": microformat.get("publishDate", ""),
        "averageRating": engagement.get("averageRating", 0),
        "likeCount": engagement.get("likeCount", 0),
        "dislikeCount": engagement.get("dislikeCount", 0),
        "keywords": video_details.get("keywords", []),
        "isLive": video_details.get("isLive", False),
        "thumbnail": {
            "thumbnails": video_details.get("thumbnail", {}).get("thumbnails", []),
        },
    }


def get_video_info(video_id):
    """Fetch metadata for ``video_id`` from the player endpoint.

    Returns:
        A dict of video fields (title, author, counts, thumbnails, ...), or
        ``None`` if the request or the parsing failed; the error is printed.
    """
    payload = {
        "context": {
            "client": {
                "hl": "en",
                "gl": "US",
                "clientName": "WEB",
                "clientVersion": "2.20231221",
            }
        },
        "videoId": video_id,
    }
    try:
        data = _post_json(YOUTUBE_API_PLAYER_URL, payload, _PLAYER_HEADERS)
        return _build_video_info(video_id, data)
    except Exception as e:  # Best-effort boundary: report and signal failure.
        print(f"Error fetching video info: {str(e)}")
        return None


def extract_length_text_and_seconds(vd):
    """Extract the duration of a video renderer dict.

    Args:
        vd: a renderer dict that may contain a ``lengthText`` entry.

    Returns:
        ``(length_text, seconds)``. ``length_text`` is ``None`` when there is
        no ``simpleText``; otherwise it is a dict holding the ``simpleText``
        and its accessibility label (``""`` if absent). ``seconds`` is the
        duration parsed from ``S``, ``M:S`` or ``H:M:S``, and 0 when the text
        is missing or has any other shape.
    """
    length_obj = vd.get("lengthText") or {}
    simple_text = length_obj.get("simpleText")
    if not simple_text:
        return None, 0

    accessible_label = (
        length_obj.get("accessibility", {})
        .get("accessibilityData", {})
        .get("label")
    )
    length_text = {
        "accessibility": {"accessibilityData": {"label": accessible_label or ""}},
        "simpleText": simple_text,
    }

    try:
        parts = [int(part) for part in simple_text.split(":")]
    except (AttributeError, ValueError):
        # Non-numeric text (or a non-string value) has no parsable duration.
        parts = []

    seconds = 0
    if 1 <= len(parts) <= 3:
        for part in parts:
            seconds = seconds * 60 + part
    return length_text, seconds


def extract_videos_from_items(items):
    """Collect every ``videoRenderer`` dict found in ``items``.

    Carousel, rich and plain shelf renderers are descended into recursively;
    items of any other kind are ignored.
    """
    videos = []
    for item in items:
        if "videoRenderer" in item:
            videos.append(item["videoRenderer"])
        elif "carouselShelfRenderer" in item or "richShelfRenderer" in item:
            contents = (
                item.get("carouselShelfRenderer", {}).get("contents", [])
                or item.get("richShelfRenderer", {}).get("contents", [])
            )
            videos.extend(extract_videos_from_items(contents))
        elif "shelfRenderer" in item:
            contents = (
                item.get("shelfRenderer", {})
                .get("content", {})
                .get("expandedShelfContentsRenderer", {})
                .get("items", [])
            )
            videos.extend(extract_videos_from_items(contents))
    return videos


def deduplicate_videos(videos):
    """Return ``videos`` without repeated ``videoId`` values, keeping first occurrences.

    Entries without a truthy ``videoId`` are dropped.
    """
    seen = set()
    unique = []
    for video in videos:
        vid = video.get("videoId")
        if vid and vid not in seen:
            seen.add(vid)
            unique.append(video)
    return unique


def parse_view_count(view_count_text):
    """Convert a view-count label such as ``"1.2K views"`` into an integer.

    Understands thousands separators and the K/M/B suffixes
    (case-insensitive). Returns 0 for ``None``, non-string input, or text
    without a number (e.g. ``"No views"``).
    """
    if not isinstance(view_count_text, str):
        return 0
    match = _VIEW_COUNT_RE.search(view_count_text.lower())
    if match is None:
        return 0
    number, suffix = match.groups()
    number = number.replace(",", "")
    multiplier = _VIEW_COUNT_MULTIPLIERS.get(suffix, 1)
    if "." in number:
        # round() rather than int(): avoids truncating float artefacts such
        # as 1149.9999999999998.
        return round(float(number) * multiplier)
    return int(number) * multiplier


def _parse_search_video(vd):
    """Flatten one search-result ``videoRenderer`` dict into a result dict."""
    owner = (vd.get("ownerText", {}).get("runs") or [{}])[0]
    browse = owner.get("navigationEndpoint", {}).get("browseEndpoint", {})
    view_count_text = _simple_text(vd, "viewCountText")
    length_text, length_seconds = extract_length_text_and_seconds(vd)
    return {
        "type": "video",
        "title": _first_run_text(vd, "title"),
        "videoId": vd.get("videoId", ""),
        "author": owner.get("text", ""),
        "authorId": browse.get("browseId", ""),
        "authorUrl": browse.get("canonicalBaseUrl", ""),
        "authorVerified": any(
            badge.get("metadataBadgeRenderer", {}).get("style")
            == "BADGE_STYLE_TYPE_VERIFIED"
            for badge in vd.get("ownerBadges", [])
        ),
        "authorThumbnails": (
            vd.get("channelThumbnailSupportedRenderers", {})
            .get("channelThumbnailWithLinkRenderer", {})
            .get("thumbnail", {})
            .get("thumbnails", [])
        ),
        "videoThumbnails": vd.get("thumbnail", {}).get("thumbnails", []),
        "description": _first_run_text(vd, "descriptionSnippet"),
        "viewCount": parse_view_count(view_count_text),
        "viewCountText": view_count_text,
        "publishedText": _simple_text(vd, "publishedTimeText"),
        "lengthSeconds": length_seconds,
        "lengthText": length_text,
        "liveNow": any(
            "LIVE" in badge.get("metadataBadgeRenderer", {}).get("label", "").upper()
            for badge in vd.get("badges", [])
        ),
    }


def _parse_search_response(data, max_results):
    """Return at most ``max_results`` parsed videos from a search response."""
    sections = (
        data.get("contents", {})
        .get("twoColumnSearchResultsRenderer", {})
        .get("primaryContents", {})
        .get("sectionListRenderer", {})
        .get("contents", [])
    )
    videos = []
    for section in sections:
        for item in section.get("itemSectionRenderer", {}).get("contents", []):
            # Checked before appending so that max_results <= 0 yields [].
            if len(videos) >= max_results:
                return videos
            vd = item.get("videoRenderer")
            if vd:
                videos.append(_parse_search_video(vd))
    return videos


def innertube_search(query, region="US", max_results=50):
    """Search for ``query`` and return up to ``max_results`` video dicts.

    Args:
        query: the search string.
        region: value sent as the ``gl`` field of the client context.
        max_results: maximum number of videos to return.

    Raises:
        requests.RequestException: if the HTTP request fails or times out.
    """
    payload = {"context": _client_context(region), "query": query, "params": ""}
    data = _post_json(YOUTUBE_API_URL, payload, _DEFAULT_HEADERS)
    return _parse_search_response(data, max_results)


def _parse_trending_video(vd):
    """Flatten one trending ``videoRenderer`` dict into a result dict."""
    length_text, length_seconds = extract_length_text_and_seconds(vd)
    return {
        "title": _first_run_text(vd, "title"),
        "videoId": vd.get("videoId", ""),
        "author": _first_run_text(vd, "ownerText"),
        "videoThumbnails": vd.get("thumbnail", {}).get("thumbnails", []),
        "viewCount": parse_view_count(_simple_text(vd, "viewCountText")),
        "publishedText": _simple_text(vd, "publishedTimeText"),
        "lengthText": length_text,
        "lengthSeconds": length_seconds,
    }


def _parse_trending_response(data, max_results):
    """Return at most ``max_results`` unique parsed videos from a browse response.

    Raises ``IndexError`` when the response contains no tabs.
    """
    first_tab = (
        data.get("contents", {})
        .get("twoColumnBrowseResultsRenderer", {})
        .get("tabs", [])[0]
    )
    sections = (
        first_tab.get("tabRenderer", {})
        .get("content", {})
        .get("sectionListRenderer", {})
        .get("contents", [])
    )
    all_items = []
    for section in sections:
        all_items.extend(section.get("itemSectionRenderer", {}).get("contents", []))
    videos = deduplicate_videos(extract_videos_from_items(all_items))
    return [_parse_trending_video(vd) for vd in videos[:max_results]]


def innertube_trending(trending_type=None, region="US", max_results=50):
    """Fetch the trending feed and return up to ``max_results`` video dicts.

    Args:
        trending_type: optional category looked up case-insensitively in
            ``TRENDING_PARAMS``; unknown or empty values request the default
            feed (no ``params`` entry is sent).
        region: value sent as the ``gl`` field of the client context.
        max_results: maximum number of videos to return.

    Returns:
        A list of video dicts; ``[]`` if the response could not be parsed
        (the error is printed).

    Raises:
        requests.RequestException: if the HTTP request fails or times out.
    """
    trending_type_key = trending_type.lower() if trending_type else ""
    params = TRENDING_PARAMS.get(trending_type_key, "")
    payload = {"context": _client_context(region), "browseId": "FEtrending"}
    if params:
        payload["params"] = params

    data = _post_json(YOUTUBE_API_BROWSE_URL, payload, _DEFAULT_HEADERS)
    try:
        return _parse_trending_response(data, max_results)
    except Exception as e:  # Best-effort: an unexpected layout yields no results.
        print("Trending Parsing Error:", e)
        return []