This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 42d24243fb3 Add docs for using toolsets directly with PydanticAI in any task (#64234)
42d24243fb3 is described below
commit 42d24243fb37b293714405aa2546f3ce2c5d619e
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Mar 26 01:51:19 2026 +0000
Add docs for using toolsets directly with PydanticAI in any task (#64234)
Toolsets like SQLToolset are standard pydantic-ai AbstractToolset
implementations that work independently of AgentOperator. Document this
pattern with an example showing @task + PydanticAIHook + SQLToolset,
and explain the tradeoffs vs AgentOperator (no durable execution, HITL,
or automatic tool logging).
SQLToolset import is inside the task function because common-sql is an
optional dependency of common-ai -- a top-level import would break the
other example DAGs in the file at parse time.
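
The deferred-import reasoning above can be sketched in isolation. This is a minimal illustration, not the provider's code: `hypothetical_optional_toolsets` and `SQLToolsetStandIn` are made-up names, assumed not to be installed, standing in for an optional extra.

```python
def analyze_with_optional_dependency() -> str:
    """Task-like function whose optional import runs only when it is called."""
    # Hypothetical module name, assumed NOT to be installed; it stands in for
    # an optional extra such as the package that provides a SQL toolset.
    from hypothetical_optional_toolsets import SQLToolsetStandIn

    return SQLToolsetStandIn.__name__


def try_run() -> str:
    """Call the function and report whether the optional import succeeded."""
    try:
        return analyze_with_optional_dependency()
    except ImportError as exc:
        return f"optional dependency missing: {exc.name}"


# Defining the function above did not raise, so a module containing it still
# parses cleanly; the import error surfaces only when the function runs.
status = try_run()
```

With a top-level import instead, the ImportError would be raised while the file is being parsed, which is the failure mode the commit message describes for the other example DAGs in the same file.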
---
providers/common/ai/docs/toolsets.rst | 25 +++++++++++++++
.../ai/example_dags/example_pydantic_ai_hook.py | 37 +++++++++++++++++++++-
2 files changed, 61 insertions(+), 1 deletion(-)
diff --git a/providers/common/ai/docs/toolsets.rst b/providers/common/ai/docs/toolsets.rst
index 8d295ac361c..ec3927946f9 100644
--- a/providers/common/ai/docs/toolsets.rst
+++ b/providers/common/ai/docs/toolsets.rst
@@ -49,6 +49,31 @@ passed to any pydantic-ai ``Agent``, including via
but you are not locked in.
+Using Toolsets Directly with PydanticAI
+---------------------------------------
+
+Toolsets are standard pydantic-ai ``AbstractToolset`` implementations with no
+dependency on ``AgentOperator`` or ``@task.agent``. You can use them anywhere
+you can run Python within Airflow -- ``@task`` functions, ``PythonOperator``
+callables, or any custom operator's ``execute()`` method -- by creating a
+``pydantic_ai.Agent`` yourself:
+
+.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_pydantic_ai_hook.py
+    :language: python
+    :start-after: [START howto_task_with_toolsets]
+    :end-before: [END howto_task_with_toolsets]
+
+This works because toolsets resolve Airflow connections lazily via
+``BaseHook.get_connection()``, which is available in any task execution
+context.
+
+This approach gives you full control over the agent lifecycle -- you can call
+``agent.run_sync()`` multiple times, swap models at runtime, or combine
+results from several agents in a single task. The tradeoff is that you lose
+the durable execution (step-level caching with retry replay), HITL review
+integration, and automatic tool call logging that ``AgentOperator`` provides.
+
+
``HookToolset``
---------------
diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_pydantic_ai_hook.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_pydantic_ai_hook.py
index 1bb0f6bf444..16b961438bc 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_pydantic_ai_hook.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_pydantic_ai_hook.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Example DAG demonstrating PydanticAIHook usage."""
+"""Example DAGs demonstrating PydanticAIHook and direct pydantic-ai Agent usage."""
from __future__ import annotations
@@ -65,3 +65,38 @@ def example_pydantic_ai_structured_output():
# [END howto_hook_pydantic_ai_structured_output]
example_pydantic_ai_structured_output()
+
+
+# [START howto_task_with_toolsets]
+@dag(schedule=None)
+def example_task_with_toolsets():
+    """Use toolsets directly in a @task function without AgentOperator."""
+
+    @task
+    def analyze_revenue() -> str:
+        from airflow.providers.common.ai.toolsets.sql import SQLToolset
+
+        hook = PydanticAIHook(llm_conn_id="pydanticai_default")
+        agent = hook.create_agent(
+            output_type=str,
+            instructions=(
+                "You are a sales analytics assistant. "
+                "Use the SQL tools to explore the database schema and answer questions."
+            ),
+            toolsets=[
+                SQLToolset(
+                    db_conn_id="my_database",
+                    allowed_tables=["customers", "orders"],
+                    max_rows=20,
+                ),
+            ],
+        )
+        result = agent.run_sync("Which customers have spent the most? Show the top 5.")
+        return result.output
+
+    analyze_revenue()
+
+
+# [END howto_task_with_toolsets]
+
+example_task_with_toolsets()
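
The toolsets.rst hunk above says the pattern works because connections are resolved lazily. A minimal stdlib-only sketch of that idea follows. `LazyConnectionToolset` and `fake_get_connection` are hypothetical names introduced for illustration; this is not how the provider's toolsets are implemented, only a generic way to defer a lookup until first use.

```python
from functools import cached_property
from typing import Callable


class LazyConnectionToolset:
    """Hypothetical stand-in that stores a connection ID and resolves it on demand."""

    def __init__(self, conn_id: str, resolver: Callable[[str], dict]) -> None:
        # Only the ID is stored here; no lookup happens at construction time.
        self.conn_id = conn_id
        self._resolver = resolver

    @cached_property
    def connection(self) -> dict:
        # Resolved on first access, then cached on the instance.
        return self._resolver(self.conn_id)

    def describe(self) -> str:
        return f"{self.conn_id} -> {self.connection['host']}"


# Records every lookup so the timing of resolution is observable.
calls: list[str] = []


def fake_get_connection(conn_id: str) -> dict:
    """Assumed placeholder for a real connection lookup."""
    calls.append(conn_id)
    return {"host": "db.example.internal"}


# Constructing the toolset triggers no lookup; that happens inside describe().
toolset = LazyConnectionToolset("my_database", fake_get_connection)
```

Because nothing is looked up in `__init__`, an object like this can be built at any point, and the lookup only has to succeed once code actually runs inside a task.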