Blog/primeran/primeran_series_download.py
(Deskargatu)
#!/usr/bin/env python3
"""
Primeran-etik bideoa jaitsi (/w/, /s/, /m/), DASH CENC fluxuak berreraiki
eta ClearKey gakoarekin deszifratu (KID:KEY hex).
.srt fitxategiak ere sortzen ditu (WebVTT API-tik).
"""
from __future__ import annotations
import argparse
import json
import os
import re
import shutil
import ssl
import subprocess
import sys
import tempfile
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
from urllib.parse import urljoin, urlparse
from dataclasses import dataclass
from pathlib import Path
# Directory that contains this script (symlinks resolved).
SCRIPT_DIR = Path(__file__).resolve().parent
# "bin" folder next to the script; _resolve_tool() looks here before PATH.
BUNDLE_BIN = SCRIPT_DIR / "bin"
# Presumably character-window sizes used when scanning page HTML around an
# episode slug.
# NOTE(review): neither constant is referenced in the visible part of this
# file - confirm whether they are still needed.
EPISODE_HTML_LOOKBACK = 30000
EPISODE_HTML_AFTER_SLUG = 4500
def _is_executable_file(path: Path) -> bool:
    """Return True when *path* is an existing regular file with execute permission."""
    if not path.is_file():
        return False
    return os.access(path, os.X_OK)
def _resolve_tool(name: str, override: str | None) -> str | None:
    """Locate an external tool and return its absolute path, or None.

    Lookup order:
      1. *override*, when given: it must be an existing file, otherwise None
         is returned and no further lookup is attempted;
      2. an executable called *name* in the bundled ``bin`` directory;
      3. *name* on ``PATH``.
    """
    if override:
        candidate = Path(override).expanduser()
        if not candidate.is_file():
            return None
        return str(candidate.resolve())

    local_copy = BUNDLE_BIN / name
    if _is_executable_file(local_copy):
        return str(local_copy.resolve())

    on_path = shutil.which(name)
    if not on_path:
        return None
    return str(Path(on_path).resolve())
@dataclass(frozen=True)
class Episode:
    """Immutable description of one downloadable item (episode, /w/ page or film)."""

    # Season number; the parsers in this file use 0 for films and when the API gives none.
    season: int
    # Episode number inside the season; 0 for films or when it cannot be determined.
    episode: int
    # Slug identifying the item on the site/API.
    slug: str
    # Display title; used to build the output file name.
    title: str
    # URL of the DASH manifest (MPD) to download.
    manifest_url: str
    # Subtitle tracks as (language code, subtitle file URL) pairs.
    subtitles: tuple[tuple[str, str], ...]
def _urlopen(url: str, timeout: int = 120, headers: dict | None = None, data: bytes | None = None):
    """Open *url* and return the response object (the caller must close it).

    The request is first made with normal certificate verification.  Only when
    that attempt fails because of a TLS error is it retried once without
    verification, after printing a warning to stderr.  Every other failure
    (HTTP status errors, DNS errors, refused connections, timeouts) is raised
    unchanged instead of being retried insecurely.

    Args:
        url: Address to open.
        timeout: Socket timeout in seconds.
        headers: Optional request headers.
        data: Optional request body (turns the request into a POST).

    Raises:
        urllib.error.HTTPError: The server answered with an error status.
        urllib.error.URLError: Any other failure to reach the server.
    """
    req = urllib.request.Request(url, headers=headers or {}, data=data)
    try:
        ctx = ssl.create_default_context()
        return urllib.request.urlopen(req, timeout=timeout, context=ctx)
    except urllib.error.HTTPError:
        # A real answer from the server: never retry.
        raise
    except urllib.error.URLError as err:
        # urllib wraps the low-level exception in URLError.reason; fall back
        # only when that exception is a TLS failure.
        if not isinstance(err.reason, ssl.SSLError):
            raise
        host = urlparse(url).netloc or url
        print(
            f"Abisua: TLS ziurtagiria ezin izan da egiaztatu ({host}); "
            "egiaztapenik gabe berriro saiatzen.",
            file=sys.stderr,
        )
        # Deliberate best-effort: private stdlib helper that disables
        # certificate and hostname checks for this single retry.
        ctx = ssl._create_unverified_context()
        return urllib.request.urlopen(req, timeout=timeout, context=ctx)
def _fetch(url: str) -> str:
    """Download *url* and return its body decoded as UTF-8 text.

    Undecodable bytes are replaced rather than raising.  The response is
    closed before returning so the connection is not leaked.

    Raises:
        urllib.error.URLError: (including HTTPError) when the request fails.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; primeran-dl/1.0)",
        "Accept-Language": "eu,es;q=0.9,en;q=0.8",
    }
    with _urlopen(url, headers=headers) as resp:
        return resp.read().decode("utf-8", errors="replace")
def _fetch_json(url: str) -> dict:
    """Download *url* and return the parsed JSON document.

    The response is closed before parsing so the connection is not leaked.

    Raises:
        urllib.error.URLError: (including HTTPError) when the request fails.
        UnicodeDecodeError: when the body is not valid UTF-8.
        json.JSONDecodeError: when the body is not valid JSON.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; primeran-dl/1.0)",
        "Accept": "application/json",
    }
    with _urlopen(url, headers=headers) as resp:
        body = resp.read()
    return json.loads(body.decode("utf-8"))
def _origin_url(page_url: str) -> str:
    """Return the ``scheme://host[:port]`` part of *page_url*."""
    parts = urlparse(page_url)
    return "://".join((parts.scheme, parts.netloc))
def _extract_series_base_slug(url_slug: str) -> str:
    """Return the base series slug for the slug taken from a /s/<slug> URL.

    A trailing ``-d<digits>-<digits>`` suffix is dropped; a slug without that
    suffix is returned unchanged.
    Examples: 'threesome-d2-1' -> 'threesome', 'goenkale' -> 'goenkale'.
    """
    suffixed = re.compile(r"^(.+?)-d\d+-\d+$").match(url_slug)
    if suffixed is None:
        return url_slug
    return suffixed.group(1)
def _parse_primeran_page_url(page_url: str) -> tuple[str, str]:
    """Return ``(kind, slug)`` for a page URL.

    *kind* is "series" for …/s/…, "movie" for …/m/… and "watch" for …/w/…;
    the path letter is matched case-insensitively.

    Raises:
        ValueError: when the URL has none of these shapes.
    """
    found = re.search(r"https?://[^/]+/(s|m|w)/([^/?#]+)", page_url, re.I)
    if found is None:
        raise ValueError(
            f"URL ezezaguna (https://primeran.eus/s/…, …/m/… edo …/w/… espero zen): {page_url}"
        )
    letter, slug = found.groups()
    kinds = {"s": "series", "m": "movie", "w": "watch"}
    return kinds[letter.lower()], slug
def _page_title_from_html(html: str) -> str | None:
    """Return the page title: the og:title meta content if present, else <title>, else None."""
    patterns = (
        r'<meta[^>]+property="og:title"[^>]+content="([^"]*)"',
        r"<title[^>]*>([^<]+)</title>",
    )
    for pattern in patterns:
        hit = re.search(pattern, html, re.I)
        if hit:
            return hit.group(1).strip()
    return None
def _extract_episode_subtitles_from_chunk(chunk: str, slug_pos_in_chunk: int) -> tuple[tuple[str, str], ...]:
    """Extract the subtitle tracks belonging to one episode from serialized page data.

    The ``subtitles:`` array may sit before or after the episode slug depending
    on how the page was serialized.  Taking only the last ``subtitles:`` before
    the slug is wrong: when episodes are listed in reverse order that array
    often belongs to the neighbouring episode (e.g. E03 would receive the
    subtitles of E04).  Every usable array in the window is therefore
    considered, and the one starting closest to the slug wins (the earliest one
    on a tie).

    Returns:
        ``(language code, VTT URL)`` pairs, or an empty tuple when no usable
        array is found.
    """
    vtt_re = re.compile(r'file:"(https://cdnstorage\.primeran\.eus/directus/eitb/[^"]+\.vtt)"')
    lang_re = re.compile(r'language:\$R\[\d+\]=\{id:\d+,code:"([^"]+)"')
    closers = ("],age_rating", "],tags:", "],theme:", "],transcoding_data:")

    def array_text(from_marker: str) -> str | None:
        # Closers are tried in priority order, not by position; the returned
        # text ends with the closing "]".
        positions = (from_marker.find(closer) for closer in closers)
        cut = next((pos for pos in positions if pos >= 0), None)
        if cut is None:
            return None
        return from_marker[: cut + 1]

    def tracks_in(array: str) -> list[tuple[str, str]]:
        found: list[tuple[str, str]] = []
        for vtt in vtt_re.finditer(array):
            # The language code is the first one that follows the file URL.
            code = lang_re.search(array, vtt.end())
            if code is not None:
                found.append((code.group(1), vtt.group(1)))
        return found

    candidates: list[tuple[int, tuple[tuple[str, str], ...]]] = []
    for marker in re.finditer("subtitles:", chunk):
        start = marker.start()
        array = array_text(chunk[start:])
        if not array:
            continue
        tracks = tracks_in(array)
        if tracks:
            candidates.append((abs(start - slug_pos_in_chunk), tuple(tracks)))

    if not candidates:
        return ()
    # min() keeps the first of several equally distant candidates.
    return min(candidates, key=lambda item: item[0])[1]
def parse_episodes(html: str, series_slug: str, page_url: str) -> tuple[str, list[Episode]]:
    """List the episodes of a series through the ``/api/v1/series/`` endpoint.

    Args:
        html: Series page HTML; only used to read ``transcoding_base_path``.
        series_slug: Slug taken from the page URL (an episode suffix is removed).
        page_url: Page URL; its origin is used to build the API URL.

    Returns:
        ``(base, episodes)`` where *base* is the CDN base path found in *html*
        ("" when absent) and *episodes* is sorted by season, episode number and
        slug.  Only entries of type "vod" that expose a Widevine DASH manifest
        are kept.

    JSON ``null`` values are treated like missing keys, so a ``null`` season or
    episode number becomes 0 instead of breaking sorting or file naming later.
    """
    base_m = re.search(r'transcoding_base_path:"([^"]+)"', html)
    base = base_m.group(1).rstrip("/") if base_m else ""
    origin = _origin_url(page_url)
    base_series_slug = _extract_series_base_slug(series_slug)
    data = _fetch_json(f"{origin}/api/v1/series/{base_series_slug}")

    out: list[Episode] = []
    # ".get(key) or default" (rather than ".get(key, default)") also covers
    # keys that are present with a null value.
    for season in data.get("seasons") or []:
        season_num = season.get("number") or 0
        for ep_data in season.get("episodes") or []:
            if ep_data.get("type") != "vod":
                continue
            slug = ep_data.get("slug") or ""
            tbp = ep_data.get("transcoding_base_path") or ""
            # First Widevine/DASH entry that actually carries a manifest URL.
            manifest_url = next(
                (
                    tbp + td["manifest_url"]
                    for td in ep_data.get("transcoding_data") or []
                    if td.get("drm_type") == "widevine"
                    and td.get("manifest_type") == "dash"
                    and td.get("manifest_url")
                ),
                None,
            )
            if not manifest_url:
                continue
            out.append(Episode(
                season=season_num,
                episode=ep_data.get("episode_number") or 0,
                slug=slug,
                title=ep_data.get("title") or slug,
                manifest_url=manifest_url,
                # Same subtitle schema as the media endpoint: reuse its parser.
                subtitles=tuple(_subtitles_from_api(ep_data)),
            ))
    out.sort(key=lambda e: (e.season, e.episode, e.slug))
    return base, out
def parse_movie(html: str, movie_slug: str, page_url: str) -> tuple[str, list[Episode]]:
    """Describe a film through the ``/api/v1/media/`` endpoint.

    Returns ``(base, [episode])``: *base* is the ``transcoding_base_path`` found
    in *html* ("" when absent) and the single Episode uses season 0, episode 0.

    Raises:
        RuntimeError: when the API lists no Widevine DASH manifest.
    """
    origin = _origin_url(page_url)
    api_doc = _fetch_json(f"{origin}/api/v1/media/{movie_slug}")
    film = Episode(
        season=0,
        episode=0,
        slug=movie_slug,
        manifest_url=_pick_widevine_dash_url(api_doc, origin, movie_slug),
        title=api_doc.get("title", movie_slug) or movie_slug,
        subtitles=tuple(_subtitles_from_api(api_doc)),
    )
    base_match = re.search(r'transcoding_base_path:"([^"]+)"', html)
    if base_match:
        base = base_match.group(1).rstrip("/")
    else:
        base = ""
    return base, [film]
def parse_watch(html: str, slug: str, page_url: str) -> tuple[str, list[Episode]]:
    """Describe the single episode behind a /w/ (watch) page via ``/api/v1/media/``.

    The season comes from ``season_data.season_number`` (0 when missing or
    null, so that the ``SxxEyy`` file name can always be formatted).  The
    episode number is read from a ``-d<digits>-<digits>`` part of the slug
    (0 when the slug has none).

    Returns:
        ``(base, [episode])`` where *base* is the ``transcoding_base_path``
        found in *html* ("" when absent).

    Raises:
        RuntimeError: when the API lists no Widevine DASH manifest.
    """
    origin = _origin_url(page_url)
    data = _fetch_json(f"{origin}/api/v1/media/{slug}")
    manifest_url = _pick_widevine_dash_url(data, origin, slug)
    title = data.get("title") or slug
    sd = data.get("season_data") or {}
    # "or 0" also covers a season_number that is present but null.
    season = sd.get("season_number") or 0
    ep_m = re.search(r"-d\d+-(\d+)", slug)
    ep_num = int(ep_m.group(1)) if ep_m else 0
    subs = _subtitles_from_api(data)
    base_m = re.search(r'transcoding_base_path:"([^"]+)"', html)
    base = base_m.group(1).rstrip("/") if base_m else ""
    return base, [
        Episode(
            season=season, episode=ep_num, slug=slug,
            title=title, manifest_url=manifest_url, subtitles=tuple(subs),
        )
    ]
def _pick_widevine_dash_url(data: dict, origin: str, label: str) -> str:
    """Return the URL of the first DASH manifest protected with Widevine.

    Args:
        data: Media document returned by the API (reads its ``manifests`` list).
        origin: ``scheme://host`` used to resolve site-relative manifest paths.
        label: Name shown in the error message.

    The manifest reference is resolved against *origin* with ``urljoin``: a
    path such as ``/a/b.mpd`` is appended to the origin as before, while an
    already absolute URL is returned as is instead of being glued to the origin.

    Raises:
        RuntimeError: when no such manifest is listed (including a missing or
            null ``manifests`` entry).
    """
    for m in data.get("manifests") or []:
        if m.get("type") == "dash" and (m.get("drmConfig") or {}).get("type") == "widevine":
            rel = m.get("manifestURL") or ""
            if rel:
                return urljoin(origin, rel)
    raise RuntimeError(f"Ez dago DASH Widevine manifestik « {label} »-rentzat.")
def _subtitles_from_api(data: dict) -> list[tuple[str, str]]:
    """Return the subtitle tracks of an API document as ``(language code, file URL)`` pairs.

    Entries without a file URL are skipped.  A missing or null ``subtitles``
    list yields an empty result, and a missing or null language (or language
    code) is reported as "und", so callers always receive strings.
    """
    subs: list[tuple[str, str]] = []
    for s in data.get("subtitles") or []:
        file_url = s.get("file") or ""
        if not file_url:
            continue
        lang = (s.get("language") or {}).get("code") or "und"
        subs.append((lang, file_url))
    return subs
def _dash_segments_base_url(mpd_xml: bytes, manifest_url: str) -> str:
    """Return the base URL (no trailing slash) under which init/media segments live.

    The BaseURL of the first Period is used when present: an absolute http(s)
    value is returned as is, a relative one is joined onto the manifest URL.
    Without a usable BaseURL the directory of the manifest is returned.
    """
    root = ET.fromstring(mpd_xml)
    _strip_ns_tree(root)

    first_period = root.find("Period")
    base_el = first_period.find("BaseURL") if first_period is not None else None
    declared = base_el.text.strip() if base_el is not None and base_el.text else ""

    if declared.startswith(("http://", "https://")):
        return declared.rstrip("/")
    if declared:
        # NOTE(review): the relative value is resolved against
        # "<manifest_url>/" (the manifest treated as a directory), not against
        # the manifest's parent directory - confirm this matches the CDN layout.
        return urljoin(manifest_url + "/", declared).rstrip("/")
    return manifest_url.rsplit("/", 1)[0]
def _subtitles_from_mpd_vtt(mpd_xml: bytes) -> tuple[tuple[str, str], ...]:
    """Collect WebVTT tracks declared in the MPD as ``(language, URL)`` pairs.

    Only AdaptationSets whose mimeType mentions "vtt" are inspected, and only
    Representations whose BaseURL is a ``.vtt`` file on
    ``https://cdnstorage.primeran.eus/`` are kept.  The language falls back to
    "und" when the AdaptationSet has no ``lang`` attribute.
    """
    root = ET.fromstring(mpd_xml)
    _strip_ns_tree(root)

    tracks: list[tuple[str, str]] = []
    for adaptation in root.findall("Period/AdaptationSet"):
        if "vtt" not in (adaptation.get("mimeType") or "").lower():
            continue
        language = adaptation.get("lang") or "und"
        for representation in adaptation.findall("Representation"):
            base_el = representation.find("BaseURL")
            location = (base_el.text or "").strip() if base_el is not None else ""
            on_cdn = location.startswith("https://cdnstorage.primeran.eus/")
            if on_cdn and location.endswith(".vtt"):
                tracks.append((language, location))
    return tuple(tracks)
def _strip_ns(tag: str) -> str:
    """Drop a leading ``{namespace}`` prefix from an ElementTree tag or attribute name."""
    _, closing, local_name = tag.partition("}")
    if tag.startswith("{") and closing:
        return local_name
    return tag
def _strip_ns_tree(elem: ET.Element) -> None:
    """Remove namespace prefixes from the tags and attribute names of *elem* and all descendants, in place."""
    for node in elem.iter():
        node.tag = _strip_ns(node.tag)
        # Snapshot the namespaced keys first: the mapping is modified below.
        namespaced = [key for key in node.attrib if key.startswith("{")]
        for key in namespaced:
            bare = _strip_ns(key)
            if bare != key:
                node.attrib[bare] = node.attrib.pop(key)
def pick_video_audio_templates(
    mpd_xml: bytes, quality_height: int
) -> tuple[tuple[str, str, int, int], tuple[str, str, int, int]]:
    """Choose the video and audio tracks to download from a DASH manifest.

    Returns ``(video, audio)`` where each item is
    ``(initialization, media_template, segment_count, start_number)``.  Paths
    are relative to the segment base URL.  The video is the tallest
    Representation whose height is <= *quality_height* (highest bandwidth on a
    tie); the audio is the Representation with the highest bandwidth.  The
    segment count honours SegmentTimeline/S@r (repeat).

    Compared with a plain per-Representation lookup this also:
      * uses a SegmentTemplate declared on the AdaptationSet when the
        Representation has none of its own;
      * derives the track kind from ``mimeType`` when the AdaptationSet has no
        ``contentType`` attribute;
      * substitutes ``$RepresentationID$`` and ``$Bandwidth$`` in the returned
        templates, so that only ``$Number$`` is left for the caller to expand.

    Raises:
        RuntimeError: when no suitable video or audio track exists, or when the
            chosen SegmentTemplate lacks initialization/media/SegmentTimeline.
        ValueError: when a numeric attribute is not an integer.
        xml.etree.ElementTree.ParseError: when *mpd_xml* is not well-formed.
    """
    root = ET.fromstring(mpd_xml)
    _strip_ns_tree(root)

    def seg_count(st: ET.Element | None) -> int:
        if st is None:
            return 0
        tl = st.find("SegmentTimeline")
        if tl is None:
            return 0
        # Each <S> describes (r+1) contiguous segments of that duration (DASH r default 0).
        return sum(int(s.get("r", "0") or "0") + 1 for s in tl.findall("S"))

    def segment_start_number(st: ET.Element) -> int:
        sn = st.get("startNumber")
        return int(sn) if sn is not None else 1

    def bandwidth(rep: ET.Element) -> int:
        return int(rep.get("bandwidth") or "0")

    def track_kind(ad: ET.Element, rep: ET.Element) -> str:
        declared = ad.get("contentType", "")
        if declared:
            return declared
        # No contentType: fall back to the major part of the MIME type
        # ("video/mp4" -> "video"), which may sit on either element.
        mime = ad.get("mimeType") or rep.get("mimeType") or ""
        return mime.split("/", 1)[0].lower()

    def expand(template: str, rep: ET.Element) -> str:
        # Per-Representation identifiers are fixed once the track is chosen.
        for token, attr in (("$RepresentationID$", "id"), ("$Bandwidth$", "bandwidth")):
            value = rep.get(attr)
            if value is not None:
                template = template.replace(token, value)
        return template

    best: tuple[int, ET.Element, ET.Element] | None = None
    audio_pick: tuple[ET.Element, ET.Element] | None = None
    for ad in root.findall("Period/AdaptationSet"):
        inherited_st = ad.find("SegmentTemplate")
        for rep in ad.findall("Representation"):
            st = rep.find("SegmentTemplate")
            if st is None:
                # Explicit None test: an Element without children is falsy.
                st = inherited_st
            if st is None:
                continue
            # Skip trickplay / thumbnails
            init = st.get("initialization") or ""
            if "trickplay" in init or "tiles" in init:
                continue
            ctype = track_kind(ad, rep)
            if ctype == "video":
                h = int(rep.get("height") or "0")
                if h > quality_height:
                    continue
                if best is None or h > best[0] or (h == best[0] and bandwidth(rep) > bandwidth(best[1])):
                    best = (h, rep, st)
            elif ctype == "audio":
                if audio_pick is None or bandwidth(rep) > bandwidth(audio_pick[0]):
                    audio_pick = (rep, st)

    if best is None or audio_pick is None:
        raise RuntimeError("Impossible de choisir pistes vidéo/audio dans le MPD")

    vrep, vst = best[1], best[2]
    arep, ast = audio_pick
    v_init = expand(vst.get("initialization", ""), vrep)
    v_media = expand(vst.get("media", ""), vrep)
    v_n = seg_count(vst)
    v_sn = segment_start_number(vst)
    a_init = expand(ast.get("initialization", ""), arep)
    a_media = expand(ast.get("media", ""), arep)
    a_n = seg_count(ast)
    a_sn = segment_start_number(ast)
    if not all([v_init, v_media, v_n, a_init, a_media, a_n]):
        raise RuntimeError("SegmentTemplate incomplet dans le MPD")
    return (v_init, v_media, v_n, v_sn), (a_init, a_media, a_n, a_sn)
def _download_file(url: str, dest: Path) -> None:
    """Stream *url* into the file *dest*, creating parent directories as needed."""
    dest.parent.mkdir(parents=True, exist_ok=True)
    request_headers = {"User-Agent": "Mozilla/5.0 (compatible; primeran-dl/1.0)"}
    response = _urlopen(url, timeout=300, headers=request_headers)
    with response, dest.open("wb") as sink:
        # Copy in 256 KiB chunks until the response is exhausted.
        shutil.copyfileobj(response, sink, 1024 * 256)
def _expand_number_template(template: str, number: int) -> str:
    """Replace ``$Number$`` and ``$Number%0<width>d$`` in a DASH media template by *number*."""
    def _format(match: re.Match) -> str:
        width = match.group(1)
        return str(number).zfill(int(width)) if width else str(number)

    return re.sub(r"\$Number(?:%0(\d+)d)?\$", _format, template)


def _concat_segments(
    base_cenc: str,
    init_rel: str,
    media_templ: str,
    count: int,
    out_raw: Path,
    start_number: int = 1,
) -> None:
    """Download one track (init + numbered media segments) and concatenate it into *out_raw*.

    Args:
        base_cenc: Base URL under which the segments live (no trailing slash).
        init_rel: Relative path of the initialization segment.
        media_templ: Relative media template containing ``$Number$`` (a
            zero-padded ``$Number%0Nd$`` form is supported as well).
        count: Number of media segments announced by the manifest.
        out_raw: Output file (still encrypted).
        start_number: Number of the first media segment.

    A 404 within the last 10% of the segments is tolerated and ends the
    download early; any other HTTP error is raised.

    Raises:
        RuntimeError: when too few parts were fetched: fewer than 5 (init
            included) for normal streams, or fewer than all of them for streams
            that announce fewer than 4 media segments.
        urllib.error.HTTPError: for download errors other than the tolerated 404.
    """
    parts: list[Path] = []
    with tempfile.TemporaryDirectory() as td:
        tdir = Path(td)
        init_url = f"{base_cenc}/{init_rel}"
        init_path = tdir / "init.mp4"
        _download_file(init_url, init_path)
        parts.append(init_path)
        for i in range(count):
            n = start_number + i
            media = _expand_number_template(media_templ, n)
            seg_url = f"{base_cenc}/{media}"
            seg_path = tdir / f"seg_{n}.m4s"
            try:
                _download_file(seg_url, seg_path)
            except urllib.error.HTTPError as e:
                # The manifest may announce slightly more segments than exist.
                if e.code == 404 and i > count * 0.9:
                    print(f" … azken segmentua {n} falta da (404), gelditu", flush=True)
                    break
                raise
            parts.append(seg_path)
            if (i + 1) % 50 == 0:
                print(f" … segmentuak {i + 1}/{count}", flush=True)
        # Sanity check: init plus at least one media segment, and never demand
        # more parts than the stream announces (short clips are legitimate).
        minimum_parts = max(2, min(5, count + 1))
        if len(parts) < minimum_parts:
            raise RuntimeError(f"Segmentu gutxiegi jaitsi dira ({len(parts)})")
        with out_raw.open("wb") as out:
            for p in parts:
                out.write(p.read_bytes())
def _run_mp4decrypt(mp4decrypt: str, kid: str, key: str, in_path: Path, out_path: Path) -> None:
    """Decrypt *in_path* into *out_path* by running mp4decrypt with ``--key KID:KEY``.

    The tool's stdout is discarded; a non-zero exit status raises
    subprocess.CalledProcessError.
    """
    key_spec = ":".join((kid, key))
    subprocess.run(
        [mp4decrypt, "--key", key_spec, str(in_path), str(out_path)],
        check=True,
        stdout=subprocess.DEVNULL,
    )
def _ffmpeg_mux(ffmpeg: str, video: Path, audio: Path, out_mp4: Path) -> None:
    """Mux the first video stream of *video* and the first audio stream of *audio* into *out_mp4*.

    Streams are copied without re-encoding and an existing output file is
    overwritten; a non-zero exit status raises subprocess.CalledProcessError.
    """
    global_opts = ["-y", "-loglevel", "error"]
    inputs = ["-i", str(video), "-i", str(audio)]
    stream_map = ["-map", "0:v:0", "-map", "1:a:0"]
    subprocess.run(
        [ffmpeg, *global_opts, *inputs, *stream_map, "-c", "copy", str(out_mp4)],
        check=True,
    )
def _lang_tag_for_filename(lang: str) -> str:
    """Turn a language code into a file-name-safe tag.

    Runs of characters other than ASCII letters, digits, "_", "." and "-"
    become "_"; leading/trailing "." and "_" are removed; "sub" is returned
    when nothing is left.
    """
    cleaned = re.sub(r"[^\w.-]+", "_", lang.strip(), flags=re.ASCII)
    cleaned = cleaned.strip("._")
    return cleaned or "sub"
def _vtt_url_to_srt(ffmpeg: str, vtt_url: str, dest_srt: Path) -> None:
    """Download the WebVTT file at *vtt_url* and convert it to *dest_srt* with ffmpeg.

    The VTT file only lives in a temporary directory.  Download failures and a
    non-zero ffmpeg exit status (subprocess.CalledProcessError) propagate.
    """
    with tempfile.TemporaryDirectory() as workdir:
        downloaded = Path(workdir) / "subs.vtt"
        _download_file(vtt_url, downloaded)
        subprocess.run(
            [ffmpeg, "-y", "-loglevel", "error", "-i", str(downloaded), str(dest_srt)],
            check=True,
        )
def _safe_name(s: str, max_len: int = 120) -> str:
    """Make *s* usable as a file name.

    Runs of ``< > : " / \\ | ? *`` become "_", whitespace runs collapse to one
    space, the ends are trimmed and the result is cut to *max_len* characters.
    """
    without_reserved = re.sub(r'[<>:"/\\|?*]+', "_", s)
    collapsed = re.sub(r"\s+", " ", without_reserved).strip()
    # Slicing is a no-op when the text is already short enough.
    return collapsed[:max_len]
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser of the downloader."""
    ap = argparse.ArgumentParser(description="Primeran-etik bideoa jaitsi (DASH CENC + ClearKey).")
    ap.add_argument(
        "page_url",
        metavar="url",
        help="Orria …/s/<slug>, …/m/<slug> edo …/w/<slug> (adb. https://primeran.eus/m/maspalomas)",
    )
    ap.add_argument(
        "--key",
        dest="clearkey",
        help="KID:GILA hex-ean (32 karaktere bakoitza). Bestela PRIMERAN_CLEARKEY ingurune-aldagaia.",
    )
    ap.add_argument("-o", "--output-dir", type=Path, default=Path("."), help="Irteerako karpeta")
    ap.add_argument("--quality", type=int, default=1080, choices=(360, 480, 720, 1080))
    ap.add_argument(
        "--mp4decrypt",
        help="mp4decrypt-en bidea (lehenetsia: ./bin/mp4decrypt badago, bestela PATH)",
    )
    ap.add_argument(
        "--ffmpeg",
        help="ffmpeg-en bidea (lehenetsia: ./bin/ffmpeg badago, bestela PATH)",
    )
    ap.add_argument(
        "--limit",
        type=int,
        default=0,
        help="Lehenengo N elementuak bakarrik prozesatu (atalak edo film bakarra ; 0 = guztiak)",
    )
    ap.add_argument("--no-subs", action="store_true", help="Ez jaitsi / bihurtu azpitituluak .srt")
    return ap


def _build_mp4(
    base_cenc: str,
    vt: tuple[str, str, int, int],
    at: tuple[str, str, int, int],
    mp4decrypt: str,
    ffmpeg: str,
    kid: str,
    key: str,
    dest: Path,
) -> None:
    """Download, decrypt and mux one item so that *dest* only appears when complete."""
    # ffmpeg writes to a sibling "….part.mp4" file that is renamed at the very
    # end; a failed or interrupted run therefore never leaves a truncated file
    # under the final name (which later runs would skip as already downloaded).
    partial = dest.with_name(f"{dest.stem}.part{dest.suffix}")
    try:
        with tempfile.TemporaryDirectory() as tmp:
            t = Path(tmp)
            v_raw = t / "v_raw.mp4"
            a_raw = t / "a_raw.mp4"
            v_dec = t / "v_dec.mp4"
            a_dec = t / "a_dec.mp4"
            print(" Bideoa…", flush=True)
            _concat_segments(base_cenc, vt[0], vt[1], vt[2], v_raw, vt[3])
            print(" Audioa…", flush=True)
            _concat_segments(base_cenc, at[0], at[1], at[2], a_raw, at[3])
            print(" Deszifratzen…", flush=True)
            _run_mp4decrypt(mp4decrypt, kid, key, v_raw, v_dec)
            _run_mp4decrypt(mp4decrypt, kid, key, a_raw, a_dec)
            print(" Mux ffmpeg…", flush=True)
            _ffmpeg_mux(ffmpeg, v_dec, a_dec, partial)
        partial.replace(dest)
    finally:
        # After a successful rename the partial file no longer exists.
        if partial.exists():
            partial.unlink()


def _write_srt_files(
    ffmpeg: str,
    tracks: tuple[tuple[str, str], ...],
    dest: Path,
    output_dir: Path,
) -> None:
    """Convert every subtitle track to ``<video stem>.<lang>[.<n>].srt`` (best effort)."""
    print(" Azpitituluak (.srt)…", flush=True)
    lang_count: dict[str, int] = {}
    for lang, vtt_url in tracks:
        tag = _lang_tag_for_filename(lang)
        n = lang_count.get(tag, 0) + 1
        lang_count[tag] = n
        # A second track with the same language tag gets a numeric suffix.
        srt_name = f"{dest.stem}.{tag}.srt" if n == 1 else f"{dest.stem}.{tag}.{n}.srt"
        srt_dest = output_dir / srt_name
        if srt_dest.exists():
            continue
        try:
            _vtt_url_to_srt(ffmpeg, vtt_url, srt_dest)
        except (OSError, subprocess.CalledProcessError) as ex:
            print(f" SRT abisua ({tag}): {ex}", file=sys.stderr)


def main() -> int:
    """Command-line entry point.

    Returns:
        0 on success; 2 for usage problems (missing or malformed key, missing
        tools, unsupported URL); 1 when the page or its metadata cannot be
        obtained, when nothing is found, or when at least one item failed while
        being downloaded, decrypted or muxed (remaining items are still tried).
    """
    args = _build_arg_parser().parse_args()

    ck = args.clearkey or os.environ.get("PRIMERAN_CLEARKEY")
    if not ck:
        print("Eman --key KID:GILA edo exportatu PRIMERAN_CLEARKEY=KID:GILA", file=sys.stderr)
        return 2
    if ":" not in ck:
        print("Giltza formatua: kid_hex:giltza_hex", file=sys.stderr)
        return 2
    kid_raw, key_raw = ck.split(":", 1)
    kid = kid_raw.strip().replace("-", "").lower()
    key = key_raw.strip().replace("-", "").lower()
    # Exactly 32 hex digits each: checks the content as well as the length.
    hex32 = re.compile(r"[0-9a-f]{32}")
    if not (hex32.fullmatch(kid) and hex32.fullmatch(key)):
        print("KID eta giltzak 16 byte izan behar dituzte (32 karaktere hex).", file=sys.stderr)
        return 2

    mp4decrypt = _resolve_tool("mp4decrypt", args.mp4decrypt)
    if not mp4decrypt:
        print(
            f"mp4decrypt ez da aurkitu ({BUNDLE_BIN / 'mp4decrypt'} edo --mp4decrypt espero zen).",
            file=sys.stderr,
        )
        return 2
    ffmpeg = _resolve_tool("ffmpeg", args.ffmpeg)
    if not ffmpeg:
        print(
            "ffmpeg ez da aurkitu (jarri binary bat bin/ffmpeg karpetan, instalatu edo erabili --ffmpeg).",
            file=sys.stderr,
        )
        return 2

    try:
        kind, media_slug = _parse_primeran_page_url(args.page_url)
    except ValueError as ex:
        print(str(ex), file=sys.stderr)
        return 2
    print(f"Orria: {args.page_url} ({kind})", flush=True)

    try:
        html = _fetch(args.page_url)
    except OSError as ex:  # URLError/HTTPError and socket timeouts are OSErrors
        print(f"Errorea orria eskuratzean: {ex}", file=sys.stderr)
        return 1

    parse = {"movie": parse_movie, "watch": parse_watch}.get(kind, parse_episodes)
    try:
        base, episodes = parse(html, media_slug, args.page_url)
    except (RuntimeError, OSError, ValueError) as ex:
        # RuntimeError: no manifest; OSError: network; ValueError: invalid JSON.
        print(str(ex), file=sys.stderr)
        return 1
    if not episodes:
        label = "filma" if kind == "movie" else "atala"
        print(f"Ez da {label} aurkitu « {media_slug} »-rentzat.", file=sys.stderr)
        return 1
    if args.limit > 0:
        episodes = episodes[: args.limit]
    unit = "bideoa(k)" if kind == "movie" else "atala(k)"
    print(f"CDN: {base or '(ezezaguna)'} — {len(episodes)} {unit}.", flush=True)
    args.output_dir.mkdir(parents=True, exist_ok=True)

    failed = False
    for ep in episodes:
        if kind == "movie":
            label = f"{_safe_name(ep.title)}.mp4"
        else:
            label = f"S{ep.season:02d}E{ep.episode:02d} - {_safe_name(ep.title)}.mp4"
        dest = args.output_dir / label
        if dest.exists():
            print(f"Dagoeneko existitzen da, saltatu: {dest.name}", flush=True)
            continue
        manifest_url = ep.manifest_url
        print(f"Jaisten: {label}", flush=True)
        try:
            with _urlopen(manifest_url, timeout=60, headers={
                "User-Agent": "Mozilla/5.0 (compatible; primeran-dl/1.0)",
            }) as resp:
                mpd = resp.read()
        except urllib.error.HTTPError as e:
            print(f" MPD errorea {e.code} {manifest_url}-rentzat", file=sys.stderr)
            continue
        except OSError as e:
            print(f" MPD errorea {manifest_url}-rentzat: {e}", file=sys.stderr)
            failed = True
            continue
        try:
            vt, at = pick_video_audio_templates(mpd, args.quality)
        except (RuntimeError, ValueError, ET.ParseError) as ex:
            print(f" MPD: {ex}", file=sys.stderr)
            continue
        base_cenc = _dash_segments_base_url(mpd, manifest_url)
        subtitle_tracks = ep.subtitles or _subtitles_from_mpd_vtt(mpd)
        try:
            _build_mp4(base_cenc, vt, at, mp4decrypt, ffmpeg, kid, key, dest)
        except (OSError, RuntimeError, subprocess.CalledProcessError) as ex:
            # Report, remember the failure for the exit code, try the next item.
            print(f" Errorea: {ex}", file=sys.stderr)
            failed = True
            continue
        if not args.no_subs and subtitle_tracks:
            _write_srt_files(ffmpeg, subtitle_tracks, dest, args.output_dir)
        print(f" OK → {dest}", flush=True)
    return 1 if failed else 0
# Run main() only when executed as a script; its return value becomes the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())