Tfe

Ongi etorri tfe-ren webgunera...

Blog/primeran/primeran_series_download.py

(Deskargatu)
#!/usr/bin/env python3
"""
Download video from Primeran (/w/, /s/, /m/ pages), rebuild the DASH CENC streams
and decrypt them with a ClearKey key (KID:KEY in hex).
Also creates .srt subtitle files (from the WebVTT subtitles exposed by the API).
"""

from __future__ import annotations

import argparse
import json
import os
import re
import shutil
import ssl
import subprocess
import sys
import tempfile
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
from urllib.parse import urljoin, urlparse
from dataclasses import dataclass
from pathlib import Path

# Directory that contains this script (symlinks resolved).
SCRIPT_DIR = Path(__file__).resolve().parent
# Optional folder of bundled tools next to the script; _resolve_tool() checks it before PATH.
BUNDLE_BIN = SCRIPT_DIR / "bin"
# NOTE(review): the two values below are not referenced anywhere in the visible code;
# presumably character windows for scanning the page HTML around an episode slug —
# confirm they are still needed.
EPISODE_HTML_LOOKBACK = 30000
EPISODE_HTML_AFTER_SLUG = 4500


def _is_executable_file(path: Path) -> bool:
    """Return True when *path* is a regular file that the current user may execute."""
    if not path.is_file():
        return False
    return os.access(path, os.X_OK)


def _resolve_tool(name: str, override: str | None) -> str | None:
    """Locate an external tool and return its absolute path, or None if it cannot be found.

    An explicit *override* wins and is the only location tried: it must be an existing
    file (its executable bit is not checked). Without an override, the bundled ``bin``
    directory next to the script is tried first, then the system PATH.
    """
    if override:
        candidate = Path(override).expanduser()
        if not candidate.is_file():
            return None
        return str(candidate.resolve())

    local_copy = BUNDLE_BIN / name
    if _is_executable_file(local_copy):
        return str(local_copy.resolve())

    on_path = shutil.which(name)
    if not on_path:
        return None
    return str(Path(on_path).resolve())


@dataclass(frozen=True)
class Episode:
    """Immutable description of one downloadable item (series episode, /w/ page or movie)."""

    # Season number; the movie parser fills in 0.
    season: int
    # Episode number; the movie parser fills in 0.
    episode: int
    # Slug identifying the media on the site.
    slug: str
    # Display title; main() uses it to build the output file name.
    title: str
    # URL of the DASH manifest (MPD) that main() downloads.
    manifest_url: str
    # Subtitle tracks as (language code, WebVTT URL) pairs.
    subtitles: tuple[tuple[str, str], ...]


def _urlopen(url: str, timeout: int = 120, headers: dict | None = None, data: bytes | None = None):
    """Open *url* and return the response object (the caller is responsible for closing it).

    The request is first made with normal TLS certificate verification. Only when
    that attempt fails because of a TLS error is it retried once with verification
    disabled, and a warning is written to stderr so the downgrade is never silent.

    Args:
        url: Address to open.
        timeout: Socket timeout in seconds.
        headers: Extra request headers, if any.
        data: Optional request body.

    Raises:
        urllib.error.HTTPError: The server answered with an error status.
        urllib.error.URLError: Any other failure (DNS, refused connection, timeout, ...).
    """
    req = urllib.request.Request(url, headers=headers or {}, data=data)
    try:
        return urllib.request.urlopen(req, timeout=timeout, context=ssl.create_default_context())
    except urllib.error.HTTPError:
        # A real HTTP answer: retrying without verification would not help.
        raise
    except urllib.error.URLError as err:
        # Non-TLS failures would fail again in exactly the same way on the
        # unverified retry, so surface them directly instead of weakening
        # security for nothing.
        if not isinstance(err.reason, ssl.SSLError):
            raise
        print(
            f"Abisua: TLS egiaztapenak huts egin du ({err.reason}); egiaztapenik gabe berriro saiatzen.",
            file=sys.stderr,
        )
        # SECURITY: best-effort fallback kept from the original behaviour; the
        # peer certificate is NOT validated on this path.
        return urllib.request.urlopen(req, timeout=timeout, context=ssl._create_unverified_context())


def _fetch(url: str) -> str:
    """Download *url* and return its body as text.

    The body is decoded as UTF-8, with undecodable bytes replaced rather than
    raising. The response is closed as soon as the body has been read.
    """
    request_headers = {
        "User-Agent": "Mozilla/5.0 (compatible; primeran-dl/1.0)",
        "Accept-Language": "eu,es;q=0.9,en;q=0.8",
    }
    with _urlopen(url, headers=request_headers) as resp:
        return resp.read().decode("utf-8", errors="replace")


def _fetch_json(url: str) -> dict:
    """Download *url* and return its UTF-8 body parsed as JSON.

    The response is closed as soon as the body has been read.
    """
    request_headers = {
        "User-Agent": "Mozilla/5.0 (compatible; primeran-dl/1.0)",
        "Accept": "application/json",
    }
    with _urlopen(url, headers=request_headers) as resp:
        return json.loads(resp.read().decode("utf-8"))


def _origin_url(page_url: str) -> str:
    """Return the ``scheme://host[:port]`` prefix of *page_url*."""
    parts = urlparse(page_url)
    return "://".join((parts.scheme, parts.netloc))


def _extract_series_base_slug(url_slug: str) -> str:
    """Return the series slug with a trailing ``-d<digits>-<digits>`` suffix removed.

    Examples: ``'threesome-d2-1'`` -> ``'threesome'``; ``'goenkale'`` -> ``'goenkale'``.
    """
    found = re.match(r"^(.+?)-d\d+-\d+$", url_slug)
    if found is None:
        return url_slug
    return found.group(1)


def _parse_primeran_page_url(page_url: str) -> tuple[str, str]:
    """Split a page URL into ``(kind, slug)``.

    ``kind`` is ``"series"`` for ``.../s/...``, ``"movie"`` for ``.../m/...`` and
    ``"watch"`` for ``.../w/...`` (the path letter is matched case-insensitively).

    Raises:
        ValueError: The URL does not have one of the three expected shapes.
    """
    hit = re.search(r"https?://[^/]+/(s|m|w)/([^/?#]+)", page_url, re.I)
    if hit is None:
        raise ValueError(
            f"URL ezezaguna (https://primeran.eus/s/…, …/m/… edo …/w/… espero zen): {page_url}"
        )
    letter, slug = hit.groups()
    letter = letter.lower()
    if letter == "s":
        kind = "series"
    elif letter == "m":
        kind = "movie"
    else:
        kind = "watch"
    return kind, slug


def _page_title_from_html(html: str) -> str | None:
    """Return the page title found in *html*, or None.

    The ``og:title`` meta tag is preferred; the ``<title>`` element is the fallback.
    Surrounding whitespace is stripped from the result.
    """
    title_patterns = (
        r'<meta[^>]+property="og:title"[^>]+content="([^"]*)"',
        r"<title[^>]*>([^<]+)</title>",
    )
    for pattern in title_patterns:
        found = re.search(pattern, html, re.I)
        if found:
            return found.group(1).strip()
    return None


def _extract_episode_subtitles_from_chunk(chunk: str, slug_pos_in_chunk: int) -> tuple[tuple[str, str], ...]:
    """Pick the subtitle list that belongs to an episode inside a slice of server-rendered HTML.

    The ``subtitles:`` array may be serialized before or after the episode slug.
    Taking only the last ``subtitles:`` marker before the slug is not enough: when
    episodes are listed in reverse order in the HTML, that block is often the one of
    the neighbouring episode (e.g. E03 would receive the subtitles of E04). Every
    marker that yields at least one track is therefore considered, and the one whose
    start is closest to *slug_pos_in_chunk* wins (the earliest one on a tie).

    Returns:
        ``(language code, VTT URL)`` pairs, or an empty tuple when nothing usable is found.
    """
    marker = "subtitles:"
    terminators = ("],age_rating", "],tags:", "],theme:", "],transcoding_data:")
    file_re = re.compile(r'file:"(https://cdnstorage\.primeran\.eus/directus/eitb/[^"]+\.vtt)"')
    lang_re = re.compile(r'language:\$R\[\d+\]=\{id:\d+,code:"([^"]+)"')

    def array_text(tail: str) -> str | None:
        # The terminators are tried in priority order, not by position in the text.
        for terminator in terminators:
            idx = tail.find(terminator)
            if idx != -1:
                return tail[: idx + 1]
        return None

    def tracks_in(blob: str) -> list[tuple[str, str]]:
        found: list[tuple[str, str]] = []
        for hit in file_re.finditer(blob):
            # The language code is the first one serialized after the file URL.
            code = lang_re.search(blob, hit.end())
            if code:
                found.append((code.group(1), hit.group(1)))
        return found

    candidates: list[tuple[int, tuple[tuple[str, str], ...]]] = []
    start = chunk.find(marker)
    while start != -1:
        blob = array_text(chunk[start:])
        if blob:
            tracks = tracks_in(blob)
            if tracks:
                candidates.append((abs(start - slug_pos_in_chunk), tuple(tracks)))
        start = chunk.find(marker, start + len(marker))

    if not candidates:
        return ()
    # min() keeps the first of several equally close candidates.
    return min(candidates, key=lambda item: item[0])[1]


def parse_episodes(html: str, series_slug: str, page_url: str) -> tuple[str, list[Episode]]:
    """Build the episode list of a series from the ``/api/v1/series/<slug>`` endpoint.

    Args:
        html: HTML of the series page; only used to read ``transcoding_base_path``.
        series_slug: Slug taken from the page URL (a ``-d<season>-<n>`` suffix is dropped
            before the API is queried).
        page_url: Page URL; its origin is used to address the API.

    Returns:
        ``(base, episodes)`` where ``base`` is the ``transcoding_base_path`` found in the
        HTML without its trailing slash (empty string if absent) and ``episodes`` is
        sorted by season, episode number and slug. Entries whose type is not ``"vod"``
        or that have no Widevine DASH manifest are left out.
    """
    base_m = re.search(r'transcoding_base_path:"([^"]+)"', html)
    base = base_m.group(1).rstrip("/") if base_m else ""

    origin = _origin_url(page_url)
    base_series_slug = _extract_series_base_slug(series_slug)
    data = _fetch_json(f"{origin}/api/v1/series/{base_series_slug}")

    out: list[Episode] = []
    # ``or`` fallbacks cover keys that are present but null in the JSON, which a
    # plain ``.get(key, default)`` would let through as None.
    for season in data.get("seasons") or []:
        season_num = season.get("number") or 0
        for ep_data in season.get("episodes") or []:
            if ep_data.get("type") != "vod":
                continue
            slug = ep_data.get("slug") or ""
            title = ep_data.get("title") or slug
            ep_num = ep_data.get("episode_number") or 0

            tbp = ep_data.get("transcoding_base_path") or ""
            manifest_url: str | None = None
            for td in ep_data.get("transcoding_data") or []:
                if td.get("drm_type") == "widevine" and td.get("manifest_type") == "dash":
                    mu = td.get("manifest_url") or ""
                    if mu:
                        manifest_url = tbp + mu
                        # Stop only once a usable URL is found, so that an entry with
                        # an empty URL does not hide a later valid one.
                        break
            if not manifest_url:
                continue

            out.append(Episode(
                season=season_num,
                episode=ep_num,
                slug=slug,
                title=title,
                manifest_url=manifest_url,
                # Same subtitle schema as the media endpoint: reuse the shared helper.
                subtitles=tuple(_subtitles_from_api(ep_data)),
            ))

    out.sort(key=lambda e: (e.season, e.episode, e.slug))
    return base, out


def parse_movie(html: str, movie_slug: str, page_url: str) -> tuple[str, list[Episode]]:
    """Describe a movie page as a one-item list, using the ``/api/v1/media/<slug>`` endpoint.

    Returns:
        ``(base, [movie])`` where ``base`` is the ``transcoding_base_path`` found in
        *html* without its trailing slash (empty string if absent). The single entry
        has season and episode set to 0.

    Raises:
        RuntimeError: The API lists no Widevine DASH manifest for the movie.
    """
    site = _origin_url(page_url)
    payload = _fetch_json(f"{site}/api/v1/media/{movie_slug}")

    stream_url = _pick_widevine_dash_url(payload, site, movie_slug)
    display_title = payload.get("title", movie_slug) or movie_slug
    tracks = tuple(_subtitles_from_api(payload))

    cdn_match = re.search(r'transcoding_base_path:"([^"]+)"', html)
    cdn_base = cdn_match.group(1).rstrip("/") if cdn_match else ""

    movie = Episode(
        season=0,
        episode=0,
        slug=movie_slug,
        title=display_title,
        manifest_url=stream_url,
        subtitles=tracks,
    )
    return cdn_base, [movie]


def parse_watch(html: str, slug: str, page_url: str) -> tuple[str, list[Episode]]:
    """Describe a single ``/w/`` (watch) page, using the ``/api/v1/media/<slug>`` endpoint.

    The season number comes from the API's ``season_data``; the episode number is
    read from the ``-d<digits>-<digits>`` part of the slug (0 when the slug has none).

    Returns:
        ``(base, [episode])`` where ``base`` is the ``transcoding_base_path`` found in
        *html* without its trailing slash (empty string if absent).

    Raises:
        RuntimeError: The API lists no Widevine DASH manifest for the media.
    """
    origin = _origin_url(page_url)
    data = _fetch_json(f"{origin}/api/v1/media/{slug}")

    manifest_url = _pick_widevine_dash_url(data, origin, slug)

    title = data.get("title", slug) or slug
    sd = data.get("season_data", {}) or {}
    # A null season number would later break the ``S{season:02d}`` file-name
    # formatting in main(), so fall back to 0 for missing AND null values.
    season = sd.get("season_number") or 0
    ep_m = re.search(r"-d\d+-(\d+)", slug)
    ep_num = int(ep_m.group(1)) if ep_m else 0
    subs = _subtitles_from_api(data)

    base_m = re.search(r'transcoding_base_path:"([^"]+)"', html)
    base = base_m.group(1).rstrip("/") if base_m else ""

    return base, [
        Episode(
            season=season, episode=ep_num, slug=slug,
            title=title, manifest_url=manifest_url, subtitles=tuple(subs),
        )
    ]


def _pick_widevine_dash_url(data: dict, origin: str, label: str) -> str:
    """Return the absolute URL of the first Widevine-protected DASH manifest in *data*.

    Args:
        data: Media description returned by the API; its ``manifests`` list is scanned.
        origin: ``scheme://host`` prefix prepended to the manifest path.
        label: Name used in the error message.

    Raises:
        RuntimeError: No such manifest exists (including when ``manifests`` is
            missing, null or empty).
    """
    # ``or []`` also covers a key that is present but null, so the caller gets
    # the RuntimeError it handles instead of a TypeError.
    for m in data.get("manifests") or []:
        if m.get("type") == "dash" and (m.get("drmConfig") or {}).get("type") == "widevine":
            rel = m.get("manifestURL", "")
            if rel:
                return origin + rel
    raise RuntimeError(f"Ez dago DASH Widevine manifestik « {label} »-rentzat.")


def _subtitles_from_api(data: dict) -> list[tuple[str, str]]:
    """Return the subtitle tracks listed in an API object as ``(language code, file URL)`` pairs.

    Entries without a file URL are skipped. The language code defaults to ``"und"``
    when the language object or its code is missing or null. A missing or null
    ``subtitles`` list yields an empty result.
    """
    subs: list[tuple[str, str]] = []
    for s in data.get("subtitles") or []:
        # ``or "und"`` keeps a null code from reaching the file-name builder as None.
        lang = (s.get("language") or {}).get("code") or "und"
        file_url = s.get("file", "")
        if file_url:
            subs.append((lang, file_url))
    return subs


def _dash_segments_base_url(mpd_xml: bytes, manifest_url: str) -> str:
    """Return the base URL (no trailing slash) under which init/media segments live.

    The first Period's ``BaseURL`` is used when present: an absolute http(s) value
    is returned as is, a relative one is resolved against the manifest URL.
    Without a usable ``BaseURL`` the directory of the manifest itself is returned.
    """
    root = ET.fromstring(mpd_xml)
    _strip_ns_tree(root)
    period = root.find("Period")
    if period is not None:
        bu_el = period.find("BaseURL")
        if bu_el is not None and bu_el.text:
            raw = bu_el.text.strip()
            if raw.startswith(("http://", "https://")):
                return raw.rstrip("/")
            if raw:
                # Relative references resolve against the MPD URL itself (RFC 3986):
                # the manifest file name is replaced, not treated as a directory,
                # so ``.../dash.mpd`` + ``cenc/`` gives ``.../cenc``.
                return urljoin(manifest_url, raw).rstrip("/")
    return manifest_url.rsplit("/", 1)[0]


def _subtitles_from_mpd_vtt(mpd_xml: bytes) -> tuple[tuple[str, str], ...]:
    """Collect WebVTT tracks declared directly in the MPD.

    Only adaptation sets whose MIME type mentions ``vtt`` are inspected, and only
    representations whose ``BaseURL`` is a ``.vtt`` file on ``cdnstorage.primeran.eus``
    are kept. The language defaults to ``"und"``.

    Returns:
        ``(language, URL)`` pairs in document order.
    """
    allowed_prefix = "https://cdnstorage.primeran.eus/"
    mpd = ET.fromstring(mpd_xml)
    _strip_ns_tree(mpd)

    tracks: list[tuple[str, str]] = []
    for adaptation in mpd.findall("Period/AdaptationSet"):
        if "vtt" not in (adaptation.get("mimeType") or "").lower():
            continue
        language = adaptation.get("lang") or "und"
        for representation in adaptation.findall("Representation"):
            base_el = representation.find("BaseURL")
            text = base_el.text if base_el is not None else None
            if not text:
                continue
            href = text.strip()
            if href.startswith(allowed_prefix) and href.endswith(".vtt"):
                tracks.append((language, href))
    return tuple(tracks)


def _strip_ns(tag: str) -> str:
    """Drop a leading ``{namespace}`` prefix from an ElementTree tag or attribute name."""
    if not tag.startswith("{"):
        return tag
    _, closing, local_name = tag.partition("}")
    return local_name if closing else tag


def _strip_ns_tree(elem: ET.Element) -> None:
    """Remove namespace prefixes from the tag and attribute names of *elem* and all its descendants, in place."""
    elem.tag = _strip_ns(elem.tag)
    # Iterate over a snapshot of the keys because the mapping is modified below.
    for qualified in list(elem.attrib):
        plain = _strip_ns(qualified)
        if plain != qualified:
            elem.attrib[plain] = elem.attrib.pop(qualified)
    for child in elem:
        _strip_ns_tree(child)


def pick_video_audio_templates(
    mpd_xml: bytes, quality_height: int
) -> tuple[tuple[str, str, int, int], tuple[str, str, int, int]]:
    """Choose one video and one audio representation from a DASH manifest.

    Video: the tallest representation whose height does not exceed *quality_height*,
    ties broken by the higher bandwidth. Audio: the representation with the highest
    bandwidth. Representations without their own ``SegmentTemplate``, or whose
    initialization path mentions ``trickplay`` or ``tiles``, are ignored.

    Returns:
        ``(video, audio)``, each ``(initialization, media_template, segment_count,
        start_number)``. The segment count adds ``r + 1`` for every ``<S>`` of the
        ``SegmentTimeline``; ``start_number`` defaults to 1.

    Raises:
        RuntimeError: No video or audio track qualifies, or a chosen template lacks
            its initialization path, media template or segments.
    """
    mpd = ET.fromstring(mpd_xml)
    _strip_ns_tree(mpd)

    def bandwidth_of(representation: ET.Element) -> int:
        return int(representation.get("bandwidth") or "0")

    def describe(template: ET.Element) -> tuple[str, str, int, int]:
        init_path = template.get("initialization", "")
        media_path = template.get("media", "")
        timeline = template.find("SegmentTimeline")
        total = 0
        if timeline is not None:
            # Each <S> stands for (r + 1) consecutive segments; r defaults to 0.
            total = sum(int(entry.get("r", "0") or "0") + 1 for entry in timeline.findall("S"))
        first_number = int(template.get("startNumber", "1"))
        return init_path, media_path, total, first_number

    video: tuple[tuple[int, int], ET.Element] | None = None  # ((height, bandwidth), template)
    audio: tuple[ET.Element, ET.Element] | None = None  # (representation, template)

    for adaptation in mpd.findall("Period/AdaptationSet"):
        kind = adaptation.get("contentType", "")
        for representation in adaptation.findall("Representation"):
            template = representation.find("SegmentTemplate")
            if template is None:
                continue
            init_attr = template.get("initialization") or ""
            # Thumbnail / trick-play tracks are not real media.
            if "trickplay" in init_attr or "tiles" in init_attr:
                continue
            if kind == "video":
                rank = (int(representation.get("height") or "0"), bandwidth_of(representation))
                # Tuple comparison: taller wins, then higher bandwidth.
                if rank[0] <= quality_height and (video is None or rank > video[0]):
                    video = (rank, template)
            elif kind == "audio":
                if audio is None or bandwidth_of(representation) > bandwidth_of(audio[0]):
                    audio = (representation, template)

    if video is None or audio is None:
        raise RuntimeError("Impossible de choisir pistes vidéo/audio dans le MPD")

    video_spec = describe(video[1])
    audio_spec = describe(audio[1])
    # The start number is deliberately not part of the completeness check.
    if not all(video_spec[:3] + audio_spec[:3]):
        raise RuntimeError("SegmentTemplate incomplet dans le MPD")

    return video_spec, audio_spec


def _download_file(url: str, dest: Path) -> None:
    """Stream *url* into the file *dest*, creating parent directories as needed."""
    chunk_size = 1024 * 256
    dest.parent.mkdir(parents=True, exist_ok=True)
    response = _urlopen(
        url,
        timeout=300,
        headers={"User-Agent": "Mozilla/5.0 (compatible; primeran-dl/1.0)"},
    )
    with response, dest.open("wb") as sink:
        # read() returns an empty bytes object at end of stream.
        for piece in iter(lambda: response.read(chunk_size), b""):
            sink.write(piece)


def _concat_segments(
    base_cenc: str,
    init_rel: str,
    media_templ: str,
    count: int,
    out_raw: Path,
    start_number: int = 1,
) -> None:
    """Download the init segment and *count* media segments and join them into *out_raw*.

    Args:
        base_cenc: Base URL (no trailing slash) that segment paths are appended to.
        init_rel: Path of the initialization segment, relative to *base_cenc*.
        media_templ: Media path template containing ``$Number$`` or the
            width-formatted DASH form ``$Number%0<width>d$`` (e.g. ``$Number%05d$``).
        count: Number of media segments to fetch.
        out_raw: File that receives init + media segments, in order.
        start_number: Number of the first media segment.

    Raises:
        urllib.error.HTTPError: A segment download failed, except for a 404 in the
            last tenth of the sequence, which simply ends the download.
        RuntimeError: Fewer than 5 parts (init segment included) were downloaded.
    """
    number_tag = re.compile(r"\$Number(?:%0(\d+)d)?\$")

    def media_path(number: int) -> str:
        # Zero-pad to the width requested by the template, if any; a plain
        # ``$Number$`` is replaced by the bare decimal number.
        return number_tag.sub(lambda m: str(number).zfill(int(m.group(1) or 0)), media_templ)

    parts: list[Path] = []
    with tempfile.TemporaryDirectory() as td:
        tdir = Path(td)
        init_url = f"{base_cenc}/{init_rel}"
        init_path = tdir / "init.mp4"
        _download_file(init_url, init_path)
        parts.append(init_path)
        for i in range(count):
            n = start_number + i
            seg_url = f"{base_cenc}/{media_path(n)}"
            seg_path = tdir / f"seg_{n}.m4s"
            try:
                _download_file(seg_url, seg_path)
            except urllib.error.HTTPError as e:
                # Tolerate a missing segment near the end: the computed count may
                # overshoot what the server actually holds.
                if e.code == 404 and i > count * 0.9:
                    print(f"      … azken segmentua {n} falta da (404), gelditu", flush=True)
                    break
                raise
            parts.append(seg_path)
            if (i + 1) % 50 == 0:
                print(f"      … segmentuak {i + 1}/{count}", flush=True)

        if len(parts) < 5:
            raise RuntimeError(f"Segmentu gutxiegi jaitsi dira ({len(parts)})")

        # Written only after every download succeeded, while the temporary
        # directory still exists.
        with out_raw.open("wb") as out:
            for p in parts:
                out.write(p.read_bytes())


def _run_mp4decrypt(mp4decrypt: str, kid: str, key: str, in_path: Path, out_path: Path) -> None:
    """Decrypt *in_path* into *out_path* with the mp4decrypt tool and the ``kid:key`` pair.

    The tool's standard output is discarded.

    Raises:
        subprocess.CalledProcessError: The tool exited with a non-zero status.
    """
    key_spec = f"{kid}:{key}"
    subprocess.run(
        [mp4decrypt, "--key", key_spec, str(in_path), str(out_path)],
        check=True,
        stdout=subprocess.DEVNULL,
    )


def _ffmpeg_mux(ffmpeg: str, video: Path, audio: Path, out_mp4: Path) -> None:
    """Combine the first video stream of *video* and the first audio stream of *audio* into *out_mp4*.

    Streams are copied without re-encoding and an existing output file is overwritten.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
    """
    argv = [ffmpeg, "-y", "-loglevel", "error"]
    for source in (video, audio):
        argv += ["-i", str(source)]
    argv += ["-map", "0:v:0", "-map", "1:a:0", "-c", "copy", str(out_mp4)]
    subprocess.run(argv, check=True)


def _lang_tag_for_filename(lang: str) -> str:
    """Turn a language code into a tag that is safe inside a file name.

    Runs of characters other than ASCII letters, digits, ``_``, ``.`` and ``-`` become
    a single ``_``; leading and trailing dots and underscores are removed. An empty
    result is replaced by ``"sub"``.
    """
    trimmed = lang.strip()
    cleaned = re.sub(r"[^\w.-]+", "_", trimmed, flags=re.ASCII)
    cleaned = cleaned.strip("._")
    return cleaned or "sub"


def _vtt_url_to_srt(ffmpeg: str, vtt_url: str, dest_srt: Path) -> None:
    """Download the WebVTT file at *vtt_url* and let ffmpeg convert it into *dest_srt*.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
    """
    with tempfile.TemporaryDirectory() as scratch:
        local_vtt = Path(scratch, "subs.vtt")
        _download_file(vtt_url, local_vtt)
        subprocess.run(
            [ffmpeg, "-y", "-loglevel", "error", "-i", str(local_vtt), str(dest_srt)],
            check=True,
        )


def _safe_name(s: str, max_len: int = 120) -> str:
    """Make *s* usable as a file name.

    Runs of ``< > : " / \\ | ? *`` become a single ``_``, whitespace runs collapse to
    one space, the ends are trimmed and the result is cut to *max_len* characters.
    """
    without_reserved = re.sub(r'[<>:"/\\|?*]+', "_", s)
    collapsed = re.sub(r"\s+", " ", without_reserved).strip()
    # Slicing is a no-op when the text is already short enough.
    return collapsed[:max_len]


def main() -> int:
    """Command-line entry point: download, decrypt and mux every item of a Primeran page.

    Steps: parse the arguments, validate the ClearKey pair, locate mp4decrypt and
    ffmpeg, list the episodes (or the single movie) of the page, then for each item
    download the video and audio segments, decrypt them, mux them into an ``.mp4``
    in the output directory and, unless disabled, convert its subtitles to ``.srt``.
    Items whose output file already exists are skipped.

    Returns:
        0 on success, 1 when the page yields nothing to download, 2 for invalid
        arguments, an invalid key or a missing tool.
    """
    ap = argparse.ArgumentParser(description="Primeran-etik bideoa jaitsi (DASH CENC + ClearKey).")
    ap.add_argument(
        "page_url",
        metavar="url",
        help="Orria …/s/<slug>, …/m/<slug> edo …/w/<slug> (adb. https://primeran.eus/m/maspalomas)",
    )
    ap.add_argument(
        "--key",
        dest="clearkey",
        help="KID:GILA hex-ean (32 karaktere bakoitza). Bestela PRIMERAN_CLEARKEY ingurune-aldagaia.",
    )
    ap.add_argument("-o", "--output-dir", type=Path, default=Path("."), help="Irteerako karpeta")
    ap.add_argument("--quality", type=int, default=1080, choices=(360, 480, 720, 1080))
    ap.add_argument(
        "--mp4decrypt",
        help="mp4decrypt-en bidea (lehenetsia: ./bin/mp4decrypt badago, bestela PATH)",
    )
    ap.add_argument(
        "--ffmpeg",
        help="ffmpeg-en bidea (lehenetsia: ./bin/ffmpeg badago, bestela PATH)",
    )
    ap.add_argument(
        "--limit",
        type=int,
        default=0,
        help="Lehenengo N elementuak bakarrik prozesatu (atalak edo film bakarra ; 0 = guztiak)",
    )
    ap.add_argument("--no-subs", action="store_true", help="Ez jaitsi / bihurtu azpitituluak .srt")
    args = ap.parse_args()

    # --- ClearKey validation -------------------------------------------------
    ck = args.clearkey or os.environ.get("PRIMERAN_CLEARKEY")
    if not ck:
        print("Eman --key KID:GILA edo exportatu PRIMERAN_CLEARKEY=KID:GILA", file=sys.stderr)
        return 2
    if ":" not in ck:
        print("Giltza formatua: kid_hex:giltza_hex", file=sys.stderr)
        return 2
    kid_raw, key_raw = ck.split(":", 1)
    kid = kid_raw.replace("-", "").lower()
    key = key_raw.replace("-", "").lower()
    # Check the characters as well as the length: a malformed key would otherwise
    # only be rejected by mp4decrypt after everything has been downloaded.
    hex32 = re.compile(r"[0-9a-f]{32}")
    if not (hex32.fullmatch(kid) and hex32.fullmatch(key)):
        print("KID eta giltzak 16 byte izan behar dituzte (32 karaktere hex).", file=sys.stderr)
        return 2

    # --- External tools ------------------------------------------------------
    mp4decrypt = _resolve_tool("mp4decrypt", args.mp4decrypt)
    if not mp4decrypt:
        print(
            f"mp4decrypt ez da aurkitu ({BUNDLE_BIN / 'mp4decrypt'} edo --mp4decrypt espero zen).",
            file=sys.stderr,
        )
        return 2
    ffmpeg = _resolve_tool("ffmpeg", args.ffmpeg)
    if not ffmpeg:
        print(
            "ffmpeg ez da aurkitu (jarri binary bat bin/ffmpeg karpetan, instalatu edo erabili --ffmpeg).",
            file=sys.stderr,
        )
        return 2

    # --- Page parsing --------------------------------------------------------
    try:
        kind, media_slug = _parse_primeran_page_url(args.page_url)
    except ValueError as ex:
        print(str(ex), file=sys.stderr)
        return 2
    print(f"Orria: {args.page_url} ({kind})", flush=True)
    html = _fetch(args.page_url)
    try:
        if kind == "movie":
            base, episodes = parse_movie(html, media_slug, args.page_url)
        elif kind == "watch":
            base, episodes = parse_watch(html, media_slug, args.page_url)
        else:
            base, episodes = parse_episodes(html, media_slug, args.page_url)
    except RuntimeError as ex:
        print(str(ex), file=sys.stderr)
        return 1
    if not episodes:
        label = "filma" if kind == "movie" else "atala"
        print(f"Ez da {label} aurkitu « {media_slug} »-rentzat.", file=sys.stderr)
        return 1
    if args.limit > 0:
        episodes = episodes[: args.limit]
    unit = "bideoa(k)" if kind == "movie" else "atala(k)"
    print(f"CDN: {base or '(ezezaguna)'} — {len(episodes)} {unit}.", flush=True)

    args.output_dir.mkdir(parents=True, exist_ok=True)

    # --- Per-item download ---------------------------------------------------
    for ep in episodes:
        if kind == "movie":
            label = f"{_safe_name(ep.title)}.mp4"
        else:
            label = f"S{ep.season:02d}E{ep.episode:02d} - {_safe_name(ep.title)}.mp4"
        dest = args.output_dir / label
        if dest.exists():
            print(f"Dagoeneko existitzen da, saltatu: {dest.name}", flush=True)
            continue

        manifest_url = ep.manifest_url
        print(f"Jaisten: {label}", flush=True)
        try:
            # ``with`` closes the connection once the manifest has been read.
            with _urlopen(manifest_url, timeout=60, headers={
                "User-Agent": "Mozilla/5.0 (compatible; primeran-dl/1.0)",
            }) as resp:
                mpd = resp.read()
        except urllib.error.HTTPError as e:
            print(f"  MPD errorea {e.code} {manifest_url}-rentzat", file=sys.stderr)
            continue

        try:
            vt, at = pick_video_audio_templates(mpd, args.quality)
        except Exception as ex:
            # Any manifest problem (malformed XML, bad numbers, no track) skips
            # this item only; the remaining items are still processed.
            print(f"  MPD: {ex}", file=sys.stderr)
            continue

        base_cenc = _dash_segments_base_url(mpd, manifest_url)
        # Prefer the subtitles listed by the API; fall back to those in the MPD.
        subtitle_tracks = ep.subtitles or _subtitles_from_mpd_vtt(mpd)
        with tempfile.TemporaryDirectory() as tmp:
            t = Path(tmp)
            v_raw = t / "v_raw.mp4"
            a_raw = t / "a_raw.mp4"
            v_dec = t / "v_dec.mp4"
            a_dec = t / "a_dec.mp4"
            print("  Bideoa…", flush=True)
            _concat_segments(base_cenc, vt[0], vt[1], vt[2], v_raw, vt[3])
            print("  Audioa…", flush=True)
            _concat_segments(base_cenc, at[0], at[1], at[2], a_raw, at[3])
            print("  Deszifratzen…", flush=True)
            _run_mp4decrypt(mp4decrypt, kid, key, v_raw, v_dec)
            _run_mp4decrypt(mp4decrypt, kid, key, a_raw, a_dec)
            print("  Mux ffmpeg…", flush=True)
            _ffmpeg_mux(ffmpeg, v_dec, a_dec, dest)
            if not args.no_subs and subtitle_tracks:
                print("  Azpitituluak (.srt)…", flush=True)
                # Counts tracks per language tag so that duplicates get a numeric suffix.
                lang_count: dict[str, int] = {}
                for lang, vtt_url in subtitle_tracks:
                    tag = _lang_tag_for_filename(lang)
                    n = lang_count.get(tag, 0) + 1
                    lang_count[tag] = n
                    if n == 1:
                        srt_name = f"{dest.stem}.{tag}.srt"
                    else:
                        srt_name = f"{dest.stem}.{tag}.{n}.srt"
                    srt_dest = args.output_dir / srt_name
                    if srt_dest.exists():
                        continue
                    try:
                        _vtt_url_to_srt(ffmpeg, vtt_url, srt_dest)
                    except (OSError, subprocess.CalledProcessError) as ex:
                        # Subtitles are best-effort: report and keep the video.
                        print(f"  SRT abisua ({tag}): {ex}", file=sys.stderr)
        print(f"  OK → {dest}", flush=True)

    return 0


# Run main() only when executed as a script; its return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())