Files
Kniepunkt/app.py
T
ankn fd011f6db3 Fix text area reset after AI revision, add API key error handling
- Pop draft_editor/quality_editor widget keys before rerun so revised
  text is correctly displayed after AI enrichment calls
- Wrap _get_client() with friendly st.error/st.stop instead of stack trace
- Remove unused _back_next() helper

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-25 23:26:27 +02:00

599 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""KNIEPUNKT Assistant — Streamlit web interface."""
import json
from pathlib import Path
import streamlit as st
# ── Cached resources (initialised once per server process) ───────────────────
@st.cache_resource
def _get_client():
    """Return the LLM client created by ``kniepunkt.llm.get_client``.

    The function is wrapped in ``st.cache_resource`` so a successfully created
    client is reused instead of being rebuilt on every script rerun.

    If ``get_client`` raises ``RuntimeError``, the message is rendered with
    ``st.error`` (plus a hint to set ``ANTHROPIC_API_KEY``) and the current
    script run is ended with ``st.stop`` instead of showing a stack trace.
    """
    from kniepunkt.llm import get_client

    try:
        client = get_client()
    except RuntimeError as exc:
        st.error(f"{exc}\n\nBitte ANTHROPIC_API_KEY in der .env-Datei eintragen und Streamlit neu starten.")
        st.stop()
    else:
        return client
@st.cache_resource
def _get_providers():
    """Return whatever ``kniepunkt.providers.get_configured`` reports.

    Cached with ``st.cache_resource`` so the lookup is not repeated on every
    script rerun.
    """
    from kniepunkt.providers import get_configured

    configured = get_configured()
    return configured
@st.cache_resource(show_spinner="Lade Episode-Index (einmalig)...")
def _get_index():
    """Build the episode index via ``kniepunkt.episodes.build_index``.

    The cached LLM client is passed to ``build_index``; the result is cached
    with ``st.cache_resource`` and a spinner is shown while it is computed.
    """
    from kniepunkt import episodes

    llm_client = _get_client()
    return episodes.build_index(llm_client)
def _episodes_context() -> str:
    """Return the cached episode index rendered by ``format_for_context``."""
    from kniepunkt.episodes import format_for_context

    episode_index = _get_index()
    return format_for_context(episode_index)
# ── Session helpers ───────────────────────────────────────────────────────────
def _save():
    """Persist the active session dict through ``kniepunkt.session.save``.

    Does nothing when ``st.session_state`` holds no session or an empty one.
    """
    from kniepunkt import session as session_store

    active = st.session_state.get("session")
    if not active:
        # Missing or empty session: nothing worth writing.
        return
    session_store.save(active)
def _go(step: str):
    """Switch the workflow to ``step``, persist the session and rerun.

    The step is recorded both inside the session dict (so it is saved with the
    session) and as the top-level ``step`` entry that selects the screen.
    """
    state = st.session_state
    state.session["step"] = step
    state.step = step
    _save()
    st.rerun()
# ── Constants ─────────────────────────────────────────────────────────────────
# Display label for every workflow step, keyed by step id.  The insertion
# order is the order in which the steps are worked through.
STEP_LABELS = {
    "research": "1. Recherche",
    "sources": "2. Quellen",
    "storyline": "3. Storyline",
    "draft": "4. Entwurf",
    "quality": "5. Qualität",
    "publication": "6. Publikation",
}
# Ordered step ids, taken from the label mapping so both stay in step.
STEPS = list(STEP_LABELS)
# ── Shared UI components ──────────────────────────────────────────────────────
def _progress_bar():
    """Draw a progress bar for the current workflow step, followed by a divider.

    Nothing is rendered when the current ``step`` is not one of ``STEPS``.
    """
    active_step = st.session_state.get("step", "start")
    try:
        position = STEPS.index(active_step)
    except ValueError:
        # Not a workflow step, so there is no progress to show.
        return
    fraction = (position + 1) / len(STEPS)
    st.progress(fraction, text=STEP_LABELS[active_step])
    st.divider()
def _grid_2x2(results: dict, select_key: str, button_label: str = "Auswählen"):
    """Show provider outputs two per row, each with its own selection button.

    Args:
        results: Mapping of provider name to the text to display.
        select_key: ``st.session_state`` key holding the chosen provider name;
            also used as prefix for the button widget keys.
        button_label: Caption of the selection button under each entry.

    Clicking a button stores that entry's name under ``select_key`` and reruns
    the script; the currently chosen entry gets a check mark and a primary
    button.
    """
    entries = list(results.items())
    chosen = st.session_state.get(select_key)
    for offset in range(0, len(entries), 2):
        columns = st.columns(2)
        # zip stops at the shorter side, so a trailing odd entry fills only
        # the first column of its row.
        for column, (name, text) in zip(columns, entries[offset:offset + 2]):
            with column:
                picked = chosen == name
                st.markdown(f"✅ **{name}**" if picked else f"**{name}**")
                with st.container(border=True):
                    st.markdown(text)
                clicked = st.button(
                    button_label,
                    key=f"{select_key}_{name}",
                    type="primary" if picked else "secondary",
                )
                if clicked:
                    st.session_state[select_key] = name
                    st.rerun()
        st.write("")  # spacer between rows
# ── Screens ───────────────────────────────────────────────────────────────────
def screen_start():
    """Landing screen: start a new session or resume the latest open one.

    Shows an error and stops rendering when no providers are configured.
    Starting a new session drops all cached step results from
    ``st.session_state``; resuming copies the results stored in the latest
    saved session back into ``st.session_state`` and jumps to its step.
    """
    st.title("📰 KNIEPUNKT Assistant")
    st.caption("Redaktioneller Workflow · Dr. André Knie")
    st.divider()

    providers = _get_providers()
    if not providers:
        st.error("Keine API-Keys konfiguriert. Bitte .env-Datei prüfen.")
        return
    names = " · ".join(f"**{p.name}**" for p in providers)
    st.success(f"Aktive Modelle: {names}")
    st.write("")

    from kniepunkt import session as sess_module
    latest = sess_module.load_latest()
    state = st.session_state
    new_col, resume_col = st.columns(2)

    with new_col:
        st.subheader("Neue Episode")
        if st.button("Neue Session starten", type="primary", use_container_width=True):
            # Cached per-step results that must not leak into the new session.
            stale_keys = (
                "news_digest", "sources", "storyline_results", "storyline_selected",
                "draft_results", "draft_selected", "draft_text", "draft_text_source",
                "quality_report", "teasers", "visual_ideas", "package_text",
            )
            for stale in stale_keys:
                state.pop(stale, None)
            state.session = sess_module.new_session()
            state.step = "research"
            _save()
            st.rerun()

    with resume_col:
        resumable = bool(latest) and latest.get("step") not in ("done", None)
        if not resumable:
            st.subheader("Keine offene Session")
            st.caption("Starten Sie eine neue Episode oben links.")
        else:
            st.subheader(f"Fortfahren: {latest['date']}")
            st.caption(f"Zuletzt: **{STEP_LABELS.get(latest['step'], latest['step'])}**")
            if st.button("Session fortsetzen", use_container_width=True):
                state.session = latest
                state.step = latest["step"]
                # Copy stored results back so finished steps are not recomputed.
                research = latest.get("research")
                if research:
                    state.news_digest = research.get("news_digest", "")
                # Entries that are restored verbatim: (saved key, state key).
                direct = (
                    ("sources", "sources"),
                    ("draft", "draft_text"),
                    ("quality", "quality_report"),
                )
                for saved_key, state_key in direct:
                    if latest.get(saved_key):
                        state[state_key] = latest[saved_key]
                storyline = latest.get("storyline")
                if storyline:
                    state.storyline_results = storyline.get("results", {})
                    state.storyline_selected = storyline.get("selected_provider")
                publication = latest.get("publication")
                if publication:
                    state.teasers = publication.get("teasers", "")
                    state.visual_ideas = publication.get("visual_ideas", "")
                st.rerun()
def screen_research():
    """Step 1: collect the author's news items and run the news research.

    Renders a form for the week's news and an optional storyline idea.  When
    the form is submitted with non-blank news, ``research.research_news`` is
    called and its digest is stored in ``st.session_state.news_digest`` and in
    the session dict under ``"research"``.  All cached downstream results are
    then dropped so later steps are regenerated from the new digest, and the
    session is saved.  Once a digest exists it is displayed together with a
    button that advances to the sources step.
    """
    st.header("Recherche")
    _progress_bar()

    with st.form("research_form"):
        author_news = st.text_area(
            "Ihre KI-Nachrichten der Woche",
            placeholder="URLs, Schlagzeilen, Beobachtungen eine pro Zeile...",
            height=200,
        )
        initial_storyline = st.text_input(
            "Vorläufige Storyline-Idee (optional)",
            placeholder="Leer lassen, falls noch keine Idee...",
        )
        submitted = st.form_submit_button("🔍 Recherche starten", type="primary")

    if submitted:
        if not author_news.strip():
            st.warning("Bitte mindestens eine Nachricht eingeben.")
        else:
            author_input = {
                "author_news": author_news,
                "initial_storyline": initial_storyline.strip() or None,
            }
            with st.spinner("Recherchiere aktuelle KI-Nachrichten..."):
                from kniepunkt import research as res_module
                news_digest = res_module.research_news(
                    _get_client(), author_input, _episodes_context()
                )
            st.session_state.news_digest = news_digest
            st.session_state.session["research"] = {
                "author_input": author_input,
                "news_digest": news_digest,
            }
            # Clear downstream cached results when research changes.
            # "draft_text_source" must go together with "draft_text": if only
            # the text is dropped, re-selecting the same provider on the draft
            # screen skips the reset and then reads a missing draft_text.
            for key in ["sources", "storyline_results", "storyline_selected",
                        "draft_results", "draft_selected", "draft_text",
                        "draft_text_source",
                        "quality_report", "teasers", "visual_ideas", "package_text"]:
                st.session_state.pop(key, None)
            _save()

    if "news_digest" in st.session_state:
        st.divider()
        st.subheader("Nachrichten-Kandidaten")
        st.markdown(st.session_state.news_digest)
        st.divider()
        if st.button("Weiter zur Quellenbewertung →", type="primary"):
            _go("sources")
def screen_sources():
    """Step 2: assess the sources of the news digest and display the result.

    The assessment from ``research.assess_sources`` is computed only when no
    cached ``sources`` entry exists; it is then stored in ``st.session_state``
    and in the session dict, and the session is saved.  Navigation buttons
    lead back to research or on to the storyline step.
    """
    st.header("Quellenbewertung")
    _progress_bar()
    state = st.session_state

    if "sources" not in state:
        with st.spinner("Bewerte Quellen..."):
            from kniepunkt import research as res_module
            client = _get_client()
            digest = state.session["research"]["news_digest"]
            assessment = res_module.assess_sources(client, digest)
        state.sources = assessment
        state.session["sources"] = assessment
        _save()

    st.markdown(state.sources)
    st.divider()

    back_col, next_col = st.columns([1, 5])
    with back_col:
        go_back = st.button("← Zurück", key="back_research")
        if go_back:
            _go("research")
    with next_col:
        go_next = st.button("Weiter zur Storyline-Entwicklung →", type="primary", key="to_storyline")
        if go_next:
            _go("storyline")
def screen_storyline():
    """Step 3: generate storylines with all providers and let the user pick one.

    Storylines are generated via ``storyline.generate_storylines`` only when no
    cached ``storyline_results`` exist.  They are shown in the selection grid.
    Once one is selected, an optional adjustments field and the navigation
    buttons appear; continuing stores the choice in the session dict under
    ``"storyline"`` and moves on to the draft step.
    """
    st.header("Storyline-Entwicklung")
    _progress_bar()

    if "storyline_results" not in st.session_state:
        with st.spinner("Generiere Storylines mit allen Modellen..."):
            from kniepunkt import storyline as sl_module
            results = sl_module.generate_storylines(
                _get_providers(),
                st.session_state.session["research"]["news_digest"],
                st.session_state.session["sources"],
                st.session_state.session["research"]["author_input"],
                _episodes_context(),
            )
        st.session_state.storyline_results = results

    _grid_2x2(st.session_state.storyline_results, "storyline_selected", "Diese Storyline wählen")

    if not st.session_state.get("storyline_selected"):
        st.info("Wählen Sie eine Storyline aus dem Grid oben.")
        if st.button("← Zurück", key="back_sources3"):
            _go("sources")
        return

    st.divider()
    adjustments = st.text_area(
        "Anpassungen oder Hinweise zur gewählten Storyline (optional)",
        key="storyline_adjustments",
        height=80,
    )
    col1, col2 = st.columns([1, 5])
    with col1:
        if st.button("← Zurück", key="back_sources2"):
            _go("sources")
    with col2:
        if st.button("Weiter zum Entwurf →", type="primary", key="to_draft"):
            selected = st.session_state.storyline_selected
            st.session_state.session["storyline"] = {
                "results": st.session_state.storyline_results,
                "selected_provider": selected,
                "choice": 1,
                "selected_text": st.session_state.storyline_results[selected],
                "adjustments": adjustments,
            }
            # _go() saves the session itself, so no separate _save() here.
            _go("draft")
def screen_draft():
    """Step 4: generate drafts with all providers, pick one and edit it.

    Drafts are generated via ``drafting.generate_drafts`` only when no cached
    ``draft_results`` exist.  After a draft is selected, its text is loaded
    into an editor; the user can edit it directly or request an AI revision
    through ``drafting.enrich_draft``.  Accepting the draft stores it in the
    session dict under ``"draft"`` and moves on to the quality step.

    Both back buttons discard every cached draft entry so that returning from
    the storyline step always regenerates drafts for the current storyline.
    """
    st.header("Entwurf")
    _progress_bar()
    sl = st.session_state.session["storyline"]

    if "draft_results" not in st.session_state:
        with st.spinner("Schreibe Entwürfe mit allen Modellen..."):
            from kniepunkt import drafting as dr_module
            results = dr_module.generate_drafts(
                _get_providers(),
                sl["selected_text"],
                sl["choice"],
                sl["adjustments"],
                st.session_state.session["research"]["news_digest"],
                st.session_state.session["sources"],
                st.session_state.session["research"]["author_input"],
            )
        st.session_state.draft_results = results

    results = st.session_state.draft_results
    _grid_2x2(results, "draft_selected", "Diesen Entwurf als Basis")
    selected = st.session_state.get("draft_selected")

    if not selected:
        st.info("Wählen Sie einen Entwurf aus dem Grid oben.")
        if st.button("← Zurück", key="back_storyline2"):
            _discard_cached_drafts()
            _go("storyline")
        return

    # Load the selected provider's draft into the editor when the selection
    # changed, and also when no editor text exists at all (otherwise the
    # attribute read below would raise).
    needs_reset = (
        st.session_state.get("draft_text_source") != selected
        or "draft_text" not in st.session_state
    )
    if needs_reset:
        st.session_state.draft_text = results[selected]
        st.session_state.draft_text_source = selected

    st.divider()
    st.subheader("Entwurf bearbeiten")
    draft = st.text_area(
        "Text direkt bearbeiten oder KI-Feedback unten eingeben",
        value=st.session_state.draft_text,
        height=500,
        key="draft_editor",
    )

    col_fb, col_btn = st.columns([4, 1])
    with col_fb:
        feedback = st.text_input("KI-Überarbeitungshinweis", placeholder="z.B. 'Einleitung kürzen, mehr Meinung im zweiten Absatz'")
    with col_btn:
        st.write("")
        if st.button("Überarbeiten", disabled=not feedback.strip(), key="ai_revise_draft"):
            with st.spinner("Überarbeite Entwurf..."):
                from kniepunkt import drafting as dr_module
                draft = dr_module.enrich_draft(_get_client(), draft, feedback)
            st.session_state.draft_text = draft
            st.session_state.draft_text_source = selected
            # Drop the widget's own state so the editor shows the revised text
            # after the rerun instead of its previous content.
            st.session_state.pop("draft_editor", None)
            st.rerun()

    st.divider()
    col1, col2 = st.columns([1, 5])
    with col1:
        if st.button("← Zurück", key="back_storyline"):
            _discard_cached_drafts()
            _go("storyline")
    with col2:
        if st.button("Entwurf übernehmen & Qualität prüfen →", type="primary", key="to_quality"):
            st.session_state.session["draft"] = draft
            st.session_state.session["draft_results"] = results
            st.session_state.draft_text = draft
            st.session_state.pop("quality_report", None)
            # _go() saves the session itself, so no separate _save() here.
            _go("quality")


def _discard_cached_drafts():
    """Remove all cached draft entries (results, selection, editor text and its source)."""
    for key in ("draft_results", "draft_selected", "draft_text", "draft_text_source"):
        st.session_state.pop(key, None)
def screen_quality():
    """Step 5: show the quality report and let the user finalise the draft.

    The report from ``quality.review`` is computed only when no cached
    ``quality_report`` exists and is stored in the session dict under
    ``"quality"``.  The draft can be edited directly or revised through
    ``drafting.enrich_draft``; a revision replaces the stored draft and drops
    the cached report so it is recomputed for the new text.  Continuing stores
    the editor content as the draft and moves on to the publication step.
    """
    st.header("Qualitätsprüfung")
    _progress_bar()

    if "quality_report" not in st.session_state:
        with st.spinner("Prüfe Qualität..."):
            from kniepunkt import quality as q_module
            report = q_module.review(
                _get_client(),
                st.session_state.session["draft"],
                st.session_state.session["sources"],
                _episodes_context(),
            )
        st.session_state.quality_report = report
        st.session_state.session["quality"] = report
        _save()

    st.markdown(st.session_state.quality_report)
    st.divider()
    st.subheader("Entwurf finalisieren")
    draft = st.text_area(
        "Text",
        value=st.session_state.session.get("draft", ""),
        height=500,
        key="quality_editor",
    )

    col_fb, col_btn = st.columns([4, 1])
    with col_fb:
        feedback = st.text_input("KI-Überarbeitungshinweis", placeholder="Basierend auf der Qualitätsprüfung oben...", key="quality_feedback")
    with col_btn:
        st.write("")
        if st.button("Überarbeiten", disabled=not feedback.strip(), key="ai_revise_quality"):
            with st.spinner("Überarbeite Entwurf..."):
                from kniepunkt import drafting as dr_module
                draft = dr_module.enrich_draft(_get_client(), draft, feedback)
            st.session_state.session["draft"] = draft
            # Keep the draft screen's editor text in step with the stored
            # draft; otherwise going back and accepting the draft again would
            # silently overwrite this revision with the older text.
            st.session_state.draft_text = draft
            st.session_state.pop("quality_report", None)
            # Drop the widget's own state so the editor shows the revised text
            # after the rerun.
            st.session_state.pop("quality_editor", None)
            _save()
            st.rerun()

    st.divider()
    col1, col2 = st.columns([1, 5])
    with col1:
        if st.button("← Zurück", key="back_draft"):
            st.session_state.pop("quality_report", None)
            _go("draft")
    with col2:
        if st.button("Weiter zur Publikationsvorbereitung →", type="primary", key="to_pub"):
            st.session_state.session["draft"] = draft
            # Publication artefacts depend on the final draft; force a rebuild.
            for key in ("teasers", "visual_ideas", "package_text"):
                st.session_state.pop(key, None)
            # _go() saves the session itself, so no separate _save() here.
            _go("publication")
def screen_publication():
    """Step 6: build teasers, cover ideas and the publication package.

    Each artefact is produced only when it is not yet cached in
    ``st.session_state``.  After the package file has been written via
    ``publication.save_package`` its text is cached, the session is marked
    ``"done"`` and saved.  The screen then shows the teasers, the visual
    ideas, a download button and a copyable code block, plus a button that
    clears the state and returns to the start screen.
    """
    st.header("Publikationsvorbereitung")
    _progress_bar()
    from kniepunkt import publication as pub_module

    state = st.session_state

    if "teasers" not in state:
        with st.spinner("Erstelle Teaser-Varianten..."):
            teaser_text = pub_module.generate_teasers(_get_client(), state.session["draft"])
        state.teasers = teaser_text
        state.session.setdefault("publication", {})["teasers"] = teaser_text

    if "visual_ideas" not in state:
        with st.spinner("Entwickle Cover-Ideen..."):
            cover_ideas = pub_module.generate_visual_ideas(_get_client(), state.session["draft"])
        state.visual_ideas = cover_ideas
        state.session.setdefault("publication", {})["visual_ideas"] = cover_ideas

    if "package_text" not in state:
        package_path = pub_module.save_package(
            state.session["date"],
            state.session["draft"],
            state.teasers,
            state.session["sources"],
            state.visual_ideas,
        )
        state.package_text = package_path.read_text(encoding="utf-8")
        state.session["step"] = "done"
        _save()

    # Teasers
    st.subheader("Einladungstexte (Teaser-Varianten)")
    st.markdown(state.teasers)
    st.divider()

    # Visual ideas
    st.subheader("Cover-Ideen & Bild-Prompts")
    st.markdown(state.visual_ideas)
    st.divider()

    # Download + copy
    st.subheader("Publikationspaket")
    date = state.session["date"]
    package_text = state.package_text
    download_col, hint_col = st.columns(2)
    with download_col:
        st.download_button(
            "⬇ Als Markdown herunterladen",
            data=package_text,
            file_name=f"KNIEPUNKT_{date}_publikation.md",
            mime="text/markdown",
            use_container_width=True,
        )
    with hint_col:
        st.info("💡 Kopieren: Code-Block unten aufklappen → Kopier-Button oben rechts im Block")
    with st.expander("Vollständiges Publikationspaket (mit Kopier-Button)", expanded=False):
        st.code(package_text, language=None)
    st.divider()

    if st.button("🏠 Neue Episode starten", key="restart"):
        # Snapshot the keys first; the state is modified while clearing it.
        removable = [key for key in state.keys() if key != "step"]
        for key in removable:
            state.pop(key, None)
        state.step = "start"
        st.rerun()
def screen_history():
    """List saved sessions from the ``sessions`` directory, newest name first.

    Every ``*.json`` file is shown as an expander titled with the session's
    date and status.  If a ``*_publikation.md`` file whose stem (minus the
    ``_publikation`` suffix) equals the session date exists, its text is
    rendered together with a download button.

    A session file that cannot be read or parsed is reported with a warning
    and skipped, so one damaged file does not break the whole page.
    """
    st.header("Frühere Episoden")
    st.divider()

    sessions_dir = Path("sessions")
    if not sessions_dir.exists():
        st.info("Noch keine gespeicherten Sessions.")
        return

    json_files = sorted(sessions_dir.glob("*.json"), reverse=True)
    pub_files = {
        f.stem.replace("_publikation", ""): f
        for f in sessions_dir.glob("*_publikation.md")
    }
    if not json_files:
        st.info("Noch keine gespeicherten Sessions.")
        return

    for f in json_files:
        try:
            # NOTE(review): assumes session files are UTF-8 (or plain ASCII)
            # JSON, like the publication files read below; confirm against
            # the writer in kniepunkt.session.
            with open(f, encoding="utf-8") as fp:
                sess = json.load(fp)
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            st.warning(f"{f.name} konnte nicht gelesen werden: {exc}")
            continue
        if not isinstance(sess, dict):
            st.warning(f"{f.name} enthält keine gültige Session.")
            continue

        date = sess.get("date", f.stem)
        step = sess.get("step", "?")
        status = "✅ Fertig" if step == "done" else f"{STEP_LABELS.get(step, step)}"
        with st.expander(f"**{date}** — {status}"):
            if date in pub_files:
                pub_text = pub_files[date].read_text(encoding="utf-8")
                st.markdown(pub_text)
                st.download_button(
                    "⬇ Herunterladen",
                    data=pub_text,
                    file_name=f"KNIEPUNKT_{date}_publikation.md",
                    mime="text/markdown",
                    # Keyed by file stem: two session files may carry the same
                    # date, and duplicate widget keys are rejected by Streamlit.
                    key=f"dl_{f.stem}",
                )
            else:
                st.caption("Kein Publikationspaket für diese Session vorhanden.")
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    """Configure the page, draw the sidebar and render the current screen.

    Ensures ``step`` and ``session`` exist in ``st.session_state``, lists the
    workflow steps in the sidebar with the current one highlighted, offers
    shortcuts to the history and start screens, shows the active providers,
    and finally dispatches to the screen function registered for the current
    step (falling back to the start screen for unknown values).
    """
    st.set_page_config(
        page_title="KNIEPUNKT Assistant",
        page_icon="📰",
        layout="wide",
    )

    state = st.session_state
    for key, default in (("step", "start"), ("session", {})):
        if key not in state:
            state[key] = default

    with st.sidebar:
        st.markdown("## 📰 KNIEPUNKT")
        st.caption("Redaktioneller Workflow")
        st.divider()

        current = state.step
        # -1 means "not inside the workflow": no step counts as completed.
        current_pos = STEPS.index(current) if current in STEPS else -1
        for step, label in STEP_LABELS.items():
            if step == current:
                st.markdown(f"**→ {label}**")
            elif STEPS.index(step) < current_pos:
                st.markdown(label)
            else:
                st.markdown(f"&nbsp;&nbsp;{label}", unsafe_allow_html=True)

        st.divider()
        if st.button("📚 Frühere Episoden", use_container_width=True):
            state.step = "history"
            st.rerun()
        if st.button("🏠 Startseite", use_container_width=True):
            state.step = "start"
            st.rerun()

        st.divider()
        st.caption("Aktive Modelle:")
        try:
            for provider in _get_providers():
                st.caption(f"{provider.name}")
        except Exception:
            # The sidebar is informational only; a provider lookup failure
            # must not prevent the main screen from rendering.
            st.caption("Nicht geladen")

    dispatch = {
        "start": screen_start,
        "research": screen_research,
        "sources": screen_sources,
        "storyline": screen_storyline,
        "draft": screen_draft,
        "quality": screen_quality,
        "publication": screen_publication,
        "history": screen_history,
    }
    render = dispatch.get(state.step, screen_start)
    render()


if __name__ == "__main__":
    main()