kaxil commented on code in PR #68037:
URL: https://github.com/apache/airflow/pull/68037#discussion_r3359399335


##########
providers/common/ai/src/airflow/providers/common/ai/example_dags/example_aip_progress_tracker.py:
##########
@@ -342,3 +904,250 @@ def collect_analyses(analyses: list) -> str:
 # [END example_aip_progress_tracker]
 
 example_aip_progress_tracker()
+
+
+# ===========================================================================
+# DAG 2: Agent-based AIP tracker (AgentOperator + AgentSkillsToolset)
+#
+# Same use case, different architecture.  Instead of a 12-task deterministic
+# pipeline, a single AgentOperator with the aip-tracker skill loaded via
+# AgentSkillsToolset.  The agent discovers the skill's grounding rules and
+# calls custom tools to gather evidence from Confluence and GitHub.
+# ===========================================================================
+
+SKILLS_DIR = str(Path(__file__).parent / "skills")
+
+# ---------------------------------------------------------------------------
+# Tool functions the agent can call to gather evidence.
+# These are plain Python functions -- the agent sees their docstrings and
+# decides when and how to call them based on the skill instructions.
+# ---------------------------------------------------------------------------
+
+
+def _github_headers() -> dict[str, str]:

Review Comment:
   `_github_headers` is also defined at module level up at line 242. Python 
keeps this second definition, so the pipeline DAG's `_github_api_get` (line 
254) ends up calling this one, not the line 242 version. The two differ: line 
242 uses `Authorization: Bearer {token}` and no User-Agent, this one uses 
`token {token}` plus a User-Agent. Both auth schemes work so nothing breaks, 
but the line 242 definition is now dead code, and editing it would have no 
effect. Worth collapsing to a single helper, or renaming one if the two are 
meant to be distinct.



##########
providers/common/ai/src/airflow/providers/common/ai/example_dags/example_aip_progress_tracker.py:
##########
@@ -312,28 +719,183 @@ def collect_analyses(analyses: list) -> str:
         system_prompt=SYNTHESIS_SYSTEM_PROMPT,
         prompt="""\
 Create a cross-AIP progress report from these individual assessments.
-Prioritize AIPs that are close to completion or have shared blockers.
+Prioritize AIPs that are close to completion or have shared dependencies.
+Use only the data below -- do not add external information.
 
-{{ ti.xcom_pull(task_ids='collect_analyses') }}""",
+{{ ti.xcom_pull(task_ids='format_report') }}""",
+        agent_params={"model_settings": {"temperature": 0}},
         usage_limits=UsageLimits(
             request_limit=5,
             input_tokens_limit=20_000,
             output_tokens_limit=4_000,
         ),
     )
     # [END aip_tracker_synthesis]
-    collected >> synthesize
+    formatted >> synthesize
+
+    # ------------------------------------------------------------------
+    # Step 9: AI-powered hallucination validation.
+    # A second LLM checks every claim in the synthesized report against
+    # the raw per-AIP evidence.  Its only job is to judge and propose
+    # corrections -- a separate deterministic step applies them.
+    # ------------------------------------------------------------------
+    # [START aip_tracker_validation]
+    validate = LLMOperator(
+        task_id="validate_report",
+        llm_conn_id=LLM_CONN_ID,
+        system_prompt=VALIDATION_SYSTEM_PROMPT,
+        prompt="""\
+Verify the following synthesized report against the raw per-AIP evidence.
+Flag any claims not grounded in the evidence.
+
+=== SYNTHESIZED REPORT ===
+{{ ti.xcom_pull(task_ids='synthesize_report') }}
+
+=== RAW PER-AIP EVIDENCE ===
+{{ ti.xcom_pull(task_ids='format_report') }}""",
+        output_type=ValidationResult,
+        serialize_output=True,
+        usage_limits=UsageLimits(
+            request_limit=5,
+            input_tokens_limit=30_000,
+            output_tokens_limit=8_000,
+        ),
+        agent_params={"model_settings": {"temperature": 0}},
+    )
+    # [END aip_tracker_validation]
+    synthesize >> validate
 
     # ------------------------------------------------------------------
-    # Step 7: A maintainer reviews the synthesized report before it is
-    # shared on the dev list.  The Dag pauses here until the human
-    # approves, requests changes, or the timeout expires.
+    # Step 10: Apply validation corrections deterministically.
+    # No LLM involved -- this is a mechanical find-and-replace using
+    # the validator's claim/correction pairs.  Ensures every flagged
+    # issue is actually fixed in the final report.
+    # ------------------------------------------------------------------
+    @task
+    def apply_validation(report: str, validation: dict, analyses: list[dict]) 
-> dict:
+        import re

Review Comment:
   `re` is already imported at the top of the file (line 82), so this inline 
import is redundant. Since example DAGs tend to get copied as starting points, 
probably worth dropping it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to