"""Load and index existing KNIEPUNKT episodes for redundancy checks."""

import json
from pathlib import Path

# Directory holding the episode documents and the JSON file caching their
# LLM-generated summaries; both live one level above this module's package.
EPISODES_DIR = Path(__file__).parent.parent / "KNIEPUNKTe"
CACHE_FILE = Path(__file__).parent.parent / "episodes_cache.json"

# System prompt sent with every summarisation request (runtime text, German).
_SYSTEM = (
    "Du bist ein erfahrener Redaktionsassistent für die LinkedIn-Kolumne KNIEPUNKT "
    "von Dr. André Knie. Analysiere Episodentexte präzise und strukturiert."
)

# File types that can be read, and the subset that wins over a PDF when
# several files exist for the same episode number.
_SUPPORTED_SUFFIXES = (".pdf", ".docx", ".odt")
_PREFERRED_SUFFIXES = (".docx", ".odt")


def _read_pdf(path: Path) -> str:
    """Return the text of all pages of a PDF, joined with newlines."""
    from pypdf import PdfReader

    reader = PdfReader(str(path))
    # extract_text() may yield None for a page; treat that as empty text.
    return "\n".join(page.extract_text() or "" for page in reader.pages)


def _read_docx(path: Path) -> str:
    """Return the text of all paragraphs of a DOCX file, joined with newlines."""
    from docx import Document

    doc = Document(str(path))
    return "\n".join(p.text for p in doc.paragraphs)


def _read_odt(path: Path) -> str:
    """Return the text of all paragraphs of an ODT file, joined with newlines.

    Uses ``odf.teletype.extractText`` so that text nested inside child
    elements (e.g. formatted ``<text:span>`` runs) is included; reading only
    the paragraph's direct text nodes would silently drop such text.
    """
    from odf import teletype
    from odf.opendocument import load
    from odf.text import P

    doc = load(str(path))
    # NOTE(review): paragraphs nested inside other paragraphs (text boxes)
    # would presumably be extracted twice here - confirm if such files occur.
    return "\n".join(teletype.extractText(p) for p in doc.getElementsByType(P))


def _read_episode(path: Path) -> str:
    """Return the text of an episode file, or "" for an unsupported suffix."""
    suffix = path.suffix.lower()
    if suffix == ".pdf":
        return _read_pdf(path)
    if suffix == ".docx":
        return _read_docx(path)
    if suffix == ".odt":
        return _read_odt(path)
    return ""


def _episode_number(path: Path) -> str:
    """Return the leading token of the file stem (text before the first "_" or " ").

    The stem is used rather than the full name so that a file named just
    ``001.pdf`` yields ``"001"`` instead of ``"001.pdf"``.
    """
    return path.stem.split("_")[0].split(" ")[0].strip()


def _episode_files() -> list[Path]:
    """Return one canonical file per episode number, preferring DOCX/ODT over PDF.

    Files whose leading token is not an integer are ignored. The result is
    ordered by numeric episode number.
    """
    if not EPISODES_DIR.is_dir():
        return []

    by_number: dict[str, Path] = {}
    for candidate in sorted(EPISODES_DIR.iterdir()):
        suffix = candidate.suffix.lower()
        if suffix not in _SUPPORTED_SUFFIXES or not candidate.is_file():
            continue
        num = _episode_number(candidate)
        try:
            int(num)
        except ValueError:
            continue
        # First file for a number is kept unless a DOCX/ODT shows up later.
        if num not in by_number or suffix in _PREFERRED_SUFFIXES:
            by_number[num] = candidate

    # Sort numerically so that e.g. "10" comes after "2"; the string itself
    # is the tie-breaker for differently padded spellings of the same number.
    ordered = sorted(by_number, key=lambda key: (int(key), key))
    return [by_number[key] for key in ordered]


def _load_cache():
    """Return the cached index, or None if the cache is missing or unreadable."""
    if not CACHE_FILE.exists():
        return None
    try:
        with open(CACHE_FILE, encoding="utf-8") as f:
            return json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # A truncated cache, or one written with a non-UTF-8 platform
        # encoding, is treated as absent so the caller rebuilds it.
        return None


def _fallback_entry(path: Path) -> dict:
    """Return a minimal index entry for an episode whose summary could not be parsed."""
    return {
        "nummer": _episode_number(path),
        "titel": path.stem,
        "hauptthema": "",
        "allegories": [],
        "kulturelle_referenzen": [],
        "kernargument": "",
        "file": str(path),
    }


def build_index(client, force: bool = False) -> list[dict]:
    """Build or load the episode summary index.

    Args:
        client: LLM client handle, passed through unchanged to
            ``kniepunkt.llm.chat``.
        force: When true, ignore an existing cache file and rebuild.

    Returns:
        One dict per readable episode. Each dict is the JSON object returned
        by the model plus a ``"file"`` key, or a fallback entry with empty
        fields when the response contains no parsable JSON object.

    Side effects:
        Writes the rebuilt index to ``CACHE_FILE`` as UTF-8 JSON.
    """
    if not force:
        cached = _load_cache()
        if cached is not None:
            return cached

    from kniepunkt.llm import chat

    index = []
    for path in _episode_files():
        text = _read_episode(path)
        if not text.strip():
            continue

        prompt = f"""Analysiere diese KNIEPUNKT-Episode und antworte NUR mit einem JSON-Objekt:
{{
  "nummer": "z.B. 001",
  "titel": "Titel der Episode",
  "hauptthema": "Hauptthema in 1-2 Sätzen",
  "allegories": ["verwendete Allegorie oder Metapher", ...],
  "kulturelle_referenzen": ["Mythos/Literatur-Referenz", ...],
  "kernargument": "Hauptaussage des Autors in 1-2 Sätzen"
}}

Episodentext (gekürzt):
{text[:5000]}"""

        response = chat(client, [{"role": "user", "content": prompt}], _SYSTEM, max_tokens=512)

        # The model may wrap the object in prose or code fences, so parse
        # only the span from the first "{" to the last "}".
        start = response.find("{")
        end = response.rfind("}") + 1
        try:
            if start == -1 or end <= start:
                raise ValueError("no JSON object in response")
            data = json.loads(response[start:end])
        except (json.JSONDecodeError, ValueError):
            index.append(_fallback_entry(path))
        else:
            data["file"] = str(path)
            index.append(data)

    # ensure_ascii=False emits raw non-ASCII characters, so the file encoding
    # must be fixed explicitly rather than left to the platform default.
    with open(CACHE_FILE, "w", encoding="utf-8") as f:
        json.dump(index, f, ensure_ascii=False, indent=2)
    return index


def _as_text_list(value) -> list[str]:
    """Normalise a model-produced field to a list of strings.

    The index comes from LLM output, so a field expected to be a list may be
    missing, ``null``, or a bare string. Joining a bare string directly would
    split it into single characters.
    """
    if value is None:
        return []
    if isinstance(value, str):
        return [value]
    if isinstance(value, (list, tuple)):
        return [str(item) for item in value if item is not None]
    return [str(value)]


def format_for_context(index: list[dict]) -> str:
    """Format the episode index as a compact context string for prompts.

    Produces one line per episode with number, title, main topic, allegories
    and cultural references. Empty lists are rendered as "–".
    """
    lines = []
    for ep in index:
        allegs = ", ".join(_as_text_list(ep.get("allegories"))) or "–"
        refs = ", ".join(_as_text_list(ep.get("kulturelle_referenzen"))) or "–"
        lines.append(
            f"Ep.{ep.get('nummer', '?')} »{ep.get('titel', '?')}«: "
            f"{ep.get('hauptthema') or ''} | Allegorien: {allegs} | Referenzen: {refs}"
        )
    return "\n".join(lines)