This is an automated email from the ASF dual-hosted git repository.

vikramkoka pushed a commit to branch aip99-example-aiptracker
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ce2f14b3684b3d0c4cdbeafa558bca327eb25306
Author: Vikram Koka <[email protected]>
AuthorDate: Thu Jun 4 13:24:50 2026 -0700

    Improve AIP progress tracker example DAG to produce accurate, evidence-backed results
    
    The example DAG was producing hallucinated output -- fabricated completion
    percentages, invented blockers, and missed shipped work -- because the
    evidence pipeline was too thin and the prompts too permissive.
    
    Key changes:
    - Add AIP registry with Confluence page IDs, GitHub search aliases, and
      codebase directory paths for multi-strategy evidence gathering
    - Fetch GitHub file tree (Git Trees API) for codebase-level evidence
    - Replace flat 3000-char spec truncation with section-aware parsing
    - Replace completion_pct/blockers Pydantic model with per-deliverable
      DeliverableStatus (name, status, evidence, confidence)
    - Add grounding rules to analysis/synthesis/validation system prompts
    - Add three-layer quality pipeline: AI validation (LLMOperator) identifies
      ungrounded claims, deterministic apply_validation task does mechanical
      find-and-replace, human reviews the corrected report
    - Add arithmetic validation that cross-checks X/Y fractions against
      structured analysis data (catches validator-introduced errors)
    - Set temperature=0 on all LLM calls for run-to-run consistency
---
 .../example_dags/example_aip_progress_tracker.py   | 786 +++++++++++++++++----
 1 file changed, 667 insertions(+), 119 deletions(-)

diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_aip_progress_tracker.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_aip_progress_tracker.py
index 706a60cb9aa..5b48b625673 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_aip_progress_tracker.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_aip_progress_tracker.py
@@ -18,25 +18,30 @@
 AIP progress tracker -- multi-source data fusion with common.ai operators.
 
 Demonstrates Dynamic Task Mapping, structured LLM output, cost-controlled
-synthesis, and HITL approval using only ``LLMOperator`` -- no LlamaIndex or
-LangChain dependency required.
+synthesis, AI-powered hallucination validation, and HITL approval using only
+``LLMOperator`` -- no LlamaIndex or LangChain dependency required.
 
-For each active Airflow Improvement Proposal the Dag gathers evidence from
-two sources (Confluence spec text, GitHub PRs and commits), asks an LLM to
-assess spec-vs-implementation progress, then synthesizes a cross-AIP report
-for maintainer review.
+For each active Airflow Improvement Proposal the DAG gathers evidence from
+three sources (Confluence spec text, GitHub PRs/commits, and the GitHub
+file tree), asks an LLM to assess spec-vs-implementation progress per
+deliverable, validates the synthesized report for hallucinations, then
+presents it for maintainer review.
 
 ``example_aip_progress_tracker`` (manual trigger):
 
 .. code-block:: text
 
-    fetch_aip_list (@task)
-        → gather_aip_evidence  (@task, mapped ×N AIPs)
-        → format_analysis_prompt (@task, mapped ×N)
-        → analyze_aip          (LLMOperator, mapped ×N)
-        → collect_analyses     (@task)
-        → synthesize_report    (LLMOperator, with UsageLimits)
-        → review_report        (ApprovalOperator)
+    fetch_aip_list  (@task)  ─┐
+                               ├─> gather_aip_evidence  (@task, mapped ×N AIPs)
+    fetch_repo_tree (@task)  ─┘    → format_analysis_prompt (@task, mapped ×N)
+                                   → analyze_aip           (LLMOperator, mapped ×N)
+                                   → collect_analyses      (@task)
+                                   → format_report         (@task)
+                                   → synthesize_report     (LLMOperator, UsageLimits)
+                                   → validate_report       (LLMOperator, hallucination check)
+                                   → apply_validation      (@task, deterministic corrections)
+                                   → build_review_body     (@task)
+                                   → review_report         (ApprovalOperator)
 
 **What this makes visible that a notebook hides:**
 
@@ -45,23 +50,30 @@ for maintainer review.
 * If the GitHub API is rate-limited for one AIP, only that mapped
   instance retries; the others preserve their XCom results.
 * The synthesis step's inputs and token budget are fully auditable.
-* A maintainer reviews the report before it goes to the dev list.
+* An AI validation step checks for hallucinations, then a deterministic
+  step applies the corrections -- no LLM involved in the fix.
+* A maintainer reviews the corrected report before it goes to the dev list.
 
 Before running:
 
 1. Create an LLM connection named ``pydanticai_default`` (or the value of
    ``LLM_CONN_ID``) for your chosen model provider.
-2. Trigger the DAG with the default ``aip_numbers`` param or edit it to
+2. Optionally set a ``GITHUB_TOKEN`` environment variable for higher API
+   rate limits (unauthenticated: 10 req/min; authenticated: 5,000 req/hr).
+3. Trigger the DAG with the default ``aip_numbers`` param or edit it to
    choose which AIPs to investigate.
 """
 
 from __future__ import annotations
 
 import json
+import os
 import re
+import time
 import urllib.parse
 import urllib.request
 from datetime import timedelta
+from typing import Literal
 
 from pydantic import BaseModel
 from pydantic_ai.usage import UsageLimits
@@ -76,40 +88,170 @@ from airflow.sdk import Param
 # ---------------------------------------------------------------------------
 
 LLM_CONN_ID = "pydanticai_default"
-
-# Confluence wiki -- public REST API, no auth required.
 CONFLUENCE_BASE_URL = "https://cwiki.apache.org/confluence"
-AIP_LISTING_PAGE_ID = "89066602"  # ancestor filter for CQL queries
 GITHUB_REPO = "apache/airflow"
+GITHUB_API_DELAY = 7  # seconds between unauthenticated GitHub API calls
 DEFAULT_AIP_NUMBERS = "76,99,103,105,108"
 
 # ---------------------------------------------------------------------------
-# Structured output model -- enforces a schema on the per-AIP LLM response
+# AIP Registry -- page IDs, search aliases, and codebase paths
+#
+# Each entry enables multi-strategy evidence gathering:
+# - page_id: direct Confluence fetch (no CQL search needed)
+# - search_terms: additional GitHub search keywords beyond "AIP-{N}"
+# - codebase_paths: directory prefixes to look for in the GitHub file tree
+# ---------------------------------------------------------------------------
+
+# [START aip_registry]
+AIP_REGISTRY: dict[int, dict] = {
+    76: {
+        "page_id": "311626969",
+        "topic": "Asset Partitions",
+        "search_terms": ["asset partition", "PartitionMapper", 
"PartitionedAsset"],
+        "codebase_paths": [
+            "airflow-core/src/airflow/models/asset.py",
+            "task-sdk/src/airflow/sdk/definitions/partition_mappers",
+            "task-sdk/src/airflow/sdk/definitions/timetables/assets.py",
+            "airflow-core/src/airflow/migrations/versions/0095_",
+            "airflow-core/src/airflow/migrations/versions/0106_",
+            "airflow-core/src/airflow/migrations/versions/0107_",
+        ],
+    },
+    99: {
+        "page_id": "406618285",
+        "topic": "Common AI Operators",
+        "search_terms": ["LLMOperator", "AgentOperator", "common.ai", 
"LangChainHook"],
+        "codebase_paths": [
+            "providers/common/ai/src/airflow/providers/common/ai/operators",
+            "providers/common/ai/src/airflow/providers/common/ai/toolsets",
+            "providers/common/ai/src/airflow/providers/common/ai/hooks",
+            "providers/common/ai/src/airflow/providers/common/ai/decorators",
+            "providers/common/ai/src/airflow/providers/common/ai/durable",
+            "providers/common/ai/src/airflow/providers/common/ai/example_dags",
+        ],
+    },
+    103: {
+        "page_id": "406623137",
+        "topic": "Task State Management",
+        "search_terms": ["task_store", "asset_store", "state_store", 
"TaskStoreAccessor"],
+        "codebase_paths": [
+            "airflow-core/src/airflow/models/task_store.py",
+            "airflow-core/src/airflow/models/asset_store.py",
+            "airflow-core/src/airflow/state",
+            "shared/state/src/airflow_shared/state",
+            "airflow-core/src/airflow/api_fastapi/execution_api/routes/task_store.py",
+            "airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_store.py",
+        ],
+    },
+    105: {
+        "page_id": "421955342",
+        "topic": "Pluggable Retry Policies",
+        "search_terms": ["RetryPolicy", "retry_policy", 
"ExceptionRetryPolicy"],
+        "codebase_paths": [
+            "task-sdk/src/airflow/sdk/definitions/retry_policy.py",
+            "airflow-core/src/airflow/migrations/versions/0113_",
+        ],
+    },
+    108: {
+        "page_id": "421957285",
+        "topic": "Language Task SDK + Coordinator",
+        "search_terms": ["coordinator", "Go-SDK", "Go SDK", "Java SDK", 
"SubprocessCoordinator"],
+        "codebase_paths": [
+            "task-sdk/src/airflow/sdk/coordinators",
+            "task-sdk/src/airflow/sdk/execution_time/coordinator.py",
+            "go-sdk",
+            "java-sdk",
+        ],
+    },
+}
+# [END aip_registry]
+
+# ---------------------------------------------------------------------------
+# Structured output models
 # ---------------------------------------------------------------------------
 
 # [START aip_tracker_structured_output]
 
 
+class DeliverableStatus(BaseModel):
+    """Status of a single deliverable from the AIP spec."""
+
+    name: str
+    status: Literal["shipped", "in_progress", "not_started", "beyond_spec", "unclear"]
+    evidence: str
+    confidence: Literal["high", "medium", "low"]
+
+
 class AIPStatus(BaseModel):
     """Per-AIP analysis produced by the LLM."""
 
     aip_number: int
     title: str
     spec_summary: str
-    implementation_status: str
+    deliverables: list[DeliverableStatus]
+    shipped_count: int
+    total_count: int
     key_prs: list[str]
-    blockers: list[str]
-    next_steps: list[str]
-    completion_pct: int
+    confluence_update_needed: bool
+    notes: str
 
 
 # [END aip_tracker_structured_output]
 
+# [START aip_tracker_validation_output]
+
+
+class ClaimValidation(BaseModel):
+    """A single claim from the report checked against evidence."""
+
+    claim: str
+    grounded: bool
+    evidence_found: str
+    correction: str
+
+
+class ValidationResult(BaseModel):
+    """Result of the AI hallucination validation step."""
+
+    overall_verdict: Literal["pass", "pass_with_warnings", "fail"]
+    ungrounded_claims: list[ClaimValidation]
+    hallucination_risk: Literal["low", "medium", "high"]
+
+
+# [END aip_tracker_validation_output]
+
 # ---------------------------------------------------------------------------
 # HTTP helpers
 # ---------------------------------------------------------------------------
 
 
+def _github_headers() -> dict[str, str]:
+    """Build GitHub API headers, using a token if available."""
+    headers = {"Accept": "application/vnd.github.v3+json"}
+    token = os.environ.get("GITHUB_TOKEN")
+    if token:
+        headers["Authorization"] = f"Bearer {token}"
+    return headers
+
+
+def _github_api_get(path: str, *, delay: bool = True) -> dict:
+    """GET a GitHub REST API endpoint with rate-limit awareness."""
+    url = f"https://api.github.com{path}"
+    req = urllib.request.Request(url, headers=_github_headers())
+    try:
+        with urllib.request.urlopen(req, timeout=30) as resp:
+            result = json.loads(resp.read())
+            remaining = resp.headers.get("X-RateLimit-Remaining")
+            if delay and not os.environ.get("GITHUB_TOKEN") and remaining:
+                if int(remaining) < 5:
+                    time.sleep(GITHUB_API_DELAY)
+            return result
+    except urllib.error.HTTPError as e:
+        if e.code == 403:
+            return {}
+        raise
+
+
 def _confluence_rest_get(path: str) -> dict:
     """GET a Confluence REST API endpoint (public, no auth required)."""
     url = f"{CONFLUENCE_BASE_URL}{path}"
@@ -118,40 +260,109 @@ def _confluence_rest_get(path: str) -> dict:
         return json.loads(resp.read())
 
 
-def _github_api_get(path: str) -> dict:
-    """GET a GitHub REST API endpoint (public, rate-limited to 10 req/min)."""
-    url = f"https://api.github.com{path}"
-    req = urllib.request.Request(url, headers={"Accept": "application/vnd.github.v3+json"})
-    with urllib.request.urlopen(req, timeout=30) as resp:
-        return json.loads(resp.read())
-
-
 def _strip_html_tags(html: str) -> str:
     """Remove HTML/Confluence markup, returning plain text."""
     text = re.sub(r"<[^>]+>", " ", html)
     return re.sub(r"\s+", " ", text).strip()
 
 
+def _extract_spec_sections(html: str) -> dict:
+    """Parse Confluence HTML into structured sections by heading."""
+    sections: dict[str, str] = {}
+    current_heading = "introduction"
+    current_text: list[str] = []
+
+    for part in re.split(r"(<h[1-4][^>]*>.*?</h[1-4]>)", html, flags=re.IGNORECASE | re.DOTALL):
+        heading_match = re.match(r"<h[1-4][^>]*>(.*?)</h[1-4]>", part, re.IGNORECASE | re.DOTALL)
+        if heading_match:
+            if current_text:
+                sections[current_heading] = _strip_html_tags(" ".join(current_text))
+            current_heading = _strip_html_tags(heading_match.group(1)).lower().strip()
+            current_text = []
+        else:
+            current_text.append(part)
+
+    if current_text:
+        sections[current_heading] = _strip_html_tags(" ".join(current_text))
+
+    return sections
+
+
 # ---------------------------------------------------------------------------
 # System prompts
 # ---------------------------------------------------------------------------
 
 ANALYSIS_SYSTEM_PROMPT = """\
-You are an Airflow project analyst. Given an AIP specification and its \
-GitHub evidence (pull requests and commits), produce a structured status \
-assessment.
-
-Be specific about what has been implemented versus what remains. Rate \
-completion percentage based on the ratio of spec goals that have \
-corresponding PRs or commits."""
+You are an Airflow project analyst assessing AIP implementation progress.
+
+DELIVERABLE EXTRACTION:
+Extract deliverables from the specification's own structure. Use these \
+sources in priority order:
+1. Numbered completion criteria (e.g. "Definition of Done", "Completion \
+Criteria") -- each numbered item is one deliverable.
+2. Phase definitions -- each bullet or item under a phase heading is one \
+deliverable.
+3. Explicitly enumerated components (classes, operators, API endpoints, \
+CLI commands, UI features) listed in the spec.
+Do NOT split a single spec item into multiple deliverables. Do NOT merge \
+multiple spec items into one. Use the spec's own granularity.
+
+ASSESSMENT RULES:
+1. For each deliverable, you MUST cite specific evidence (a PR number, commit \
+message, or file path from the provided data). If no evidence exists, set \
+status to "not_started" or "unclear" and confidence to "low".
+2. Do NOT guess completion percentages. Instead, count shipped vs total \
+deliverables. shipped_count and total_count must match the deliverables list.
+3. Do NOT invent blockers. Use the notes field for genuine uncertainties only.
+4. If codebase evidence shows shipped work NOT mentioned in the spec, add \
+those as deliverables with status "beyond_spec" and set \
+confluence_update_needed to true.
+5. PR numbers must come from the Pull Requests section of the input. Do not \
+invent PR numbers."""
 
 SYNTHESIS_SYSTEM_PROMPT = """\
 You are an Airflow release coordinator. Given individual AIP status \
 assessments, produce a concise cross-AIP progress report.
 
-Identify the top priorities, shared blockers across AIPs, and recommend \
-where maintainer attention is most needed. Keep the report actionable \
-and under 500 words."""
+RULES:
+1. Use ONLY the data from the individual assessments. Do not add information \
+not present in the inputs.
+2. Always write progress as "X/Y deliverables shipped" (e.g. "8/14 shipped"). \
+NEVER convert to percentages. Do not write "57%" or "90%" or any other \
+percentage. The fraction form is the only acceptable format.
+3. Identify AIPs where confluence_update_needed is true.
+4. Flag deliverables with confidence="low" as needing manual verification.
+5. Do NOT characterize AIPs as "near completion" or "minimal blockers" unless \
+the evidence explicitly supports that. Use the fraction (e.g. "9/10 shipped, \
+1 in progress") and let the reader draw conclusions.
+6. Keep the report actionable and under 500 words."""
+
+VALIDATION_SYSTEM_PROMPT = """\
+You are a fact-checker for an AIP progress report. You receive two inputs:
+1. A synthesized cross-AIP progress report
+2. The raw per-AIP evidence that the report was derived from
+
+Your ONLY job is to verify claims. A separate downstream step applies your \
+corrections, so each correction must be a self-contained replacement string \
+that can be substituted for the original claim text.
+
+RULES:
+1. Any deliverable status, PR number, shipped count, or recommendation in \
+the report must have a corresponding entry in the raw evidence.
+2. If a claim has no supporting evidence, set grounded=false and provide a \
+correction. The "claim" field must contain the EXACT text from the report \
+(so it can be found by string search). The "correction" field must contain \
+the replacement text, or "REMOVE" if the claim should be deleted entirely.
+3. Flag invented blockers, fabricated statistics, and PR numbers not in the \
+evidence.
+4. Flag any percentages (e.g. "57%", "90%") as ungrounded. Progress must be \
+expressed as fractions ("8/14 shipped"), never as percentages.
+5. Flag vague characterizations ("near completion", "minimal blockers", \
+"requires foundational work") that editorialize beyond the evidence. Provide \
+a factual replacement using data from the evidence.
+6. Set overall_verdict to "fail" if any high-confidence claims are ungrounded, \
+"pass_with_warnings" if only low-confidence claims are flagged, "pass" if \
+all claims are grounded."""
 
 
 # ---------------------------------------------------------------------------
@@ -178,94 +389,230 @@ def example_aip_progress_tracker():
 
     Task graph::
 
-        fetch_aip_list (@task)
-            → gather_aip_evidence    (@task ×N, via Dynamic Task Mapping)
-            → format_analysis_prompt (@task ×N)
-            → analyze_aip            (LLMOperator ×N, structured output)
-            → collect_analyses       (@task)
-            → synthesize_report      (LLMOperator, with UsageLimits)
-            → review_report          (ApprovalOperator)
+        fetch_aip_list  ─┐
+                          ├─> gather_aip_evidence   (@task ×N, Dynamic Task Mapping)
+        fetch_repo_tree ─┘    → format_analysis_prompt (@task ×N)
+                              → analyze_aip            (LLMOperator ×N, structured output)
+                              → collect_analyses       (@task)
+                              → format_report          (@task)
+                              → synthesize_report      (LLMOperator, with UsageLimits)
+                              → validate_report        (LLMOperator, hallucination check)
+                              → apply_validation       (@task, deterministic corrections)
+                              → build_review_body      (@task)
+                              → review_report          (ApprovalOperator)
     """
 
     # ------------------------------------------------------------------
-    # Step 1: Fetch the list of active AIPs to investigate.
-    # The length of this list determines how many mapped instances are
-    # created in the downstream steps -- N is decided at runtime.
+    # Step 1: Build the AIP list from the registry, using Confluence page
+    # IDs for direct spec fetching (no CQL search needed).
     # ------------------------------------------------------------------
     @task
     def fetch_aip_list(params: dict) -> list[dict]:
         aip_numbers = [int(n.strip()) for n in params["aip_numbers"].split(",") if n.strip()]
         aips = []
         for num in aip_numbers:
-            cql = urllib.parse.quote(
-                f'space="AIRFLOW" AND title~"AIP-{num}" AND 
ancestor={AIP_LISTING_PAGE_ID}'
-            )
-            results = 
_confluence_rest_get(f"/rest/api/content/search?cql={cql}&limit=1")
-            if results.get("results"):
-                title = results["results"][0]["title"]
+            registry_entry = AIP_REGISTRY.get(num)
+            if registry_entry:
+                aips.append(
+                    {
+                        "aip_number": num,
+                        "title": f"AIP-{num}: {registry_entry['topic']}",
+                        "page_id": registry_entry["page_id"],
+                        "search_terms": registry_entry["search_terms"],
+                        "codebase_paths": registry_entry["codebase_paths"],
+                    }
+                )
             else:
-                title = f"AIP-{num}"
-            aips.append({"aip_number": num, "title": title})
+                aips.append(
+                    {
+                        "aip_number": num,
+                        "title": f"AIP-{num}",
+                        "page_id": None,
+                        "search_terms": [],
+                        "codebase_paths": [],
+                    }
+                )
         return aips
 
     aip_list = fetch_aip_list()
 
     # ------------------------------------------------------------------
-    # Step 2: Gather evidence for each AIP from multiple sources.
-    # Each mapped instance fetches one AIP's spec text from the
-    # Confluence wiki (cwiki.apache.org) and searches GitHub for
-    # related PRs and commits.  If the GitHub API is rate-limited
-    # for one AIP, only that instance retries.
+    # Step 2: Fetch the repo file tree from GitHub once, shared across
+    # all mapped AIP tasks.  Falls back to per-path directory listings
+    # if the tree is truncated (common for large repos).
     # ------------------------------------------------------------------
     @task
-    def gather_aip_evidence(aip: dict) -> dict:
+    def fetch_repo_tree() -> list[str]:
+        data = _github_api_get(f"/repos/{GITHUB_REPO}/git/trees/main?recursive=1", delay=False)
+        if not data:
+            return []
+
+        if not data.get("truncated"):
+            return [item["path"] for item in data.get("tree", []) if item["type"] == "blob"]
+
+        # Tree was truncated -- fetch directory listings for known paths
+        known_dirs: set[str] = set()
+        for entry in AIP_REGISTRY.values():
+            for p in entry["codebase_paths"]:
+                parts = p.rstrip("/").split("/")
+                if len(parts) >= 2:
+                    known_dirs.add("/".join(parts[:3]))
+
+        all_files: list[str] = []
+        for dir_path in sorted(known_dirs):
+            contents = 
_github_api_get(f"/repos/{GITHUB_REPO}/contents/{dir_path}")
+            if isinstance(contents, list):
+                for item in contents:
+                    if item["type"] == "file":
+                        all_files.append(item["path"])
+                    elif item["type"] == "dir":
+                        sub = 
_github_api_get(f"/repos/{GITHUB_REPO}/contents/{item['path']}")
+                        if isinstance(sub, list):
+                            all_files.extend(s["path"] for s in sub if 
s["type"] == "file")
+        return all_files
+
+    repo_tree = fetch_repo_tree()
+
+    # ------------------------------------------------------------------
+    # Step 3: Gather evidence from Confluence, GitHub, and the file tree.
+    # Each mapped instance fetches one AIP's data.  Multi-strategy GitHub
+    # search uses both the AIP tag and topic-specific keywords.
+    # ------------------------------------------------------------------
+    @task
+    def gather_aip_evidence(aip: dict, repo_tree: list[str]) -> dict:
         aip_number = aip["aip_number"]
-        cql = urllib.parse.quote(
-            f'space="AIRFLOW" AND title~"AIP-{aip_number}" AND 
ancestor={AIP_LISTING_PAGE_ID}'
-        )
-        results = 
_confluence_rest_get(f"/rest/api/content/search?cql={cql}&expand=body.view&limit=1")
-        spec_text = ""
-        if results.get("results"):
-            raw_html = results["results"][0]["body"]["view"]["value"]
-            spec_text = _strip_html_tags(raw_html)[:3000]
-        pr_query = urllib.parse.quote(f"AIP-{aip_number} repo:{GITHUB_REPO} 
is:pr")
-        pr_data = _github_api_get(f"/search/issues?q={pr_query}&per_page=10")
-        prs = [f"#{it['number']} -- {it['title']}" for it in 
pr_data.get("items", [])]
-        commit_query = urllib.parse.quote(f"AIP-{aip_number} 
repo:{GITHUB_REPO}")
-        commit_data = 
_github_api_get(f"/search/commits?q={commit_query}&per_page=10")
-        commits = [it["commit"]["message"].split("\n")[0] for it in 
commit_data.get("items", [])]
+
+        # --- Confluence spec ---
+        spec_sections: dict[str, str] = {}
+        last_modified = ""
+        if aip.get("page_id"):
+            page = 
_confluence_rest_get(f"/rest/api/content/{aip['page_id']}?expand=body.storage,version")
+            raw_html = page.get("body", {}).get("storage", {}).get("value", "")
+            spec_sections = _extract_spec_sections(raw_html)
+            version_info = page.get("version", {})
+            last_modified = version_info.get("when", "")
+        else:
+            cql = urllib.parse.quote(f'space="AIRFLOW" AND 
title~"AIP-{aip_number}"')
+            results = 
_confluence_rest_get(f"/rest/api/content/search?cql={cql}&expand=body.storage&limit=1")
+            if results.get("results"):
+                raw_html = results["results"][0].get("body", 
{}).get("storage", {}).get("value", "")
+                spec_sections = _extract_spec_sections(raw_html)
+
+        # Build spec text with section budgets
+        spec_parts = []
+        for heading in (
+            "status",
+            "deliverables",
+            "milestones",
+            "phases",
+            "scope",
+            "completion criteria",
+            "definition of done",
+        ):
+            for key, text in spec_sections.items():
+                if heading in key:
+                    spec_parts.append(f"[{key}]: {text[:4000]}")
+                    break
+
+        remaining_budget = 8000 - sum(len(p) for p in spec_parts)
+        for key, text in spec_sections.items():
+            if not any(key in p for p in spec_parts):
+                chunk = text[: min(1000, max(200, remaining_budget))]
+                spec_parts.append(f"[{key}]: {chunk}")
+                remaining_budget -= len(chunk)
+                if remaining_budget <= 0:
+                    break
+
+        # --- GitHub: multi-strategy PR and commit search ---
+        seen_pr_numbers: set[int] = set()
+        prs: list[str] = []
+        seen_commit_shas: set[str] = set()
+        commits: list[str] = []
+
+        search_queries = [f"AIP-{aip_number}"]
+        for term in aip.get("search_terms", [])[:2]:
+            search_queries.append(term)
+
+        for query_term in search_queries:
+            # PR search
+            pr_query = urllib.parse.quote(f"{query_term} repo:{GITHUB_REPO} 
is:pr")
+            pr_data = 
_github_api_get(f"/search/issues?q={pr_query}&per_page=10")
+            for item in pr_data.get("items", []):
+                if item["number"] not in seen_pr_numbers:
+                    seen_pr_numbers.add(item["number"])
+                    state = "merged" if item.get("pull_request", 
{}).get("merged_at") else item["state"]
+                    prs.append(f"#{item['number']} ({state}) -- 
{item['title']}")
+
+            time.sleep(GITHUB_API_DELAY if not os.environ.get("GITHUB_TOKEN") 
else 1)
+
+            # Commit search
+            commit_query = urllib.parse.quote(f"{query_term} 
repo:{GITHUB_REPO}")
+            commit_data = 
_github_api_get(f"/search/commits?q={commit_query}&per_page=10")
+            for item in commit_data.get("items", []):
+                sha = item["sha"][:7]
+                if sha not in seen_commit_shas:
+                    seen_commit_shas.add(sha)
+                    commits.append(f"{sha} -- 
{item['commit']['message'].split(chr(10))[0]}")
+
+            time.sleep(GITHUB_API_DELAY if not os.environ.get("GITHUB_TOKEN") 
else 1)
+
+        # --- File tree evidence ---
+        file_evidence: list[str] = []
+        for path_prefix in aip.get("codebase_paths", []):
+            matching = [f for f in repo_tree if f.startswith(path_prefix)]
+            if matching:
+                test_files = [f for f in matching if "/test_" in f or 
"/tests/" in f]
+                file_evidence.append(
+                    f"{path_prefix}: {len(matching)} files"
+                    + (f" ({len(test_files)} test files)" if test_files else 
"")
+                )
+                for f in matching[:5]:
+                    file_evidence.append(f"  - {f}")
+                if len(matching) > 5:
+                    file_evidence.append(f"  ... and {len(matching) - 5} more")
+
         return {
             "aip_number": aip_number,
             "title": aip["title"],
-            "spec_text": spec_text,
+            "last_modified": last_modified,
+            "spec_sections": spec_parts,
             "prs": prs,
             "commits": commits,
+            "file_evidence": file_evidence,
         }
 
-    evidence = gather_aip_evidence.expand(aip=aip_list)
+    evidence = gather_aip_evidence.partial(repo_tree=repo_tree).expand(aip=aip_list)
 
     # ------------------------------------------------------------------
-    # Step 3: Format the gathered evidence into an LLM analysis prompt.
-    # Separating formatting from data gathering keeps each task focused
-    # and makes prompt iteration independent of API logic.
+    # Step 4: Format the gathered evidence into an LLM analysis prompt.
+    # Clear section headers help the LLM distinguish spec claims from
+    # implementation evidence.
     # ------------------------------------------------------------------
     @task
     def format_analysis_prompt(evidence: dict) -> str:
-        prs_text = "\n".join(f"  - {pr}" for pr in evidence["prs"])
-        commits_text = "\n".join(f"  - {c}" for c in evidence["commits"])
+        spec_text = "\n".join(evidence.get("spec_sections", [])) or "(spec not 
available)"
+        prs_text = "\n".join(f"  - {pr}" for pr in evidence["prs"]) or "  
(none found)"
+        commits_text = "\n".join(f"  - {c}" for c in evidence["commits"]) or " 
 (none found)"
+        files_text = "\n".join(evidence.get("file_evidence", [])) or "  (none 
found)"
+
+        modified = (
+            f"\nLast Confluence update: {evidence['last_modified']}" if 
evidence.get("last_modified") else ""
+        )
+
         return (
-            f"Analyze AIP-{evidence['aip_number']}: {evidence['title']}\n\n"
-            f"Specification:\n{evidence['spec_text']}\n\n"
-            f"Pull Requests:\n{prs_text}\n\n"
-            f"Recent Commits:\n{commits_text}"
+            f"Analyze {evidence['title']}{modified}\n\n"
+            f"=== SPECIFICATION (from Confluence) ===\n{spec_text}\n\n"
+            f"=== PULL REQUESTS (from GitHub, {len(evidence['prs'])} found) 
===\n{prs_text}\n\n"
+            f"=== COMMITS (from GitHub, {len(evidence['commits'])} found) 
===\n{commits_text}\n\n"
+            f"=== CODEBASE FILES (from GitHub tree) ===\n{files_text}"
         )
 
     prompts = format_analysis_prompt.expand(evidence=evidence)
 
     # ------------------------------------------------------------------
-    # Step 4: Analyze each AIP with a structured LLM call.
+    # Step 5: Analyze each AIP with a structured LLM call.
     # Dynamic Task Mapping creates one LLMOperator instance per AIP.
-    # output_type=AIPStatus enforces the Pydantic schema on the response.
+    # output_type=AIPStatus enforces the Pydantic schema.
     # ------------------------------------------------------------------
     # [START aip_tracker_dtm_analysis]
     analyses = LLMOperator.partial(
@@ -273,37 +620,83 @@ def example_aip_progress_tracker():
         llm_conn_id=LLM_CONN_ID,
         system_prompt=ANALYSIS_SYSTEM_PROMPT,
         output_type=AIPStatus,
+        serialize_output=True,
+        agent_params={"model_settings": {"temperature": 0}},
     ).expand(prompt=prompts)
     # [END aip_tracker_dtm_analysis]
 
     # ------------------------------------------------------------------
-    # Step 5: Collect all per-AIP analyses into a single context string
-    # for the synthesis step.
+    # Step 6: Collect all per-AIP analyses into a combined context.
     # ------------------------------------------------------------------
     @task
-    def collect_analyses(analyses: list) -> str:
-        sections = []
+    def collect_analyses(analyses: list) -> list[dict]:
+        result = []
         for raw in analyses:
-            a = json.loads(raw) if isinstance(raw, str) else raw
-            blockers = ", ".join(a["blockers"]) if a["blockers"] else "None 
identified"
-            next_steps = ", ".join(a["next_steps"]) if a["next_steps"] else 
"N/A"
-            sections.append(
-                f"## AIP-{a['aip_number']}: {a['title']}\n"
-                f"Status: {a['implementation_status']} "
-                f"({a['completion_pct']}% complete)\n"
-                f"Summary: {a['spec_summary']}\n"
-                f"Key PRs: {', '.join(a['key_prs'])}\n"
-                f"Blockers: {blockers}\n"
-                f"Next steps: {next_steps}"
-            )
-        return "\n\n".join(sections)
+            if hasattr(raw, "model_dump"):
+                result.append(raw.model_dump())
+            elif isinstance(raw, str):
+                result.append(json.loads(raw))
+            else:
+                result.append(raw)
+        return result
 
     collected = collect_analyses(analyses.output)
 
     # ------------------------------------------------------------------
-    # Step 6: Synthesize a cross-AIP progress report.
+    # Step 7: Format the structured analyses as readable markdown.
+    # This serves as a non-LLM audit artifact and provides cleaner
+    # input for the synthesis and validation steps.
+    # ------------------------------------------------------------------
+    @task
+    def format_report(analyses: list[dict]) -> str:
+        sections = []
+        for a in analyses:
+            shipped = [d for d in a["deliverables"] if d["status"] == 
"shipped"]
+            beyond = [d for d in a["deliverables"] if d["status"] == 
"beyond_spec"]
+            in_progress = [d for d in a["deliverables"] if d["status"] == 
"in_progress"]
+            not_started = [d for d in a["deliverables"] if d["status"] == 
"not_started"]
+            unclear = [d for d in a["deliverables"] if d["status"] == 
"unclear"]
+
+            lines = [
+                f"## AIP-{a['aip_number']}: {a['title']}",
+                f"Progress: {a['shipped_count']}/{a['total_count']} 
deliverables shipped",
+                f"Confluence update needed: {'YES' if 
a['confluence_update_needed'] else 'No'}",
+                f"\n{a['spec_summary']}",
+            ]
+            if shipped:
+                lines.append(f"\n**Shipped ({len(shipped)}):**")
+                for d in shipped:
+                    lines.append(f"  - {d['name']}: {d['evidence']} 
[{d['confidence']}]")
+            if beyond:
+                lines.append(f"\n**Beyond spec ({len(beyond)}):**")
+                for d in beyond:
+                    lines.append(f"  - {d['name']}: {d['evidence']} 
[{d['confidence']}]")
+            if in_progress:
+                lines.append(f"\n**In progress ({len(in_progress)}):**")
+                for d in in_progress:
+                    lines.append(f"  - {d['name']}: {d['evidence']} 
[{d['confidence']}]")
+            if not_started:
+                lines.append(f"\n**Not started ({len(not_started)}):**")
+                for d in not_started:
+                    lines.append(f"  - {d['name']}")
+            if unclear:
+                lines.append(f"\n**Unclear ({len(unclear)}):**")
+                for d in unclear:
+                    lines.append(f"  - {d['name']}: {d['evidence']} 
[{d['confidence']}]")
+            if a.get("notes"):
+                lines.append(f"\n**Notes:** {a['notes']}")
+            if a["key_prs"]:
+                lines.append(f"\n**Key PRs:** {', '.join(a['key_prs'])}")
+            sections.append("\n".join(lines))
+
+        return "\n\n---\n\n".join(sections)
+
+    formatted = format_report(collected)
+
+    # ------------------------------------------------------------------
+    # Step 8: Synthesize a cross-AIP progress report.
     # UsageLimits caps the token spend so a runaway prompt cannot
-    # exhaust the API budget in a single Dag run.
+    # exhaust the API budget in a single DAG run.
     # ------------------------------------------------------------------
     # [START aip_tracker_synthesis]
     synthesize = LLMOperator(
@@ -312,9 +705,11 @@ def example_aip_progress_tracker():
         system_prompt=SYNTHESIS_SYSTEM_PROMPT,
         prompt="""\
 Create a cross-AIP progress report from these individual assessments.
-Prioritize AIPs that are close to completion or have shared blockers.
+Prioritize AIPs that are close to completion or have shared dependencies.
+Use only the data below -- do not add external information.
 
-{{ ti.xcom_pull(task_ids='collect_analyses') }}""",
+{{ ti.xcom_pull(task_ids='format_report') }}""",
+        agent_params={"model_settings": {"temperature": 0}},
         usage_limits=UsageLimits(
             request_limit=5,
             input_tokens_limit=20_000,
@@ -322,18 +717,171 @@ Prioritize AIPs that are close to completion or have 
shared blockers.
         ),
     )
     # [END aip_tracker_synthesis]
-    collected >> synthesize
+    formatted >> synthesize
+
+    # ------------------------------------------------------------------
+    # Step 9: AI-powered hallucination validation.
+    # A second LLM checks every claim in the synthesized report against
+    # the raw per-AIP evidence.  Its only job is to judge and propose
+    # corrections -- a separate deterministic step applies them.
+    # ------------------------------------------------------------------
+    # [START aip_tracker_validation]
+    validate = LLMOperator(
+        task_id="validate_report",
+        llm_conn_id=LLM_CONN_ID,
+        system_prompt=VALIDATION_SYSTEM_PROMPT,
+        prompt="""\
+Verify the following synthesized report against the raw per-AIP evidence.
+Flag any claims not grounded in the evidence.
+
+=== SYNTHESIZED REPORT ===
+{{ ti.xcom_pull(task_ids='synthesize_report') }}
+
+=== RAW PER-AIP EVIDENCE ===
+{{ ti.xcom_pull(task_ids='format_report') }}""",
+        output_type=ValidationResult,
+        serialize_output=True,
+        usage_limits=UsageLimits(
+            request_limit=5,
+            input_tokens_limit=30_000,
+            output_tokens_limit=8_000,
+        ),
+        agent_params={"model_settings": {"temperature": 0}},
+    )
+    # [END aip_tracker_validation]
+    synthesize >> validate
+
+    # ------------------------------------------------------------------
+    # Step 10: Apply validation corrections deterministically.
+    # No LLM involved -- this is a mechanical find-and-replace using
+    # the validator's claim/correction pairs.  Ensures every flagged
+    # issue is actually fixed in the final report.
+    # ------------------------------------------------------------------
+    @task
+    def apply_validation(report: str, validation: dict, analyses: list[dict]) 
-> dict:
+        import re
+
+        corrected = report
+        applied = 0
+        for claim in validation.get("ungrounded_claims", []):
+            if claim["grounded"]:
+                continue
+            original = claim.get("claim", "")
+            correction = claim.get("correction", "")
+            if not original:
+                continue
+            if correction == "REMOVE":
+                if original in corrected:
+                    corrected = corrected.replace(original, "")
+                    applied += 1
+            elif correction and original in corrected:
+                corrected = corrected.replace(original, correction)
+                applied += 1
+
+        # --- Arithmetic validation against ground-truth analysis data ---
+        # Build lookup of correct counts from the structured per-AIP analyses.
+        truth = {}
+        total_shipped_all = 0
+        total_all = 0
+        for a in analyses:
+            aip_num = a["aip_number"]
+            shipped = a["shipped_count"]
+            total = a["total_count"]
+            truth[aip_num] = (shipped, total)
+            total_shipped_all += shipped
+            total_all += total
+
+        arithmetic_fixes = 0
+
+        # Fix per-AIP fractions: find "X/Y" near "AIP-{N}" and correct if 
wrong.
+        for aip_num, (shipped, total) in truth.items():
+            pattern = 
re.compile(rf"(AIP-{aip_num}\b[^\n]{{0,80}}?)\b(\d+)/(\d+)\b")
+            for match in pattern.finditer(corrected):
+                found_x, found_y = int(match.group(2)), int(match.group(3))
+                if found_x != shipped or found_y != total:
+                    old = match.group(0)
+                    new = old[: match.start(2) - match.start(0)] + 
f"{shipped}/{total}"
+                    corrected = corrected.replace(old, new, 1)
+                    arithmetic_fixes += 1
+
+        # Fix the cross-AIP summary total.
+        summary_pattern = 
re.compile(r"(Five|Four|Three|\d+)\s+AIPs\s+tracked\s+across\s+(\d+)/(\d+)")
+        summary_match = summary_pattern.search(corrected)
+        if summary_match:
+            found_shipped = int(summary_match.group(2))
+            found_total = int(summary_match.group(3))
+            if found_shipped != total_shipped_all or found_total != total_all:
+                old_summary = summary_match.group(0)
+                aip_count = len(truth)
+                number_words = {3: "Three", 4: "Four", 5: "Five"}
+                count_word = number_words.get(aip_count, str(aip_count))
+                new_summary = f"{count_word} AIPs tracked across 
{total_shipped_all}/{total_all}"
+                corrected = corrected.replace(old_summary, new_summary, 1)
+                arithmetic_fixes += 1
+
+        return {
+            "corrected_report": corrected.strip(),
+            "verdict": validation.get("overall_verdict", "unknown"),
+            "hallucination_risk": validation.get("hallucination_risk", 
"unknown"),
+            "total_claims_flagged": len(
+                [c for c in validation.get("ungrounded_claims", []) if not 
c["grounded"]]
+            ),
+            "corrections_applied": applied,
+            "arithmetic_fixes": arithmetic_fixes,
+            "ungrounded_claims": validation.get("ungrounded_claims", []),
+        }
+
+    validated = apply_validation(synthesize.output, validate.output, collected)
+
+    # ------------------------------------------------------------------
+    # Step 11: Build the review body for the human reviewer, showing
+    # the corrected report, validation verdict, and any flagged claims.
+    # ------------------------------------------------------------------
+    @task
+    def build_review_body(validated: dict) -> str:
+        verdict = validated["verdict"]
+        risk = validated["hallucination_risk"]
+        flagged = validated["total_claims_flagged"]
+        applied = validated["corrections_applied"]
+        arith = validated.get("arithmetic_fixes", 0)
+        claims = validated["ungrounded_claims"]
+
+        lines = [
+            f"# AIP Progress Report (Validation: {verdict.upper()})",
+            f"Hallucination risk: {risk}",
+            f"Claims flagged: {flagged} | Corrections applied: {applied} | 
Arithmetic fixes: {arith}\n",
+        ]
+
+        if claims:
+            ungrounded = [c for c in claims if not c["grounded"]]
+            if ungrounded:
+                lines.append(f"## {len(ungrounded)} Ungrounded Claim(s)\n")
+                for claim in ungrounded:
+                    lines.append(f"- **Original:** {claim['claim']}")
+                    correction = claim.get("correction", "")
+                    if correction == "REMOVE":
+                        lines.append("  **Action:** Removed")
+                    elif correction:
+                        lines.append(f"  **Replaced with:** {correction}")
+                lines.append("")
+
+        lines.append("---\n")
+        lines.append(validated["corrected_report"])
+
+        return "\n".join(lines)
+
+    review_body = build_review_body(validated)
 
     # ------------------------------------------------------------------
-    # Step 7: A maintainer reviews the synthesized report before it is
-    # shared on the dev list.  The Dag pauses here until the human
-    # approves, requests changes, or the timeout expires.
+    # Step 12: A maintainer reviews the corrected report.  The DAG
+    # pauses here until the human approves, requests changes, or the
+    # timeout expires.
     # ------------------------------------------------------------------
     # [START aip_tracker_hitl]
     ApprovalOperator(
         task_id="review_report",
-        subject="Review AIP Progress Report before sharing",
-        body=synthesize.output,
+        subject="Review AIP Progress Report (AI-validated)",
+        body=review_body,
         response_timeout=timedelta(hours=24),
     )
     # [END aip_tracker_hitl]


Reply via email to