#!/usr/bin/env python3 """KNIEPUNKT Assistant — Streamlit web interface.""" import json from pathlib import Path import streamlit as st # ── Cached resources (initialised once per server process) ─────────────────── @st.cache_resource def _get_client(): from kniepunkt.llm import get_client return get_client() @st.cache_resource def _get_providers(): from kniepunkt.providers import get_configured return get_configured() @st.cache_resource(show_spinner="Lade Episode-Index (einmalig)...") def _get_index(): from kniepunkt import episodes as ep_module return ep_module.build_index(_get_client()) def _episodes_context() -> str: from kniepunkt.episodes import format_for_context return format_for_context(_get_index()) # ── Session helpers ─────────────────────────────────────────────────────────── def _save(): from kniepunkt import session as sess_module if st.session_state.get("session"): sess_module.save(st.session_state.session) def _go(step: str): st.session_state.session["step"] = step st.session_state.step = step _save() st.rerun() # ── Constants ───────────────────────────────────────────────────────────────── STEPS = ["research", "sources", "storyline", "draft", "quality", "publication"] STEP_LABELS = { "research": "1. Recherche", "sources": "2. Quellen", "storyline": "3. Storyline", "draft": "4. Entwurf", "quality": "5. Qualität", "publication": "6. Publikation", } # ── Shared UI components ────────────────────────────────────────────────────── def _progress_bar(): current = st.session_state.get("step", "start") if current not in STEPS: return idx = STEPS.index(current) st.progress((idx + 1) / len(STEPS), text=STEP_LABELS[current]) st.divider() def _back_next(back_step: str, next_label: str, next_key: str, next_disabled: bool = False): col1, col2 = st.columns([1, 5]) with col1: if st.button("← Zurück", key=f"back_{back_step}"): _go(back_step) with col2: if st.button(next_label, type="primary", key=next_key, disabled=next_disabled): return True return False def _grid_2x2(results: dict, select_key: str, button_label: str = "Auswählen"): """Render provider outputs in a responsive 2-column grid with selection.""" items = list(results.items()) selected = st.session_state.get(select_key) for row_start in range(0, len(items), 2): cols = st.columns(2) for col_idx, col in enumerate(cols): item_idx = row_start + col_idx if item_idx >= len(items): break name, text = items[item_idx] with col: is_selected = selected == name header = f"✅ **{name}**" if is_selected else f"**{name}**" st.markdown(header) with st.container(border=True): st.markdown(text) btn_type = "primary" if is_selected else "secondary" if st.button(button_label, key=f"{select_key}_{name}", type=btn_type): st.session_state[select_key] = name st.rerun() st.write("") # spacer between rows # ── Screens ─────────────────────────────────────────────────────────────────── def screen_start(): st.title("📰 KNIEPUNKT Assistant") st.caption("Redaktioneller Workflow · Dr. André Knie") st.divider() providers = _get_providers() if not providers: st.error("Keine API-Keys konfiguriert. Bitte .env-Datei prüfen.") return names = " · ".join(f"**{p.name}**" for p in providers) st.success(f"Aktive Modelle: {names}") st.write("") from kniepunkt import session as sess_module latest = sess_module.load_latest() col1, col2 = st.columns(2) with col1: st.subheader("Neue Episode") if st.button("Neue Session starten", type="primary", use_container_width=True): # Clear all step-related cached state for key in ["news_digest", "sources", "storyline_results", "storyline_selected", "draft_results", "draft_selected", "draft_text", "draft_text_source", "quality_report", "teasers", "visual_ideas", "package_text"]: st.session_state.pop(key, None) new = sess_module.new_session() st.session_state.session = new st.session_state.step = "research" _save() st.rerun() with col2: if latest and latest.get("step") not in ("done", None): st.subheader(f"Fortfahren: {latest['date']}") st.caption(f"Zuletzt: **{STEP_LABELS.get(latest['step'], latest['step'])}**") if st.button("Session fortsetzen", use_container_width=True): st.session_state.session = latest st.session_state.step = latest["step"] # Restore cached results if latest.get("research"): st.session_state.news_digest = latest["research"].get("news_digest", "") if latest.get("sources"): st.session_state.sources = latest["sources"] if latest.get("storyline"): sl = latest["storyline"] st.session_state.storyline_results = sl.get("results", {}) st.session_state.storyline_selected = sl.get("selected_provider") if latest.get("draft"): st.session_state.draft_text = latest["draft"] if latest.get("quality"): st.session_state.quality_report = latest["quality"] if latest.get("publication"): pub = latest["publication"] st.session_state.teasers = pub.get("teasers", "") st.session_state.visual_ideas = pub.get("visual_ideas", "") st.rerun() else: st.subheader("Keine offene Session") st.caption("Starten Sie eine neue Episode oben links.") def screen_research(): st.header("Recherche") _progress_bar() with st.form("research_form"): author_news = st.text_area( "Ihre KI-Nachrichten der Woche", placeholder="URLs, Schlagzeilen, Beobachtungen – eine pro Zeile...", height=200, ) initial_storyline = st.text_input( "Vorläufige Storyline-Idee (optional)", placeholder="Leer lassen, falls noch keine Idee...", ) submitted = st.form_submit_button("🔍 Recherche starten", type="primary") if submitted: if not author_news.strip(): st.warning("Bitte mindestens eine Nachricht eingeben.") else: author_input = { "author_news": author_news, "initial_storyline": initial_storyline.strip() or None, } with st.spinner("Recherchiere aktuelle KI-Nachrichten..."): from kniepunkt import research as res_module news_digest = res_module.research_news( _get_client(), author_input, _episodes_context() ) st.session_state.news_digest = news_digest st.session_state.session["research"] = { "author_input": author_input, "news_digest": news_digest, } # Clear downstream cached results when research changes for key in ["sources", "storyline_results", "storyline_selected", "draft_results", "draft_selected", "draft_text", "quality_report", "teasers", "visual_ideas", "package_text"]: st.session_state.pop(key, None) _save() if "news_digest" in st.session_state: st.divider() st.subheader("Nachrichten-Kandidaten") st.markdown(st.session_state.news_digest) st.divider() if st.button("Weiter zur Quellenbewertung →", type="primary"): _go("sources") def screen_sources(): st.header("Quellenbewertung") _progress_bar() if "sources" not in st.session_state: with st.spinner("Bewerte Quellen..."): from kniepunkt import research as res_module sources = res_module.assess_sources( _get_client(), st.session_state.session["research"]["news_digest"], ) st.session_state.sources = sources st.session_state.session["sources"] = sources _save() st.markdown(st.session_state.sources) st.divider() col1, col2 = st.columns([1, 5]) with col1: if st.button("← Zurück", key="back_research"): _go("research") with col2: if st.button("Weiter zur Storyline-Entwicklung →", type="primary", key="to_storyline"): _go("storyline") def screen_storyline(): st.header("Storyline-Entwicklung") _progress_bar() if "storyline_results" not in st.session_state: with st.spinner("Generiere Storylines mit allen Modellen..."): from kniepunkt import storyline as sl_module results = sl_module.generate_storylines( _get_providers(), st.session_state.session["research"]["news_digest"], st.session_state.session["sources"], st.session_state.session["research"]["author_input"], _episodes_context(), ) st.session_state.storyline_results = results _grid_2x2(st.session_state.storyline_results, "storyline_selected", "Diese Storyline wählen") if st.session_state.get("storyline_selected"): st.divider() adjustments = st.text_area( "Anpassungen oder Hinweise zur gewählten Storyline (optional)", key="storyline_adjustments", height=80, ) col1, col2 = st.columns([1, 5]) with col1: if st.button("← Zurück", key="back_sources2"): _go("sources") with col2: if st.button("Weiter zum Entwurf →", type="primary", key="to_draft"): selected = st.session_state.storyline_selected st.session_state.session["storyline"] = { "results": st.session_state.storyline_results, "selected_provider": selected, "choice": 1, "selected_text": st.session_state.storyline_results[selected], "adjustments": adjustments, } _save() _go("draft") else: st.info("Wählen Sie eine Storyline aus dem Grid oben.") if st.button("← Zurück", key="back_sources3"): _go("sources") def screen_draft(): st.header("Entwurf") _progress_bar() sl = st.session_state.session["storyline"] if "draft_results" not in st.session_state: with st.spinner("Schreibe Entwürfe mit allen Modellen..."): from kniepunkt import drafting as dr_module results = dr_module.generate_drafts( _get_providers(), sl["selected_text"], sl["choice"], sl["adjustments"], st.session_state.session["research"]["news_digest"], st.session_state.session["sources"], st.session_state.session["research"]["author_input"], ) st.session_state.draft_results = results results = st.session_state.draft_results _grid_2x2(results, "draft_selected", "Diesen Entwurf als Basis") selected = st.session_state.get("draft_selected") if selected: # Reset draft_text when a new model is selected if st.session_state.get("draft_text_source") != selected: st.session_state.draft_text = results[selected] st.session_state.draft_text_source = selected st.divider() st.subheader("Entwurf bearbeiten") draft = st.text_area( "Text direkt bearbeiten oder KI-Feedback unten eingeben", value=st.session_state.draft_text, height=500, key="draft_editor", ) col_fb, col_btn = st.columns([4, 1]) with col_fb: feedback = st.text_input("KI-Überarbeitungshinweis", placeholder="z.B. 'Einleitung kürzen, mehr Meinung im zweiten Absatz'") with col_btn: st.write("") if st.button("Überarbeiten", disabled=not feedback.strip(), key="ai_revise_draft"): with st.spinner("Überarbeite Entwurf..."): from kniepunkt import drafting as dr_module draft = dr_module.enrich_draft(_get_client(), draft, feedback) st.session_state.draft_text = draft st.session_state.draft_text_source = selected st.rerun() st.divider() col1, col2 = st.columns([1, 5]) with col1: if st.button("← Zurück", key="back_storyline"): st.session_state.pop("draft_results", None) st.session_state.pop("draft_selected", None) st.session_state.pop("draft_text", None) _go("storyline") with col2: if st.button("Entwurf übernehmen & Qualität prüfen →", type="primary", key="to_quality"): st.session_state.session["draft"] = draft st.session_state.session["draft_results"] = results st.session_state.draft_text = draft st.session_state.pop("quality_report", None) _save() _go("quality") else: st.info("Wählen Sie einen Entwurf aus dem Grid oben.") if st.button("← Zurück", key="back_storyline2"): _go("storyline") def screen_quality(): st.header("Qualitätsprüfung") _progress_bar() if "quality_report" not in st.session_state: with st.spinner("Prüfe Qualität..."): from kniepunkt import quality as q_module report = q_module.review( _get_client(), st.session_state.session["draft"], st.session_state.session["sources"], _episodes_context(), ) st.session_state.quality_report = report st.session_state.session["quality"] = report _save() st.markdown(st.session_state.quality_report) st.divider() st.subheader("Entwurf finalisieren") draft = st.text_area( "Text", value=st.session_state.session.get("draft", ""), height=500, key="quality_editor", ) col_fb, col_btn = st.columns([4, 1]) with col_fb: feedback = st.text_input("KI-Überarbeitungshinweis", placeholder="Basierend auf der Qualitätsprüfung oben...", key="quality_feedback") with col_btn: st.write("") if st.button("Überarbeiten", disabled=not feedback.strip(), key="ai_revise_quality"): with st.spinner("Überarbeite Entwurf..."): from kniepunkt import drafting as dr_module draft = dr_module.enrich_draft(_get_client(), draft, feedback) st.session_state.session["draft"] = draft st.session_state.pop("quality_report", None) _save() st.rerun() st.divider() col1, col2 = st.columns([1, 5]) with col1: if st.button("← Zurück", key="back_draft"): st.session_state.pop("quality_report", None) _go("draft") with col2: if st.button("Weiter zur Publikationsvorbereitung →", type="primary", key="to_pub"): st.session_state.session["draft"] = draft st.session_state.pop("teasers", None) st.session_state.pop("visual_ideas", None) st.session_state.pop("package_text", None) _save() _go("publication") def screen_publication(): st.header("Publikationsvorbereitung") _progress_bar() from kniepunkt import publication as pub_module if "teasers" not in st.session_state: with st.spinner("Erstelle Teaser-Varianten..."): teasers = pub_module.generate_teasers( _get_client(), st.session_state.session["draft"] ) st.session_state.teasers = teasers st.session_state.session.setdefault("publication", {})["teasers"] = teasers if "visual_ideas" not in st.session_state: with st.spinner("Entwickle Cover-Ideen..."): visual_ideas = pub_module.generate_visual_ideas( _get_client(), st.session_state.session["draft"] ) st.session_state.visual_ideas = visual_ideas st.session_state.session.setdefault("publication", {})["visual_ideas"] = visual_ideas if "package_text" not in st.session_state: out_path = pub_module.save_package( st.session_state.session["date"], st.session_state.session["draft"], st.session_state.teasers, st.session_state.session["sources"], st.session_state.visual_ideas, ) st.session_state.package_text = out_path.read_text(encoding="utf-8") st.session_state.session["step"] = "done" _save() # Teasers st.subheader("Einladungstexte (Teaser-Varianten)") st.markdown(st.session_state.teasers) st.divider() # Visual ideas st.subheader("Cover-Ideen & Bild-Prompts") st.markdown(st.session_state.visual_ideas) st.divider() # Download + copy st.subheader("Publikationspaket") date = st.session_state.session["date"] package_text = st.session_state.package_text col1, col2 = st.columns(2) with col1: st.download_button( "⬇ Als Markdown herunterladen", data=package_text, file_name=f"KNIEPUNKT_{date}_publikation.md", mime="text/markdown", use_container_width=True, ) with col2: st.info("💡 Kopieren: Code-Block unten aufklappen → Kopier-Button oben rechts im Block") with st.expander("Vollständiges Publikationspaket (mit Kopier-Button)", expanded=False): st.code(package_text, language=None) st.divider() if st.button("🏠 Neue Episode starten", key="restart"): for key in list(st.session_state.keys()): if key not in ("step",): st.session_state.pop(key, None) st.session_state.step = "start" st.rerun() def screen_history(): st.header("Frühere Episoden") st.divider() sessions_dir = Path("sessions") if not sessions_dir.exists(): st.info("Noch keine gespeicherten Sessions.") return json_files = sorted(sessions_dir.glob("*.json"), reverse=True) pub_files = { f.stem.replace("_publikation", ""): f for f in sessions_dir.glob("*_publikation.md") } if not json_files: st.info("Noch keine gespeicherten Sessions.") return for f in json_files: with open(f) as fp: sess = json.load(fp) date = sess.get("date", f.stem) step = sess.get("step", "?") status = "✅ Fertig" if step == "done" else f"⏳ {STEP_LABELS.get(step, step)}" with st.expander(f"**{date}** — {status}"): if date in pub_files: pub_text = pub_files[date].read_text(encoding="utf-8") st.markdown(pub_text) st.download_button( "⬇ Herunterladen", data=pub_text, file_name=f"KNIEPUNKT_{date}_publikation.md", mime="text/markdown", key=f"dl_{date}", ) else: st.caption("Kein Publikationspaket für diese Session vorhanden.") # ── Main ────────────────────────────────────────────────────────────────────── def main(): st.set_page_config( page_title="KNIEPUNKT Assistant", page_icon="📰", layout="wide", ) if "step" not in st.session_state: st.session_state.step = "start" if "session" not in st.session_state: st.session_state.session = {} with st.sidebar: st.markdown("## 📰 KNIEPUNKT") st.caption("Redaktioneller Workflow") st.divider() current = st.session_state.step for step, label in STEP_LABELS.items(): if step == current: st.markdown(f"**→ {label}**") elif current in STEPS and STEPS.index(step) < STEPS.index(current): st.markdown(f"✓ {label}") else: st.markdown(f"  {label}", unsafe_allow_html=True) st.divider() if st.button("📚 Frühere Episoden", use_container_width=True): st.session_state.step = "history" st.rerun() if st.button("🏠 Startseite", use_container_width=True): st.session_state.step = "start" st.rerun() st.divider() st.caption("Aktive Modelle:") try: for p in _get_providers(): st.caption(f"• {p.name}") except Exception: st.caption("Nicht geladen") screens = { "start": screen_start, "research": screen_research, "sources": screen_sources, "storyline": screen_storyline, "draft": screen_draft, "quality": screen_quality, "publication": screen_publication, "history": screen_history, } screens.get(st.session_state.step, screen_start)() if __name__ == "__main__": main()