kaxil commented on code in PR #1483:
URL: https://github.com/apache/airflow-site/pull/1483#discussion_r3012694232


##########
landing-pages/site/content/en/blog/common-ai-provider/index.md:
##########
@@ -0,0 +1,350 @@
+---
+title: "Introducing the Common AI Provider: LLM and AI Agent Support for 
Apache Airflow"
+linkTitle: "Introducing the Common AI Provider"
+authors:
+  - name: "Kaxil Naik"
+    github: "kaxil"
+    linkedin: "kaxil"
+  - name: "Pavan Kumar Gopidesu"
+    github: "gopidesupavan"
+    linkedin: "pavan-kumar-gopidesu"
+description: "The Common AI Provider adds LLM and AI agent operators to Apache 
Airflow with 6 operators, 5 toolsets, and 20+ model providers in one package."
+tags: [Community, Release]
+date: "2026-04-03"
+---
+
+At [Airflow Summit 
2025](https://airflowsummit.org/sessions/2025/airflow-as-an-ai-agents-toolkit-unlocking-1000-integrations-with-mcp/),
 we previewed what native AI integration in Apache Airflow could look like. 
Today we're shipping it.
+
+**[`apache-airflow-providers-common-ai`](https://pypi.org/project/apache-airflow-providers-common-ai/)
 0.1.0** adds LLM and agent capabilities directly to Airflow. Not a wrapper 
around another framework, but a provider package that plugs into the 
orchestrator you already run. It's built on [Pydantic 
AI](https://ai.pydantic.dev/) and supports 20+ model providers (OpenAI, 
Anthropic, Google, Azure, Bedrock, Ollama, and more) through a single install.
+
+```bash
+pip install 'apache-airflow-providers-common-ai'
+```
+
+Requires Apache Airflow 3.0+.
+
+> **Note:** This is a 0.x release. We're actively looking for feedback and 
iterating fast, so breaking changes are possible between minor versions. Try 
it, tell us what works and what doesn't. Your input directly shapes the API.
+
+## By the Numbers
+
+| | |
+|---|---|
+| **6** | Operators |
+| **6** | TaskFlow decorators |
+| **5** | Toolsets |
+| **4** | Connection types |
+| **20+** | Supported model providers via Pydantic AI |
+
+
+## The Decorator Suite
+
+Every operator has a matching TaskFlow decorator.
+
+### `@task.llm`: Single LLM Call
+
+Send a prompt, get text or structured output back.
+
+```python
+from pydantic import BaseModel
+from airflow.providers.common.compat.sdk import dag, task
+
+
+@dag
+def my_pipeline():
+    class Entities(BaseModel):
+        names: list[str]
+        locations: list[str]
+
+    @task.llm(
+        llm_conn_id="my_openai_conn",
+        system_prompt="Extract named entities.",
+        output_type=Entities,
+    )
+    def extract(text: str):
+        return f"Extract entities from: {text}"
+
+    extract("Alice visited Paris and met Bob in London.")
+
+
+my_pipeline()
+```
+
+The LLM returns a typed `Entities` object, not a string you have to parse. 
Downstream tasks get structured data through `XCom`.
+
+### `@task.agent`: Multi-Step Agent with Tools
+
+When the LLM needs to query databases, call APIs, or read files across 
multiple steps, use `@task.agent`. The agent picks which tools to call and 
loops until it has an answer.
+
+```python
+from airflow.providers.common.ai.toolsets.sql import SQLToolset
+from airflow.providers.common.compat.sdk import dag, task
+
+
+@dag
+def sql_analyst():
+    @task.agent(
+        llm_conn_id="my_openai_conn",
+        system_prompt="You are a SQL analyst. Use tools to answer questions 
with data.",
+        toolsets=[
+            SQLToolset(
+                db_conn_id="postgres_default",
+                allowed_tables=["customers", "orders"],
+                max_rows=20,
+            )
+        ],
+    )
+    def analyze(question: str):
+        return f"Answer this question about our data: {question}"
+
+    analyze("What are the top 5 customers by order count?")
+
+
+sql_analyst()
+```
+
+Under the hood, the agent calls `list_tables`, `get_schema`, and `query` on 
its own until it has the answer.
+
+### `@task.llm_branch`: LLM-Powered Branching
+
+The LLM decides which downstream task(s) to run. No string parsing. The LLM 
returns a constrained enum built from the task's downstream IDs.
+
+```python
[email protected]_branch(
+    llm_conn_id="my_openai_conn",
+    system_prompt="Classify the support ticket priority.",
+)
+def route_ticket(ticket_text: str):
+    return f"Classify this ticket: {ticket_text}"
+```
+
+### `@task.llm_sql`: Text-to-SQL with Safety Rails
+
+Generates SQL from natural language. The operator introspects your database 
schema and validates the output via AST parsing 
([sqlglot](https://github.com/tobymao/sqlglot)) before execution.
+
+```python
+from airflow.providers.common.compat.sdk import dag, task
+
+
+@dag
+def sql_generator():
+    @task.llm_sql(
+        llm_conn_id="my_openai_conn",
+        db_conn_id="postgres_default",
+        table_names=["orders", "customers"],
+        dialect="postgres",
+    )
+    def build_query(ds=None):
+        return f"Find customers who placed no orders after {ds}"
+
+    build_query()
+
+
+sql_generator()
+```
+
+### `@task.llm_file_analysis`: Analyze Files with LLMs
+
+Point it at files in object storage (S3, GCS, local) and let the LLM analyze 
them. Supports CSV, Parquet, Avro, JSON, Markdown, and images (multimodal).
+
+![LLM analyzing a CSV file — identifying columns, counting rows, computing 
totals](images/file-analysis-csv.png)
+
+It also handles multimodal input. Set `multi_modal=True` and the operator 
sends images and PDFs as binary attachments to the LLM:
+
+![Multimodal file analysis — LLM describing components from an architecture 
diagram image](images/file-analysis-image.png)
+
+### `@task.llm_schema_compare`: Cross-Database Schema Drift
+
+Compares schemas across databases and returns structured `SchemaMismatch` 
results with severity levels. Handles the type mapping headaches across systems 
(`varchar(n)` vs `string`, `timestamp` vs `timestamptz`).
+
+
+## 350+ Hooks as AI Tools
+
+Airflow already has 350+ provider hooks with typed methods, docstrings, and 
managed credentials. `S3Hook`, `GCSHook`, `SlackHook`, `SnowflakeHook`, 
`DbApiHook`. They all authenticate through Airflow's secret backend, and they 
all already work.
+
+Rather than setting up separate MCP servers with their own auth for each 
integration, `HookToolset` lets agents call hook methods directly using the 
connections you've already configured.
+
+`HookToolset` turns any of them into AI agent tools:
+
+```python
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.common.ai.toolsets.hook import HookToolset
+
+# S3Hook methods become agent tools: the agent can list, read, and check S3 
objects
+HookToolset(
+    S3Hook(aws_conn_id="aws_default"),
+    allowed_methods=["list_keys", "read_key", "check_for_key"],
+    tool_name_prefix="s3_",
+)
+```
+
+The introspection engine builds JSON Schema from method signatures and 
enriches tool descriptions from docstrings (Sphinx and Google style). You 
explicitly declare which methods to expose. No auto-discovery, no unintended 
access. The agent sees `s3_list_keys`, `s3_read_key`, `s3_check_for_key` as 
typed tools with parameter descriptions pulled straight from the hook.
+
+This works with _any_ hook. Want your agent to send Slack messages? 
`HookToolset(SlackHook(...), allowed_methods=["send_message"])`. Query 
Snowflake? Use `SQLToolset` with a Snowflake connection. Hit an internal API? 
`HookToolset(HttpHook(...), allowed_methods=["run"])`.
+
+You can also compose multiple toolsets in a single agent. Give it `SQLToolset` 
for database access _and_ `HookToolset` for API calls, and the agent picks the 
right tool for each step.
+
+Four toolsets ship with the provider:
+
+| Toolset | What it does |
+|---|---|
+| **`SQLToolset`** | `list_tables`, `get_schema`, `query`, `check_query` for 
any `DbApiHook` database |
+| **`HookToolset`** | Wraps any Airflow hook's methods as agent tools |
+| **`MCPToolset`** | Connects to external 
[MCP](https://modelcontextprotocol.io/) servers via Airflow Connections |
+| **`DataFusionToolset`** | SQL over files in object storage (S3, GCS, Azure 
Blob) via Apache DataFusion |
+
+All toolsets resolve connections lazily through `BaseHook.get_connection()`. 
No hardcoded keys.
+
+Here's what an agent SQL analysis looks like in the Airflow task logs. The 
agent explored the schema, wrote queries, and produced a structured summary:
+
+![Agent SQL analysis showing tool calls and structured output in Airflow task 
logs](images/agent-sql-logs.png)
+
+
+## Not Locked Into Decorators
+
+You don't have to use `@task.agent` or the operator classes. Pydantic AI works 
directly from a plain `@task`, `PythonOperator`, or any custom operator:
+
+```python
+from pydantic_ai import Agent
+from airflow.providers.common.ai.hooks.pydantic_ai import PydanticAIHook
+from airflow.providers.common.ai.toolsets.sql import SQLToolset
+from airflow.providers.common.compat.sdk import dag, task
+
+
+@dag
+def raw_pydantic_ai():
+    @task
+    def multi_agent():
+        hook = PydanticAIHook(llm_conn_id="my_openai_conn")
+        model = hook.get_conn()
+
+        agent = Agent(
+            model,
+            system_prompt="You are a SQL analyst.",
+            toolsets=[SQLToolset(db_conn_id="postgres_default")],
+        )
+        result = agent.run_sync("What are the top products by revenue?")
+        return result.output
+
+    multi_agent()
+
+
+raw_pydantic_ai()
+```
+
+This gives you full control: run multiple agent calls in one task, swap models 
at runtime, combine outputs from different agents before returning.
+
+`@task.agent` adds guardrails on top (durable execution, HITL review, 
automatic tool logging). Raw Pydantic AI skips those. Both paths use the same 
toolsets.
+
+
+## Human-in-the-Loop
+
+Not every LLM output should go straight to production. The provider has two 
levels of human oversight.
+
+**Approval gates**: the task defers after generating output and waits for a 
human to approve before downstream tasks run:
+
+```python
+LLMOperator(
+    task_id="summarize_report",
+    prompt="Summarize the quarterly financial report for stakeholders.",
+    llm_conn_id="my_openai_conn",
+    require_approval=True,
+    approval_timeout=timedelta(hours=24),
+    allow_modifications=True,  # reviewer can edit the output
+)
+```
+
+**Iterative review**: a human reviews agent output, approves, rejects, or 
requests changes, and the agent revises in a loop:
+
+```python
+AgentOperator(
+    task_id="analyst",
+    prompt="Summarize the Q4 sales report.",
+    llm_conn_id="my_openai_conn",
+    enable_hitl_review=True,
+    max_hitl_iterations=5,
+    hitl_timeout=timedelta(minutes=30),
+)
+```
+
+A built-in plugin adds the review UI to the Airflow web interface.
+
+![Human-in-the-loop approval interface showing the generated output with 
approve, reject, and modify options](images/hitl-approval.png)
+
+![Human-in-the-loop review tab in the task instance 
page](images/hitl-review-tab.png)

Review Comment:
   @gopidesupavan Might be worth including a GIF too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to