This is an automated email from the ASF dual-hosted git repository. vikramkoka pushed a commit to branch aip99_langchain_example in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d5c0ac35d525f799a32e37ff7c19ee295309b84d Author: Vikram Koka <[email protected]> AuthorDate: Tue Jun 2 18:44:11 2026 -0700 Fix XCom deserialization of Pydantic models in LangChain 10-K example DAG Pydantic models defined in DAG files (SubQuestion, AnalysisReport) are not in Airflow's allowed_deserialization_classes, so passing them through XCom between tasks fails with an ImportError. Convert structured output to plain dicts at the task boundary via model_dump() so downstream tasks can deserialize safely. --- .../ai/example_dags/example_langchain_10k.py | 36 ++++++++++++---------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_10k.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_10k.py index e807fe47670..94e3582a6ae 100644 --- a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_10k.py +++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_langchain_10k.py @@ -461,8 +461,8 @@ def example_langchain_10k_analysis(): # [END 10k_langchain_decompose] @task - def extract_sub_questions(decomposed: DecomposedQuestion) -> list[SubQuestion]: - return decomposed.sub_questions + def extract_sub_questions(decomposed: DecomposedQuestion) -> list[dict]: + return [sq.model_dump() for sq in decomposed.sub_questions] sub_questions = extract_sub_questions(decomposed) @@ -471,12 +471,12 @@ def example_langchain_10k_analysis(): # Each sub-question targets a specific company's FAISS index. 
# ------------------------------------------------------------------ @task - def build_retrieval_kwargs(sub_questions: list[SubQuestion]) -> list[dict]: + def build_retrieval_kwargs(sub_questions: list[dict]) -> list[dict]: return [ { - "query": sq.sub_question, - "ticker": sq.ticker, - "index_dir": f"{INDEX_BASE_DIR}/{sq.ticker.lower()}", + "query": sq["sub_question"], + "ticker": sq["ticker"], + "index_dir": f"{INDEX_BASE_DIR}/{sq['ticker'].lower()}", } for sq in sub_questions ] @@ -523,13 +523,13 @@ def example_langchain_10k_analysis(): # Step 5: Collect all retrieval results into a single context. # ------------------------------------------------------------------ @task - def collect_results(sub_questions: list[SubQuestion], results: list[dict]) -> str: + def collect_results(sub_questions: list[dict], results: list[dict]) -> str: sections = [] for sq, r in zip(sub_questions, results): chunks_text = "\n".join( f" [{i + 1}] (score {c['score']:.2f}) {c['text']}" for i, c in enumerate(r["chunks"]) ) - sections.append(f"## {sq.ticker} -- {sq.sub_question}\n{chunks_text}") + sections.append(f"## {sq['ticker']} -- {sq['sub_question']}\n{chunks_text}") return "\n\n".join(sections) collected = collect_results(sub_questions, retrieval_results) @@ -561,30 +561,32 @@ Cite specific data points and scores. # ------------------------------------------------------------------ # Step 7: Format the structured report into readable text for the - # human reviewer. The LLM produced an AnalysisReport instance (via - # output_type=AnalysisReport); this task renders it as clean prose. + # human reviewer. The LLM output (output_type=AnalysisReport) may arrive + # as a model instance or a plain dict; this task renders it as clean prose.
# ------------------------------------------------------------------ @task def format_report(report: AnalysisReport) -> str: - lines = [f"# Executive Summary\n\n{report.executive_summary}"] + if hasattr(report, "model_dump"): + report = report.model_dump() + lines = [f"# Executive Summary\n\n{report['executive_summary']}"] - if report.company_findings: + if report.get("company_findings"): lines.append("\n# Company Findings") - for finding in report.company_findings: + for finding in report["company_findings"]: company = finding.get("company") or finding.get("ticker", "Unknown") lines.append(f"\n## {company}") for key, value in finding.items(): if key not in ("company", "ticker"): lines.append(f"- **{key}**: {value}") - if report.key_risks: + if report.get("key_risks"): lines.append("\n# Key Risks") - for risk in report.key_risks: + for risk in report["key_risks"]: lines.append(f"- {risk}") - if report.recommendations: + if report.get("recommendations"): lines.append("\n# Recommendations") - for rec in report.recommendations: + for rec in report["recommendations"]: lines.append(f"- {rec}") return "\n".join(lines)
