kaxil commented on code in PR #67438:
URL: https://github.com/apache/airflow/pull/67438#discussion_r3337891059


##########
providers/common/ai/src/airflow/providers/common/ai/hooks/base.py:
##########
@@ -0,0 +1,427 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Shared contract for agent-framework hooks used by 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`."""
+
+from __future__ import annotations
+
+import functools
+import json
+import time
+from abc import ABCMeta, abstractmethod
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass, field
+from typing import Any, ClassVar
+
+from airflow.providers.common.ai.utils.function_schema import 
callable_to_tool_spec
+from airflow.providers.common.compat.sdk import BaseHook
+
+# Attribute name for durable storage/counter bound to a framework agent 
instance.
+_AIRFLOW_DURABLE_ATTR = "_airflow_durable_state"
+
+
+@dataclass
+class AgentUsage:
+    """Token and request usage from an agent run, when the backend exposes 
it."""
+
+    requests: int = 0
+    tool_calls: int = 0
+    input_tokens: int = 0
+    output_tokens: int = 0
+    total_tokens: int = 0
+
+
+@dataclass
+class DurableStats:
+    """Step-level cache statistics from a durable agent run."""
+
+    replayed_model: int = 0
+    replayed_tool: int = 0
+    cached_model: int = 0
+    cached_tool: int = 0
+
+
+@dataclass
+class AgentRunResult:
+    """
+    Backend-neutral result from :meth:`BaseAIHook.run_agent`.
+
+    :param output: Final agent output (``str``, Pydantic model instance, etc.).
+    :param message_history: Opaque conversation state for HITL regeneration; 
only pass back to the
+        same hook implementation that produced it.
+    :param model_name: Resolved model identifier, when available.
+    :param usage: Usage counters when the backend exposes them.
+    :param tool_names: Ordered tool names invoked during the run, when known.
+    :param durable_stats: Durable step-cache statistics, populated when 
durable execution is enabled.
+    """
+
+    output: Any
+    message_history: Any = None
+    model_name: str | None = None
+    usage: AgentUsage | None = None
+    tool_names: list[str] | None = None
+    durable_stats: DurableStats | None = None
+
+
+@dataclass
+class ToolSpec:
+    """
+    Framework-neutral tool descriptor.
+
+    Toolsets produce :class:`ToolSpec` objects; each hook converts them to its
+    native tool representation via :meth:`BaseAIHook._tool_spec_to_native`.
+
+    :param name: Tool name exposed to the LLM.
+    :param description: Human-readable description used by the LLM to decide 
when to call this tool.
+    :param parameters: JSON Schema ``object`` describing the tool's parameters.
+    :param fn: Callable that implements the tool. Must accept keyword 
arguments matching *parameters*.
+    :param sequential: When ``True``, the backend must not invoke this tool 
concurrently with others
+        in the same turn (for example when tools share a non-thread-safe 
connection).
+    """
+
+    name: str
+    description: str
+    parameters: dict[str, Any]
+    fn: Callable[..., Any]
+    sequential: bool = False
+
+
+@dataclass
+class DurableContext:
+    """Framework-neutral identity of the running task, used to locate the 
durable cache file."""
+
+    dag_id: str
+    task_id: str
+    run_id: str
+    map_index: int = -1
+
+
+@dataclass
+class AgentRunRequest:
+    """
+    Parameter object passed to :meth:`BaseAIHook.create_agent` and 
:meth:`BaseAIHook.run_agent`.
+
+    Encapsulates everything the hook needs to build and run an agent in a 
single
+    framework-neutral structure, so that 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`
+    has zero framework-specific imports.
+
+    :param prompt: User prompt for this invocation (plain ``str`` or a 
multimodal
+        ``Sequence`` accepted by the backend agent's run API).
+    :param output_type: Expected structured output type (default: ``str``).
+    :param instructions: System-level instructions for the agent.
+    :param toolsets: List of :class:`BaseToolset` instances the agent may call.
+    :param usage_limits: Backend-specific usage limits; ignored if the hook 
does not support them.
+    :param message_history: Prior conversation state from a previous 
:class:`AgentRunResult`.
+    :param enable_tool_logging: When ``True`` (default), wraps 
Airflow-resolved tool callables with
+        a logging shim. Backend-native tool objects may be passed through 
unchanged by the concrete
+        hook and might not receive this wrapper.
+    :param durable_context: When set, enables step-level durable caching for 
the run.
+    :param agent_params: Extra keyword arguments forwarded to the underlying 
agent constructor.
+        Use this escape hatch for framework-specific options.
+    """
+
+    prompt: str | Sequence[Any]
+    output_type: type[Any] = str
+    instructions: str = ""
+    toolsets: list[Any] | None = None
+    usage_limits: Any = None
+    message_history: Any = None
+    enable_tool_logging: bool = True
+    durable_context: DurableContext | None = None
+    agent_params: dict[str, Any] = field(default_factory=dict)
+
+
+class BaseToolset(metaclass=ABCMeta):
+    """
+    Abstract base for framework-agnostic toolsets.
+
+    Subclasses implement :meth:`as_tools` to return a list of :class:`ToolSpec`
+    objects.  Each hook converts those specs to its native tool representation
+    via :meth:`BaseAIHook._tool_spec_to_native`.
+    """
+
+    @abstractmethod
+    def as_tools(self) -> list[ToolSpec]:
+        """Return the list of tools this toolset exposes."""
+
+
+class BaseAIHook(BaseHook, metaclass=ABCMeta):
+    """
+    Abstract hook for multi-turn LLM agents.
+
+    :class:`~airflow.providers.common.ai.operators.agent.AgentOperator` 
resolves the concrete hook
+    from the Airflow connection ``conn_type`` (for example ``pydanticai`` or 
``pydanticai-bedrock``).
+
+    :param llm_conn_id: Optional connection ID override (subclasses may apply 
a default).
+    :param model_id: Optional model override; not all backends use this 
parameter.
+
+    Subclasses implement :meth:`get_model`, :meth:`create_agent`, 
:meth:`run_agent`, and
+    :meth:`_tool_spec_to_native`.
+
+    Shared helpers :meth:`_init_durable`, :meth:`_resolve_tools`, 
:meth:`_logged_callable`, and
+    :meth:`_cached_callable` are provided for all hooks.
+    """
+
+    conn_name_attr = "llm_conn_id"
+
+    supports_toolsets: ClassVar[bool] = False
+    supports_durable: ClassVar[bool] = False
+    supports_usage_limits: ClassVar[bool] = False
+
+    def __init__(
+        self,
+        llm_conn_id: str | None = None,
+        model_id: str | None = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.llm_conn_id = llm_conn_id
+        self.model_id = model_id
+
+    @classmethod
+    def get_agent_hook(cls, conn_id: str, *, hook_params: dict[str, Any] | 
None = None) -> BaseAIHook:
+        """
+        Return an agent hook for *conn_id*, verifying it implements this 
contract.
+
+        Uses the connection's ``conn_type`` to select the hook class 
registered in
+        ``provider.yaml``.
+        """
+        hook = cls.get_hook(conn_id, hook_params=hook_params)
+        if not isinstance(hook, BaseAIHook):
+            raise TypeError(
+                f"Connection {conn_id!r} resolved to {type(hook).__name__}, 
which is not a BaseAIHook. "
+                "Use a connection type registered for agent frameworks (e.g. 
pydanticai, pydanticai-bedrock)."
+            )
+        return hook
+
+    @abstractmethod
+    def get_model(self) -> Any:
+        """Return the backend model/client used to construct agents."""
+
+    def get_conn(self) -> Any:
+        """Return the backend model/client. Delegates to :meth:`get_model`."""
+        return self.get_model()
+
+    @abstractmethod
+    def create_agent(self, request: AgentRunRequest) -> Any:
+        """
+        Build (but do not run) the agent described by *request*.
+
+        Responsible for resolving :attr:`AgentRunRequest.toolsets` via
+        :meth:`_resolve_tools` and constructing the framework-native agent 
object
+        with the model, tools, instructions, and output type from *request*.
+
+        When :attr:`AgentRunRequest.durable_context` is set, implementations
+        should call :meth:`_init_durable` and bind the returned storage/counter
+        to the agent via :meth:`_bind_agent_durable` so that :meth:`run_agent`
+        can retrieve and clean them up.
+
+        Implementations must call :meth:`validate_run_request` at the start of
+        this method before any agent construction or durable initialisation.
+
+        :param request: All parameters needed to configure the agent.
+        :returns: Framework-native agent object, ready to be passed to 
:meth:`run_agent`.
+        """
+
+    @abstractmethod
+    def run_agent(self, agent: Any, request: AgentRunRequest) -> 
AgentRunResult:
+        """
+        Execute *agent* for *request* and return a normalized 
:class:`AgentRunResult`.
+
+        Implementations with durable execution should pop durable state via
+        :meth:`_pop_agent_durable`, apply it during the run, and call
+        ``storage.cleanup()`` only after a successful run (keep the cache file
+        when the run raises so Airflow retries can replay cached steps).
+
+        :param agent: Framework-native agent produced by :meth:`create_agent`.
+        :param request: The same request used to create the agent (prompt, 
usage
+            limits, message history, etc.).
+        """
+
+    @abstractmethod
+    def _tool_spec_to_native(self, spec: ToolSpec) -> Any:
+        """
+        Convert a :class:`ToolSpec` to the agent framework's native tool 
representation.
+
+        Called once per tool inside :meth:`_resolve_tools`. The returned object
+        is collected into a list and passed to the underlying agent 
constructor.
+
+        :param spec: Universal tool descriptor, with the callable already 
wrapped
+            by any enabled logging / caching shims.
+        """
+
+    def validate_run_request(self, request: AgentRunRequest) -> None:
+        """
+        Raise if *request* uses features this hook implementation does not 
support.
+
+        Hook implementations call this at the start of :meth:`create_agent`.
+        """
+        hook_name = type(self).__name__
+        conn_id = self.llm_conn_id or "unknown"
+        if request.toolsets and not self.supports_toolsets:
+            raise ValueError(
+                f"toolsets are not supported for connection {conn_id!r} 
(conn_type resolves to {hook_name}). "
+            )
+        if request.usage_limits is not None and not self.supports_usage_limits:
+            raise ValueError(
+                f"usage_limits are not supported for connection {conn_id!r} "
+                f"(conn_type resolves to {hook_name})."
+            )
+        if request.durable_context is not None and not self.supports_durable:
+            raise ValueError(
+                f"durable execution requires a hook that supports durable 
caching; "
+                f"got {hook_name} for connection {conn_id!r}."
+            )
+
+    def _init_durable(self, ctx: DurableContext) -> tuple[Any, Any]:
+        """
+        Create and return a ``DurableStorage`` / ``DurableStepCounter`` pair 
for *ctx*.
+
+        Hooks call this inside :meth:`create_agent` when
+        :attr:`AgentRunRequest.durable_context` is set.
+        """
+        from airflow.providers.common.ai.durable.step_counter import 
DurableStepCounter
+        from airflow.providers.common.ai.durable.storage import DurableStorage
+
+        storage = DurableStorage(
+            dag_id=ctx.dag_id,
+            task_id=ctx.task_id,
+            run_id=ctx.run_id,
+            map_index=ctx.map_index,
+        )
+        counter = DurableStepCounter()
+        return storage, counter
+
+    @staticmethod
+    def _bind_agent_durable(agent: Any, storage: Any, counter: Any) -> None:
+        """Associate *storage* and *counter* with *agent* until 
:meth:`run_agent` completes."""
+        setattr(agent, _AIRFLOW_DURABLE_ATTR, (storage, counter))
+
+    @staticmethod
+    def _pop_agent_durable(agent: Any) -> tuple[Any, Any] | None:
+        """Remove and return durable state bound to *agent*, if any."""
+        state = getattr(agent, _AIRFLOW_DURABLE_ATTR, None)
+        if state is None:
+            return None
+        delattr(agent, _AIRFLOW_DURABLE_ATTR)
+        return state
+
+    def _resolve_tools(
+        self,
+        toolsets: list[Any],
+        enable_logging: bool,
+        storage: Any,
+        counter: Any,
+    ) -> list[Any]:
+        """
+        Convert a mixed list of toolsets / callables / native tools into 
framework-native tools.
+
+        Three cases per item:
+
+        * :class:`BaseToolset` — calls ``as_tools()`` and processes each 
:class:`ToolSpec`.
+        * Any callable (plain function, bound method, 
:func:`functools.partial`, or callable
+          object) — auto-wraps into a :class:`ToolSpec` using ``__name__`` and 
``__doc__``
+          (with sensible fallbacks for partials and callable objects), then 
processes it the
+          same way.
+        * Anything else — passed through unchanged (assumed to be a native 
tool object already
+          constructed for the target framework).
+
+        The processing pipeline for ``BaseToolset`` and callable items:
+        *fn* → optional cache wrap → optional log wrap → 
:meth:`_tool_spec_to_native`.
+
+        :param toolsets: Mix of :class:`BaseToolset` instances, callables 
(functions, bound
+            methods, :func:`functools.partial`, or callable objects), and 
native tool objects.
+        :param enable_logging: When ``True``, wrap each callable with 
:meth:`_logged_callable`.
+        :param storage: ``DurableStorage`` instance, or ``None`` to skip 
caching.
+        :param counter: ``DurableStepCounter`` instance, or ``None`` to skip 
caching.
+        """
+        native: list[Any] = []
+        for ts in toolsets:
+            if isinstance(ts, BaseToolset):
+                specs = ts.as_tools()
+            elif callable(ts):
+                specs = [callable_to_tool_spec(ts)]
+            else:
+                native.append(ts)
+                continue
+            for spec in specs:
+                fn = spec.fn
+                if storage is not None and counter is not None:
+                    fn = self._cached_callable(fn, storage, counter)
+                if enable_logging:
+                    fn = self._logged_callable(fn, self.log, name=spec.name)
+                adapted = ToolSpec(
+                    name=spec.name,
+                    description=spec.description,
+                    parameters=spec.parameters,
+                    fn=fn,
+                    sequential=spec.sequential,
+                )
+                native.append(self._tool_spec_to_native(adapted))
+        return native
+
+    @staticmethod
+    def _logged_callable(
+        fn: Callable[..., Any],
+        logger: Any,
+        *,
+        name: str | None = None,
+    ) -> Callable[..., Any]:
+        """Wrap *fn* to log tool name, args, timing, and exceptions."""
+        _tool_name = name or getattr(fn, "__name__", type(fn).__name__)
+
+        @functools.wraps(fn)

Review Comment:
   `@functools.wraps(fn)` over a plain `def wrapper` copies metadata but does
   not make the wrapper a coroutine function, so an `async def` tool wrapped
   here looks synchronous to the backend
   (`inspect.iscoroutinefunction(wrapper)` returns `False`). pydantic-ai then
   runs it on the sync path, gets back an un-awaited coroutine, and the run
   dies serializing it
   (`Unable to serialize unknown type: <class 'coroutine'>`).
   This fires on the default path (`enable_tool_logging=True`) for any async
   user tool -- sync tools and `SQLToolset` happen to dodge it.
   `_cached_callable` (line 414 of `base.py`) has the same shape. Either
   branch on `inspect.iscoroutinefunction(fn)` and return an `async def`
   wrapper that awaits `fn`, or reject async callables in `_resolve_tools`.



##########
providers/common/ai/src/airflow/providers/common/ai/hooks/base.py:
##########
@@ -0,0 +1,427 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Shared contract for agent-framework hooks used by 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`."""
+
+from __future__ import annotations
+
+import functools
+import json
+import time
+from abc import ABCMeta, abstractmethod
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass, field
+from typing import Any, ClassVar
+
+from airflow.providers.common.ai.utils.function_schema import 
callable_to_tool_spec
+from airflow.providers.common.compat.sdk import BaseHook
+
+# Attribute name for durable storage/counter bound to a framework agent 
instance.
+_AIRFLOW_DURABLE_ATTR = "_airflow_durable_state"
+
+
+@dataclass
+class AgentUsage:
+    """Token and request usage from an agent run, when the backend exposes 
it."""
+
+    requests: int = 0
+    tool_calls: int = 0
+    input_tokens: int = 0
+    output_tokens: int = 0
+    total_tokens: int = 0
+
+
+@dataclass
+class DurableStats:
+    """Step-level cache statistics from a durable agent run."""
+
+    replayed_model: int = 0
+    replayed_tool: int = 0
+    cached_model: int = 0
+    cached_tool: int = 0
+
+
+@dataclass
+class AgentRunResult:
+    """
+    Backend-neutral result from :meth:`BaseAIHook.run_agent`.
+
+    :param output: Final agent output (``str``, Pydantic model instance, etc.).
+    :param message_history: Opaque conversation state for HITL regeneration; 
only pass back to the
+        same hook implementation that produced it.
+    :param model_name: Resolved model identifier, when available.
+    :param usage: Usage counters when the backend exposes them.
+    :param tool_names: Ordered tool names invoked during the run, when known.
+    :param durable_stats: Durable step-cache statistics, populated when 
durable execution is enabled.
+    """
+
+    output: Any
+    message_history: Any = None
+    model_name: str | None = None
+    usage: AgentUsage | None = None
+    tool_names: list[str] | None = None
+    durable_stats: DurableStats | None = None
+
+
+@dataclass
+class ToolSpec:
+    """
+    Framework-neutral tool descriptor.
+
+    Toolsets produce :class:`ToolSpec` objects; each hook converts them to its
+    native tool representation via :meth:`BaseAIHook._tool_spec_to_native`.
+
+    :param name: Tool name exposed to the LLM.
+    :param description: Human-readable description used by the LLM to decide 
when to call this tool.
+    :param parameters: JSON Schema ``object`` describing the tool's parameters.
+    :param fn: Callable that implements the tool. Must accept keyword 
arguments matching *parameters*.
+    :param sequential: When ``True``, the backend must not invoke this tool 
concurrently with others
+        in the same turn (for example when tools share a non-thread-safe 
connection).
+    """
+
+    name: str
+    description: str
+    parameters: dict[str, Any]
+    fn: Callable[..., Any]
+    sequential: bool = False
+
+
+@dataclass
+class DurableContext:
+    """Framework-neutral identity of the running task, used to locate the 
durable cache file."""
+
+    dag_id: str
+    task_id: str
+    run_id: str
+    map_index: int = -1
+
+
+@dataclass
+class AgentRunRequest:
+    """
+    Parameter object passed to :meth:`BaseAIHook.create_agent` and 
:meth:`BaseAIHook.run_agent`.
+
+    Encapsulates everything the hook needs to build and run an agent in a 
single
+    framework-neutral structure, so that 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`
+    has zero framework-specific imports.
+
+    :param prompt: User prompt for this invocation (plain ``str`` or a 
multimodal
+        ``Sequence`` accepted by the backend agent's run API).
+    :param output_type: Expected structured output type (default: ``str``).
+    :param instructions: System-level instructions for the agent.
+    :param toolsets: List of :class:`BaseToolset` instances the agent may call.
+    :param usage_limits: Backend-specific usage limits; ignored if the hook 
does not support them.
+    :param message_history: Prior conversation state from a previous 
:class:`AgentRunResult`.
+    :param enable_tool_logging: When ``True`` (default), wraps 
Airflow-resolved tool callables with
+        a logging shim. Backend-native tool objects may be passed through 
unchanged by the concrete
+        hook and might not receive this wrapper.
+    :param durable_context: When set, enables step-level durable caching for 
the run.
+    :param agent_params: Extra keyword arguments forwarded to the underlying 
agent constructor.
+        Use this escape hatch for framework-specific options.
+    """
+
+    prompt: str | Sequence[Any]
+    output_type: type[Any] = str
+    instructions: str = ""
+    toolsets: list[Any] | None = None
+    usage_limits: Any = None
+    message_history: Any = None
+    enable_tool_logging: bool = True
+    durable_context: DurableContext | None = None
+    agent_params: dict[str, Any] = field(default_factory=dict)
+
+
+class BaseToolset(metaclass=ABCMeta):
+    """
+    Abstract base for framework-agnostic toolsets.
+
+    Subclasses implement :meth:`as_tools` to return a list of :class:`ToolSpec`
+    objects.  Each hook converts those specs to its native tool representation
+    via :meth:`BaseAIHook._tool_spec_to_native`.
+    """
+
+    @abstractmethod
+    def as_tools(self) -> list[ToolSpec]:
+        """Return the list of tools this toolset exposes."""
+
+
+class BaseAIHook(BaseHook, metaclass=ABCMeta):
+    """
+    Abstract hook for multi-turn LLM agents.
+
+    :class:`~airflow.providers.common.ai.operators.agent.AgentOperator` 
resolves the concrete hook
+    from the Airflow connection ``conn_type`` (for example ``pydanticai`` or 
``pydanticai-bedrock``).
+
+    :param llm_conn_id: Optional connection ID override (subclasses may apply 
a default).
+    :param model_id: Optional model override; not all backends use this 
parameter.
+
+    Subclasses implement :meth:`get_model`, :meth:`create_agent`, 
:meth:`run_agent`, and
+    :meth:`_tool_spec_to_native`.
+
+    Shared helpers :meth:`_init_durable`, :meth:`_resolve_tools`, 
:meth:`_logged_callable`, and
+    :meth:`_cached_callable` are provided for all hooks.
+    """
+
+    conn_name_attr = "llm_conn_id"
+
+    supports_toolsets: ClassVar[bool] = False
+    supports_durable: ClassVar[bool] = False
+    supports_usage_limits: ClassVar[bool] = False
+
+    def __init__(
+        self,
+        llm_conn_id: str | None = None,
+        model_id: str | None = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.llm_conn_id = llm_conn_id
+        self.model_id = model_id
+
+    @classmethod
+    def get_agent_hook(cls, conn_id: str, *, hook_params: dict[str, Any] | 
None = None) -> BaseAIHook:
+        """
+        Return an agent hook for *conn_id*, verifying it implements this 
contract.
+
+        Uses the connection's ``conn_type`` to select the hook class 
registered in
+        ``provider.yaml``.
+        """
+        hook = cls.get_hook(conn_id, hook_params=hook_params)
+        if not isinstance(hook, BaseAIHook):
+            raise TypeError(
+                f"Connection {conn_id!r} resolved to {type(hook).__name__}, 
which is not a BaseAIHook. "
+                "Use a connection type registered for agent frameworks (e.g. 
pydanticai, pydanticai-bedrock)."
+            )
+        return hook
+
+    @abstractmethod
+    def get_model(self) -> Any:
+        """Return the backend model/client used to construct agents."""
+
+    def get_conn(self) -> Any:
+        """Return the backend model/client. Delegates to :meth:`get_model`."""
+        return self.get_model()
+
+    @abstractmethod
+    def create_agent(self, request: AgentRunRequest) -> Any:
+        """
+        Build (but do not run) the agent described by *request*.
+
+        Responsible for resolving :attr:`AgentRunRequest.toolsets` via
+        :meth:`_resolve_tools` and constructing the framework-native agent 
object
+        with the model, tools, instructions, and output type from *request*.
+
+        When :attr:`AgentRunRequest.durable_context` is set, implementations
+        should call :meth:`_init_durable` and bind the returned storage/counter
+        to the agent via :meth:`_bind_agent_durable` so that :meth:`run_agent`
+        can retrieve and clean them up.
+
+        Implementations must call :meth:`validate_run_request` at the start of
+        this method before any agent construction or durable initialisation.
+
+        :param request: All parameters needed to configure the agent.
+        :returns: Framework-native agent object, ready to be passed to 
:meth:`run_agent`.
+        """
+
+    @abstractmethod
+    def run_agent(self, agent: Any, request: AgentRunRequest) -> 
AgentRunResult:
+        """
+        Execute *agent* for *request* and return a normalized 
:class:`AgentRunResult`.
+
+        Implementations with durable execution should pop durable state via
+        :meth:`_pop_agent_durable`, apply it during the run, and call
+        ``storage.cleanup()`` only after a successful run (keep the cache file
+        when the run raises so Airflow retries can replay cached steps).
+
+        :param agent: Framework-native agent produced by :meth:`create_agent`.
+        :param request: The same request used to create the agent (prompt, 
usage
+            limits, message history, etc.).
+        """
+
+    @abstractmethod
+    def _tool_spec_to_native(self, spec: ToolSpec) -> Any:
+        """
+        Convert a :class:`ToolSpec` to the agent framework's native tool 
representation.
+
+        Called once per tool inside :meth:`_resolve_tools`. The returned object
+        is collected into a list and passed to the underlying agent 
constructor.
+
+        :param spec: Universal tool descriptor, with the callable already 
wrapped
+            by any enabled logging / caching shims.
+        """
+
+    def validate_run_request(self, request: AgentRunRequest) -> None:
+        """
+        Raise if *request* uses features this hook implementation does not 
support.
+
+        Hook implementations call this at the start of :meth:`create_agent`.
+        """
+        hook_name = type(self).__name__
+        conn_id = self.llm_conn_id or "unknown"
+        if request.toolsets and not self.supports_toolsets:
+            raise ValueError(
+                f"toolsets are not supported for connection {conn_id!r} 
(conn_type resolves to {hook_name}). "
+            )
+        if request.usage_limits is not None and not self.supports_usage_limits:
+            raise ValueError(
+                f"usage_limits are not supported for connection {conn_id!r} "
+                f"(conn_type resolves to {hook_name})."
+            )
+        if request.durable_context is not None and not self.supports_durable:
+            raise ValueError(
+                f"durable execution requires a hook that supports durable 
caching; "
+                f"got {hook_name} for connection {conn_id!r}."
+            )
+
+    def _init_durable(self, ctx: DurableContext) -> tuple[Any, Any]:
+        """
+        Create and return a ``DurableStorage`` / ``DurableStepCounter`` pair 
for *ctx*.
+
+        Hooks call this inside :meth:`create_agent` when
+        :attr:`AgentRunRequest.durable_context` is set.
+        """
+        from airflow.providers.common.ai.durable.step_counter import 
DurableStepCounter
+        from airflow.providers.common.ai.durable.storage import DurableStorage
+
+        storage = DurableStorage(
+            dag_id=ctx.dag_id,
+            task_id=ctx.task_id,
+            run_id=ctx.run_id,
+            map_index=ctx.map_index,
+        )
+        counter = DurableStepCounter()
+        return storage, counter
+
+    @staticmethod
+    def _bind_agent_durable(agent: Any, storage: Any, counter: Any) -> None:
+        """Associate *storage* and *counter* with *agent* until 
:meth:`run_agent` completes."""
+        setattr(agent, _AIRFLOW_DURABLE_ATTR, (storage, counter))
+
+    @staticmethod
+    def _pop_agent_durable(agent: Any) -> tuple[Any, Any] | None:
+        """Remove and return durable state bound to *agent*, if any."""
+        state = getattr(agent, _AIRFLOW_DURABLE_ATTR, None)
+        if state is None:
+            return None
+        delattr(agent, _AIRFLOW_DURABLE_ATTR)
+        return state
+
+    def _resolve_tools(
+        self,
+        toolsets: list[Any],
+        enable_logging: bool,
+        storage: Any,
+        counter: Any,
+    ) -> list[Any]:
+        """
+        Convert a mixed list of toolsets / callables / native tools into 
framework-native tools.
+
+        Three cases per item:
+
+        * :class:`BaseToolset` — calls ``as_tools()`` and processes each 
:class:`ToolSpec`.
+        * Any callable (plain function, bound method, 
:func:`functools.partial`, or callable
+          object) — auto-wraps into a :class:`ToolSpec` using ``__name__`` and 
``__doc__``
+          (with sensible fallbacks for partials and callable objects), then 
processes it the
+          same way.
+        * Anything else — passed through unchanged (assumed to be a native 
tool object already
+          constructed for the target framework).
+
+        The processing pipeline for ``BaseToolset`` and callable items:
+        *fn* → optional cache wrap → optional log wrap → 
:meth:`_tool_spec_to_native`.
+
+        :param toolsets: Mix of :class:`BaseToolset` instances, callables 
(functions, bound
+            methods, :func:`functools.partial`, or callable objects), and 
native tool objects.
+        :param enable_logging: When ``True``, wrap each callable with 
:meth:`_logged_callable`.
+        :param storage: ``DurableStorage`` instance, or ``None`` to skip 
caching.
+        :param counter: ``DurableStepCounter`` instance, or ``None`` to skip 
caching.
+        """
+        native: list[Any] = []
+        for ts in toolsets:
+            if isinstance(ts, BaseToolset):
+                specs = ts.as_tools()
+            elif callable(ts):
+                specs = [callable_to_tool_spec(ts)]
+            else:
+                native.append(ts)
+                continue
+            for spec in specs:
+                fn = spec.fn
+                if storage is not None and counter is not None:
+                    fn = self._cached_callable(fn, storage, counter)
+                if enable_logging:
+                    fn = self._logged_callable(fn, self.log, name=spec.name)
+                adapted = ToolSpec(
+                    name=spec.name,
+                    description=spec.description,
+                    parameters=spec.parameters,
+                    fn=fn,
+                    sequential=spec.sequential,
+                )
+                native.append(self._tool_spec_to_native(adapted))
+        return native
+
+    @staticmethod
+    def _logged_callable(
+        fn: Callable[..., Any],
+        logger: Any,
+        *,
+        name: str | None = None,
+    ) -> Callable[..., Any]:
+        """Wrap *fn* to log tool name, args, timing, and exceptions."""
+        _tool_name = name or getattr(fn, "__name__", type(fn).__name__)
+
+        @functools.wraps(fn)
+        def wrapper(*args, **kwargs):
+            logger.info("::group::Tool call: %s", _tool_name)
+            if kwargs:
+                logger.debug("Tool args: %s", json.dumps(kwargs, default=str))
+            start = time.monotonic()
+            try:
+                result = fn(*args, **kwargs)
+                elapsed = time.monotonic() - start
+                logger.info("Tool %s returned in %.2fs", _tool_name, elapsed)
+                logger.info("::endgroup::")
+                return result
+            except Exception:
+                elapsed = time.monotonic() - start
+                logger.exception("Tool %s failed after %.2fs", _tool_name, 
elapsed)
+                logger.info("::endgroup::")
+                raise
+
+        return wrapper
+
+    @staticmethod
+    def _cached_callable(
+        fn: Callable[..., Any],
+        storage: Any,
+        counter: Any,
+    ) -> Callable[..., Any]:
+        """Wrap *fn* to cache its result in *storage* using a monotonic step 
counter."""
+
+        @functools.wraps(fn)
+        def wrapper(*args, **kwargs):
+            step = counter.next_step()

Review Comment:
   The cache key is `counter.next_step()`, a single ordinal shared by every
   cached tool in the agent run, and `next_step()` is non-atomic
   (`step = self._step; self._step += 1` in step_counter.py, no lock).
   pydantic-ai runs non-sequential sync tools in one model turn concurrently
   via `run_in_executor` on a thread pool, and plain callables / custom
   `BaseToolset` specs default to `sequential=False`. Two cached tools in a
   turn race the counter, so the step->tool mapping is non-deterministic; on
   a retry the order can flip and tool A's cached result replays into tool B
   -- silent wrong data, which is exactly what durable mode is meant to
   prevent. `SQLToolset` is safe because it pins `sequential=True`. Either
   force `sequential=True` for cached callable/`BaseToolset` tools in durable
   mode, or key the cache on stable call identity (tool name + args) rather
   than the shared ordinal.



##########
providers/common/ai/src/airflow/providers/common/ai/hooks/base.py:
##########
@@ -0,0 +1,427 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Shared contract for agent-framework hooks used by 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`."""
+
+from __future__ import annotations
+
+import functools
+import json
+import time
+from abc import ABCMeta, abstractmethod
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass, field
+from typing import Any, ClassVar
+
+from airflow.providers.common.ai.utils.function_schema import 
callable_to_tool_spec
+from airflow.providers.common.compat.sdk import BaseHook
+
+# Attribute name for durable storage/counter bound to a framework agent 
instance.
+_AIRFLOW_DURABLE_ATTR = "_airflow_durable_state"
+
+
+@dataclass
+class AgentUsage:
+    """Token and request usage from an agent run, when the backend exposes 
it."""
+
+    requests: int = 0
+    tool_calls: int = 0
+    input_tokens: int = 0
+    output_tokens: int = 0
+    total_tokens: int = 0
+
+
+@dataclass
+class DurableStats:
+    """Step-level cache statistics from a durable agent run."""
+
+    replayed_model: int = 0
+    replayed_tool: int = 0
+    cached_model: int = 0
+    cached_tool: int = 0
+
+
+@dataclass
+class AgentRunResult:
+    """
+    Backend-neutral result from :meth:`BaseAIHook.run_agent`.
+
+    :param output: Final agent output (``str``, Pydantic model instance, etc.).
+    :param message_history: Opaque conversation state for HITL regeneration; 
only pass back to the
+        same hook implementation that produced it.
+    :param model_name: Resolved model identifier, when available.
+    :param usage: Usage counters when the backend exposes them.
+    :param tool_names: Ordered tool names invoked during the run, when known.
+    :param durable_stats: Durable step-cache statistics, populated when 
durable execution is enabled.
+    """
+
+    output: Any
+    message_history: Any = None
+    model_name: str | None = None
+    usage: AgentUsage | None = None
+    tool_names: list[str] | None = None
+    durable_stats: DurableStats | None = None
+
+
+@dataclass
+class ToolSpec:
+    """
+    Framework-neutral tool descriptor.
+
+    Toolsets produce :class:`ToolSpec` objects; each hook converts them to its
+    native tool representation via :meth:`BaseAIHook._tool_spec_to_native`.
+
+    :param name: Tool name exposed to the LLM.
+    :param description: Human-readable description used by the LLM to decide 
when to call this tool.
+    :param parameters: JSON Schema ``object`` describing the tool's parameters.
+    :param fn: Callable that implements the tool. Must accept keyword 
arguments matching *parameters*.
+    :param sequential: When ``True``, the backend must not invoke this tool 
concurrently with others
+        in the same turn (for example when tools share a non-thread-safe 
connection).
+    """
+
+    name: str
+    description: str
+    parameters: dict[str, Any]
+    fn: Callable[..., Any]
+    sequential: bool = False
+
+
+@dataclass
+class DurableContext:
+    """Framework-neutral identity of the running task, used to locate the 
durable cache file."""
+
+    dag_id: str
+    task_id: str
+    run_id: str
+    map_index: int = -1
+
+
+@dataclass
+class AgentRunRequest:
+    """
+    Parameter object passed to :meth:`BaseAIHook.create_agent` and 
:meth:`BaseAIHook.run_agent`.
+
+    Encapsulates everything the hook needs to build and run an agent in a 
single
+    framework-neutral structure, so that 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`
+    has zero framework-specific imports.
+
+    :param prompt: User prompt for this invocation (plain ``str`` or a 
multimodal
+        ``Sequence`` accepted by the backend agent's run API).
+    :param output_type: Expected structured output type (default: ``str``).
+    :param instructions: System-level instructions for the agent.
+    :param toolsets: List of :class:`BaseToolset` instances the agent may call.
+    :param usage_limits: Backend-specific usage limits; ignored if the hook 
does not support them.
+    :param message_history: Prior conversation state from a previous 
:class:`AgentRunResult`.
+    :param enable_tool_logging: When ``True`` (default), wraps 
Airflow-resolved tool callables with
+        a logging shim. Backend-native tool objects may be passed through 
unchanged by the concrete
+        hook and might not receive this wrapper.
+    :param durable_context: When set, enables step-level durable caching for 
the run.
+    :param agent_params: Extra keyword arguments forwarded to the underlying 
agent constructor.
+        Use this escape hatch for framework-specific options.
+    """
+
+    prompt: str | Sequence[Any]
+    output_type: type[Any] = str
+    instructions: str = ""
+    toolsets: list[Any] | None = None
+    usage_limits: Any = None
+    message_history: Any = None
+    enable_tool_logging: bool = True
+    durable_context: DurableContext | None = None
+    agent_params: dict[str, Any] = field(default_factory=dict)
+
+
+class BaseToolset(metaclass=ABCMeta):
+    """
+    Abstract base for framework-agnostic toolsets.
+
+    Subclasses implement :meth:`as_tools` to return a list of :class:`ToolSpec`
+    objects.  Each hook converts those specs to its native tool representation
+    via :meth:`BaseAIHook._tool_spec_to_native`.
+    """
+
+    @abstractmethod
+    def as_tools(self) -> list[ToolSpec]:
+        """Return the list of tools this toolset exposes."""
+
+
+class BaseAIHook(BaseHook, metaclass=ABCMeta):
+    """
+    Abstract hook for multi-turn LLM agents.
+
+    :class:`~airflow.providers.common.ai.operators.agent.AgentOperator` 
resolves the concrete hook
+    from the Airflow connection ``conn_type`` (for example ``pydanticai`` or 
``pydanticai-bedrock``).
+
+    :param llm_conn_id: Optional connection ID override (subclasses may apply 
a default).
+    :param model_id: Optional model override; not all backends use this 
parameter.
+
+    Subclasses implement :meth:`get_model`, :meth:`create_agent`, 
:meth:`run_agent`, and
+    :meth:`_tool_spec_to_native`.
+
+    Shared helpers :meth:`_init_durable`, :meth:`_resolve_tools`, 
:meth:`_logged_callable`, and
+    :meth:`_cached_callable` are provided for all hooks.
+    """
+
+    conn_name_attr = "llm_conn_id"
+
+    supports_toolsets: ClassVar[bool] = False
+    supports_durable: ClassVar[bool] = False
+    supports_usage_limits: ClassVar[bool] = False
+
+    def __init__(
+        self,
+        llm_conn_id: str | None = None,
+        model_id: str | None = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.llm_conn_id = llm_conn_id
+        self.model_id = model_id
+
+    @classmethod
+    def get_agent_hook(cls, conn_id: str, *, hook_params: dict[str, Any] | 
None = None) -> BaseAIHook:
+        """
+        Return an agent hook for *conn_id*, verifying it implements this 
contract.
+
+        Uses the connection's ``conn_type`` to select the hook class 
registered in
+        ``provider.yaml``.
+        """
+        hook = cls.get_hook(conn_id, hook_params=hook_params)
+        if not isinstance(hook, BaseAIHook):
+            raise TypeError(
+                f"Connection {conn_id!r} resolved to {type(hook).__name__}, 
which is not a BaseAIHook. "
+                "Use a connection type registered for agent frameworks (e.g. 
pydanticai, pydanticai-bedrock)."
+            )
+        return hook
+
+    @abstractmethod
+    def get_model(self) -> Any:
+        """Return the backend model/client used to construct agents."""
+
+    def get_conn(self) -> Any:
+        """Return the backend model/client. Delegates to :meth:`get_model`."""
+        return self.get_model()
+
+    @abstractmethod
+    def create_agent(self, request: AgentRunRequest) -> Any:
+        """
+        Build (but do not run) the agent described by *request*.
+
+        Responsible for resolving :attr:`AgentRunRequest.toolsets` via
+        :meth:`_resolve_tools` and constructing the framework-native agent 
object
+        with the model, tools, instructions, and output type from *request*.
+
+        When :attr:`AgentRunRequest.durable_context` is set, implementations
+        should call :meth:`_init_durable` and bind the returned storage/counter
+        to the agent via :meth:`_bind_agent_durable` so that :meth:`run_agent`
+        can retrieve and clean them up.
+
+        Implementations must call :meth:`validate_run_request` at the start of
+        this method before any agent construction or durable initialisation.
+
+        :param request: All parameters needed to configure the agent.
+        :returns: Framework-native agent object, ready to be passed to 
:meth:`run_agent`.
+        """
+
+    @abstractmethod
+    def run_agent(self, agent: Any, request: AgentRunRequest) -> 
AgentRunResult:
+        """
+        Execute *agent* for *request* and return a normalized 
:class:`AgentRunResult`.
+
+        Implementations with durable execution should pop durable state via
+        :meth:`_pop_agent_durable`, apply it during the run, and call
+        ``storage.cleanup()`` only after a successful run (keep the cache file
+        when the run raises so Airflow retries can replay cached steps).
+
+        :param agent: Framework-native agent produced by :meth:`create_agent`.
+        :param request: The same request used to create the agent (prompt, 
usage
+            limits, message history, etc.).
+        """
+
+    @abstractmethod
+    def _tool_spec_to_native(self, spec: ToolSpec) -> Any:
+        """
+        Convert a :class:`ToolSpec` to the agent framework's native tool 
representation.
+
+        Called once per tool inside :meth:`_resolve_tools`. The returned object
+        is collected into a list and passed to the underlying agent 
constructor.
+
+        :param spec: Universal tool descriptor, with the callable already 
wrapped
+            by any enabled logging / caching shims.
+        """
+
+    def validate_run_request(self, request: AgentRunRequest) -> None:
+        """
+        Raise if *request* uses features this hook implementation does not 
support.
+
+        Hook implementations call this at the start of :meth:`create_agent`.
+        """
+        hook_name = type(self).__name__
+        conn_id = self.llm_conn_id or "unknown"
+        if request.toolsets and not self.supports_toolsets:
+            raise ValueError(
+                f"toolsets are not supported for connection {conn_id!r} 
(conn_type resolves to {hook_name}). "

Review Comment:
   This message ends mid-sentence with a trailing space (`(conn_type resolves 
to {hook_name}). `), whereas the `usage_limits` and `durable` raises just below 
give a complete reason. It looks like a truncated edit -- drop the trailing 
space or finish the sentence.



##########
providers/common/ai/src/airflow/providers/common/ai/hooks/base.py:
##########
@@ -0,0 +1,427 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Shared contract for agent-framework hooks used by 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`."""
+
+from __future__ import annotations
+
+import functools
+import json
+import time
+from abc import ABCMeta, abstractmethod
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass, field
+from typing import Any, ClassVar
+
+from airflow.providers.common.ai.utils.function_schema import 
callable_to_tool_spec
+from airflow.providers.common.compat.sdk import BaseHook
+
+# Attribute name for durable storage/counter bound to a framework agent 
instance.
+_AIRFLOW_DURABLE_ATTR = "_airflow_durable_state"
+
+
+@dataclass
+class AgentUsage:
+    """Token and request usage from an agent run, when the backend exposes 
it."""
+
+    requests: int = 0
+    tool_calls: int = 0
+    input_tokens: int = 0
+    output_tokens: int = 0
+    total_tokens: int = 0
+
+
+@dataclass
+class DurableStats:
+    """Step-level cache statistics from a durable agent run."""
+
+    replayed_model: int = 0
+    replayed_tool: int = 0
+    cached_model: int = 0
+    cached_tool: int = 0
+
+
+@dataclass
+class AgentRunResult:
+    """
+    Backend-neutral result from :meth:`BaseAIHook.run_agent`.
+
+    :param output: Final agent output (``str``, Pydantic model instance, etc.).
+    :param message_history: Opaque conversation state for HITL regeneration; 
only pass back to the
+        same hook implementation that produced it.
+    :param model_name: Resolved model identifier, when available.
+    :param usage: Usage counters when the backend exposes them.
+    :param tool_names: Ordered tool names invoked during the run, when known.
+    :param durable_stats: Durable step-cache statistics, populated when 
durable execution is enabled.
+    """
+
+    output: Any
+    message_history: Any = None
+    model_name: str | None = None
+    usage: AgentUsage | None = None
+    tool_names: list[str] | None = None
+    durable_stats: DurableStats | None = None
+
+
+@dataclass
+class ToolSpec:
+    """
+    Framework-neutral tool descriptor.
+
+    Toolsets produce :class:`ToolSpec` objects; each hook converts them to its
+    native tool representation via :meth:`BaseAIHook._tool_spec_to_native`.
+
+    :param name: Tool name exposed to the LLM.
+    :param description: Human-readable description used by the LLM to decide 
when to call this tool.
+    :param parameters: JSON Schema ``object`` describing the tool's parameters.
+    :param fn: Callable that implements the tool. Must accept keyword 
arguments matching *parameters*.
+    :param sequential: When ``True``, the backend must not invoke this tool 
concurrently with others
+        in the same turn (for example when tools share a non-thread-safe 
connection).
+    """
+
+    name: str
+    description: str
+    parameters: dict[str, Any]
+    fn: Callable[..., Any]
+    sequential: bool = False
+
+
+@dataclass
+class DurableContext:
+    """Framework-neutral identity of the running task, used to locate the 
durable cache file."""
+
+    dag_id: str
+    task_id: str
+    run_id: str
+    map_index: int = -1
+
+
+@dataclass
+class AgentRunRequest:
+    """
+    Parameter object passed to :meth:`BaseAIHook.create_agent` and 
:meth:`BaseAIHook.run_agent`.
+
+    Encapsulates everything the hook needs to build and run an agent in a 
single
+    framework-neutral structure, so that 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`
+    has zero framework-specific imports.
+
+    :param prompt: User prompt for this invocation (plain ``str`` or a 
multimodal
+        ``Sequence`` accepted by the backend agent's run API).
+    :param output_type: Expected structured output type (default: ``str``).
+    :param instructions: System-level instructions for the agent.
+    :param toolsets: List of :class:`BaseToolset` instances the agent may call.
+    :param usage_limits: Backend-specific usage limits; ignored if the hook 
does not support them.
+    :param message_history: Prior conversation state from a previous 
:class:`AgentRunResult`.
+    :param enable_tool_logging: When ``True`` (default), wraps 
Airflow-resolved tool callables with
+        a logging shim. Backend-native tool objects may be passed through 
unchanged by the concrete
+        hook and might not receive this wrapper.
+    :param durable_context: When set, enables step-level durable caching for 
the run.
+    :param agent_params: Extra keyword arguments forwarded to the underlying 
agent constructor.
+        Use this escape hatch for framework-specific options.
+    """
+
+    prompt: str | Sequence[Any]
+    output_type: type[Any] = str
+    instructions: str = ""
+    toolsets: list[Any] | None = None
+    usage_limits: Any = None
+    message_history: Any = None
+    enable_tool_logging: bool = True
+    durable_context: DurableContext | None = None
+    agent_params: dict[str, Any] = field(default_factory=dict)
+
+
+class BaseToolset(metaclass=ABCMeta):
+    """
+    Abstract base for framework-agnostic toolsets.
+
+    Subclasses implement :meth:`as_tools` to return a list of :class:`ToolSpec`
+    objects.  Each hook converts those specs to its native tool representation
+    via :meth:`BaseAIHook._tool_spec_to_native`.
+    """
+
+    @abstractmethod
+    def as_tools(self) -> list[ToolSpec]:
+        """Return the list of tools this toolset exposes."""
+
+
+class BaseAIHook(BaseHook, metaclass=ABCMeta):
+    """
+    Abstract hook for multi-turn LLM agents.
+
+    :class:`~airflow.providers.common.ai.operators.agent.AgentOperator` 
resolves the concrete hook
+    from the Airflow connection ``conn_type`` (for example ``pydanticai`` or 
``pydanticai-bedrock``).
+
+    :param llm_conn_id: Optional connection ID override (subclasses may apply 
a default).
+    :param model_id: Optional model override; not all backends use this 
parameter.
+
+    Subclasses implement :meth:`get_model`, :meth:`create_agent`, 
:meth:`run_agent`, and
+    :meth:`_tool_spec_to_native`.
+
+    Shared helpers :meth:`_init_durable`, :meth:`_resolve_tools`, 
:meth:`_logged_callable`, and
+    :meth:`_cached_callable` are provided for all hooks.
+    """
+
+    conn_name_attr = "llm_conn_id"
+
+    supports_toolsets: ClassVar[bool] = False

Review Comment:
   Three `supports_*` boolean ClassVars plus the hand-written 
`validate_run_request` are readable at this size -- no change needed now. 
Flagging the direction only: each new capability adds a flag and a branch, and 
some capabilities aren't binary (CrewAI supports toolsets but only 
per-agent-within-crew; LlamaIndex toolsets are async). When the second backend 
lands, a single `capabilities: frozenset[...]` driving validation in a loop 
would scale better than a growing flag + if-ladder.



##########
providers/common/ai/src/airflow/providers/common/ai/hooks/base.py:
##########
@@ -0,0 +1,427 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Shared contract for agent-framework hooks used by 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`."""
+
+from __future__ import annotations
+
+import functools
+import json
+import time
+from abc import ABCMeta, abstractmethod
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass, field
+from typing import Any, ClassVar
+
+from airflow.providers.common.ai.utils.function_schema import 
callable_to_tool_spec
+from airflow.providers.common.compat.sdk import BaseHook
+
+# Attribute name for durable storage/counter bound to a framework agent 
instance.
+_AIRFLOW_DURABLE_ATTR = "_airflow_durable_state"
+
+
+@dataclass
+class AgentUsage:
+    """Token and request usage from an agent run, when the backend exposes 
it."""
+
+    requests: int = 0
+    tool_calls: int = 0
+    input_tokens: int = 0
+    output_tokens: int = 0
+    total_tokens: int = 0
+
+
+@dataclass
+class DurableStats:
+    """Step-level cache statistics from a durable agent run."""
+
+    replayed_model: int = 0
+    replayed_tool: int = 0
+    cached_model: int = 0
+    cached_tool: int = 0
+
+
+@dataclass
+class AgentRunResult:
+    """
+    Backend-neutral result from :meth:`BaseAIHook.run_agent`.
+
+    :param output: Final agent output (``str``, Pydantic model instance, etc.).
+    :param message_history: Opaque conversation state for HITL regeneration; 
only pass back to the
+        same hook implementation that produced it.
+    :param model_name: Resolved model identifier, when available.
+    :param usage: Usage counters when the backend exposes them.
+    :param tool_names: Ordered tool names invoked during the run, when known.
+    :param durable_stats: Durable step-cache statistics, populated when 
durable execution is enabled.
+    """
+
+    output: Any
+    message_history: Any = None
+    model_name: str | None = None
+    usage: AgentUsage | None = None
+    tool_names: list[str] | None = None
+    durable_stats: DurableStats | None = None
+
+
+@dataclass
+class ToolSpec:
+    """
+    Framework-neutral tool descriptor.
+
+    Toolsets produce :class:`ToolSpec` objects; each hook converts them to its
+    native tool representation via :meth:`BaseAIHook._tool_spec_to_native`.
+
+    :param name: Tool name exposed to the LLM.
+    :param description: Human-readable description used by the LLM to decide 
when to call this tool.
+    :param parameters: JSON Schema ``object`` describing the tool's parameters.
+    :param fn: Callable that implements the tool. Must accept keyword 
arguments matching *parameters*.
+    :param sequential: When ``True``, the backend must not invoke this tool 
concurrently with others
+        in the same turn (for example when tools share a non-thread-safe 
connection).
+    """
+
+    name: str
+    description: str
+    parameters: dict[str, Any]
+    fn: Callable[..., Any]
+    sequential: bool = False
+
+
+@dataclass
+class DurableContext:
+    """Framework-neutral identity of the running task, used to locate the 
durable cache file."""
+
+    dag_id: str
+    task_id: str
+    run_id: str
+    map_index: int = -1
+
+
+@dataclass
+class AgentRunRequest:
+    """
+    Parameter object passed to :meth:`BaseAIHook.create_agent` and 
:meth:`BaseAIHook.run_agent`.
+
+    Encapsulates everything the hook needs to build and run an agent in a 
single
+    framework-neutral structure, so that 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`
+    has zero framework-specific imports.
+
+    :param prompt: User prompt for this invocation (plain ``str`` or a 
multimodal
+        ``Sequence`` accepted by the backend agent's run API).
+    :param output_type: Expected structured output type (default: ``str``).
+    :param instructions: System-level instructions for the agent.
+    :param toolsets: List of :class:`BaseToolset` instances the agent may call.
+    :param usage_limits: Backend-specific usage limits; ignored if the hook 
does not support them.
+    :param message_history: Prior conversation state from a previous 
:class:`AgentRunResult`.
+    :param enable_tool_logging: When ``True`` (default), wraps 
Airflow-resolved tool callables with
+        a logging shim. Backend-native tool objects may be passed through 
unchanged by the concrete
+        hook and might not receive this wrapper.
+    :param durable_context: When set, enables step-level durable caching for 
the run.
+    :param agent_params: Extra keyword arguments forwarded to the underlying 
agent constructor.
+        Use this escape hatch for framework-specific options.
+    """
+
+    prompt: str | Sequence[Any]
+    output_type: type[Any] = str
+    instructions: str = ""
+    toolsets: list[Any] | None = None
+    usage_limits: Any = None
+    message_history: Any = None
+    enable_tool_logging: bool = True
+    durable_context: DurableContext | None = None
+    agent_params: dict[str, Any] = field(default_factory=dict)
+
+
+class BaseToolset(metaclass=ABCMeta):
+    """
+    Abstract base for framework-agnostic toolsets.
+
+    Subclasses implement :meth:`as_tools` to return a list of :class:`ToolSpec`
+    objects.  Each hook converts those specs to its native tool representation
+    via :meth:`BaseAIHook._tool_spec_to_native`.
+    """
+
+    @abstractmethod
+    def as_tools(self) -> list[ToolSpec]:
+        """Return the list of tools this toolset exposes."""
+
+
+class BaseAIHook(BaseHook, metaclass=ABCMeta):
+    """
+    Abstract hook for multi-turn LLM agents.
+
+    :class:`~airflow.providers.common.ai.operators.agent.AgentOperator` 
resolves the concrete hook
+    from the Airflow connection ``conn_type`` (for example ``pydanticai`` or 
``pydanticai-bedrock``).
+
+    :param llm_conn_id: Optional connection ID override (subclasses may apply 
a default).
+    :param model_id: Optional model override; not all backends use this 
parameter.
+
+    Subclasses implement :meth:`get_model`, :meth:`create_agent`, 
:meth:`run_agent`, and
+    :meth:`_tool_spec_to_native`.
+
+    Shared helpers :meth:`_init_durable`, :meth:`_resolve_tools`, 
:meth:`_logged_callable`, and
+    :meth:`_cached_callable` are provided for all hooks.
+    """
+
+    conn_name_attr = "llm_conn_id"
+
+    supports_toolsets: ClassVar[bool] = False
+    supports_durable: ClassVar[bool] = False
+    supports_usage_limits: ClassVar[bool] = False
+
+    def __init__(
+        self,
+        llm_conn_id: str | None = None,
+        model_id: str | None = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.llm_conn_id = llm_conn_id
+        self.model_id = model_id
+
+    @classmethod
+    def get_agent_hook(cls, conn_id: str, *, hook_params: dict[str, Any] | 
None = None) -> BaseAIHook:
+        """
+        Return an agent hook for *conn_id*, verifying it implements this 
contract.
+
+        Uses the connection's ``conn_type`` to select the hook class 
registered in
+        ``provider.yaml``.
+        """
+        hook = cls.get_hook(conn_id, hook_params=hook_params)
+        if not isinstance(hook, BaseAIHook):
+            raise TypeError(
+                f"Connection {conn_id!r} resolved to {type(hook).__name__}, 
which is not a BaseAIHook. "
+                "Use a connection type registered for agent frameworks (e.g. 
pydanticai, pydanticai-bedrock)."
+            )
+        return hook
+
+    @abstractmethod
+    def get_model(self) -> Any:
+        """Return the backend model/client used to construct agents."""
+
+    def get_conn(self) -> Any:
+        """Return the backend model/client. Delegates to :meth:`get_model`."""
+        return self.get_model()
+
+    @abstractmethod
+    def create_agent(self, request: AgentRunRequest) -> Any:
+        """
+        Build (but do not run) the agent described by *request*.
+
+        Responsible for resolving :attr:`AgentRunRequest.toolsets` via
+        :meth:`_resolve_tools` and constructing the framework-native agent 
object
+        with the model, tools, instructions, and output type from *request*.
+
+        When :attr:`AgentRunRequest.durable_context` is set, implementations
+        should call :meth:`_init_durable` and bind the returned storage/counter
+        to the agent via :meth:`_bind_agent_durable` so that :meth:`run_agent`
+        can retrieve and clean them up.
+
+        Implementations must call :meth:`validate_run_request` at the start of

Review Comment:
   "Implementations must call `validate_run_request` at the start of 
`create_agent`" is an unenforced convention. `PydanticAIHook` does call it, but 
nothing makes a new backend do so -- a backend that omits the call still 
type-checks, instantiates, and runs, silently ignoring 
`supports_toolsets=False` / `supports_usage_limits=False` and either crashing 
deep in the framework or dropping the toolsets and running a tool-less agent. 
This is the classic template-method case: make the public `create_agent` on the 
base class call `validate_run_request` and then delegate to an abstract 
`_build_agent` hook, so a subclass can't skip validation because it no longer 
owns the entry point.



##########
providers/common/ai/src/airflow/providers/common/ai/hooks/base.py:
##########
@@ -0,0 +1,427 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Shared contract for agent-framework hooks used by 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`."""
+
+from __future__ import annotations
+
+import functools
+import json
+import time
+from abc import ABCMeta, abstractmethod
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass, field
+from typing import Any, ClassVar
+
+from airflow.providers.common.ai.utils.function_schema import 
callable_to_tool_spec
+from airflow.providers.common.compat.sdk import BaseHook
+
+# Attribute name for durable storage/counter bound to a framework agent 
instance.
+_AIRFLOW_DURABLE_ATTR = "_airflow_durable_state"
+
+
+@dataclass
+class AgentUsage:
+    """Token and request usage from an agent run, when the backend exposes 
it."""
+
+    requests: int = 0

Review Comment:
   All five `AgentUsage` fields default to `0`, so a backend that doesn't 
report, say, `tool_calls` produces `tool_calls=0`, indistinguishable from a 
genuine zero. pydantic-ai populates all five so it's fine today, but a 
partial-reporting backend would silently report `input_tokens=0` as if 
measured, and `log_run_summary` can't tell the difference. Defaulting these to 
`None` (`int | None = None`) makes "not reported" distinct from "zero" and 
costs the current backend nothing.



##########
providers/common/ai/src/airflow/providers/common/ai/hooks/base.py:
##########
@@ -0,0 +1,427 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Shared contract for agent-framework hooks used by 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`."""
+
+from __future__ import annotations
+
+import functools
+import json
+import time
+from abc import ABCMeta, abstractmethod
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass, field
+from typing import Any, ClassVar
+
+from airflow.providers.common.ai.utils.function_schema import 
callable_to_tool_spec
+from airflow.providers.common.compat.sdk import BaseHook
+
+# Attribute name for durable storage/counter bound to a framework agent 
instance.
+_AIRFLOW_DURABLE_ATTR = "_airflow_durable_state"
+
+
+@dataclass
+class AgentUsage:
+    """Token and request usage from an agent run, when the backend exposes 
it."""
+
+    requests: int = 0
+    tool_calls: int = 0
+    input_tokens: int = 0
+    output_tokens: int = 0
+    total_tokens: int = 0
+
+
+@dataclass
+class DurableStats:
+    """Step-level cache statistics from a durable agent run."""
+
+    replayed_model: int = 0
+    replayed_tool: int = 0
+    cached_model: int = 0
+    cached_tool: int = 0
+
+
+@dataclass
+class AgentRunResult:
+    """
+    Backend-neutral result from :meth:`BaseAIHook.run_agent`.
+
+    :param output: Final agent output (``str``, Pydantic model instance, etc.).
+    :param message_history: Opaque conversation state for HITL regeneration; 
only pass back to the
+        same hook implementation that produced it.
+    :param model_name: Resolved model identifier, when available.
+    :param usage: Usage counters when the backend exposes them.
+    :param tool_names: Ordered tool names invoked during the run, when known.
+    :param durable_stats: Durable step-cache statistics, populated when 
durable execution is enabled.
+    """
+
+    output: Any
+    message_history: Any = None
+    model_name: str | None = None
+    usage: AgentUsage | None = None
+    tool_names: list[str] | None = None
+    durable_stats: DurableStats | None = None
+
+
+@dataclass
+class ToolSpec:
+    """
+    Framework-neutral tool descriptor.
+
+    Toolsets produce :class:`ToolSpec` objects; each hook converts them to its
+    native tool representation via :meth:`BaseAIHook._tool_spec_to_native`.
+
+    :param name: Tool name exposed to the LLM.
+    :param description: Human-readable description used by the LLM to decide 
when to call this tool.
+    :param parameters: JSON Schema ``object`` describing the tool's parameters.
+    :param fn: Callable that implements the tool. Must accept keyword 
arguments matching *parameters*.
+    :param sequential: When ``True``, the backend must not invoke this tool 
concurrently with others
+        in the same turn (for example when tools share a non-thread-safe 
connection).
+    """
+
+    name: str
+    description: str
+    parameters: dict[str, Any]
+    fn: Callable[..., Any]
+    sequential: bool = False
+
+
+@dataclass
+class DurableContext:
+    """Framework-neutral identity of the running task, used to locate the 
durable cache file."""
+
+    dag_id: str
+    task_id: str
+    run_id: str
+    map_index: int = -1
+
+
+@dataclass
+class AgentRunRequest:
+    """
+    Parameter object passed to :meth:`BaseAIHook.create_agent` and 
:meth:`BaseAIHook.run_agent`.
+
+    Encapsulates everything the hook needs to build and run an agent in a 
single
+    framework-neutral structure, so that 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`
+    has zero framework-specific imports.
+
+    :param prompt: User prompt for this invocation (plain ``str`` or a 
multimodal
+        ``Sequence`` accepted by the backend agent's run API).
+    :param output_type: Expected structured output type (default: ``str``).
+    :param instructions: System-level instructions for the agent.
+    :param toolsets: List of :class:`BaseToolset` instances the agent may call.
+    :param usage_limits: Backend-specific usage limits; ignored if the hook 
does not support them.
+    :param message_history: Prior conversation state from a previous 
:class:`AgentRunResult`.
+    :param enable_tool_logging: When ``True`` (default), wraps 
Airflow-resolved tool callables with
+        a logging shim. Backend-native tool objects may be passed through 
unchanged by the concrete
+        hook and might not receive this wrapper.
+    :param durable_context: When set, enables step-level durable caching for 
the run.
+    :param agent_params: Extra keyword arguments forwarded to the underlying 
agent constructor.
+        Use this escape hatch for framework-specific options.
+    """
+
+    prompt: str | Sequence[Any]
+    output_type: type[Any] = str
+    instructions: str = ""
+    toolsets: list[Any] | None = None
+    usage_limits: Any = None
+    message_history: Any = None
+    enable_tool_logging: bool = True
+    durable_context: DurableContext | None = None
+    agent_params: dict[str, Any] = field(default_factory=dict)
+
+
+class BaseToolset(metaclass=ABCMeta):
+    """
+    Abstract base for framework-agnostic toolsets.
+
+    Subclasses implement :meth:`as_tools` to return a list of :class:`ToolSpec`
+    objects.  Each hook converts those specs to its native tool representation
+    via :meth:`BaseAIHook._tool_spec_to_native`.
+    """
+
+    @abstractmethod
+    def as_tools(self) -> list[ToolSpec]:
+        """Return the list of tools this toolset exposes."""
+
+
+class BaseAIHook(BaseHook, metaclass=ABCMeta):
+    """
+    Abstract hook for multi-turn LLM agents.
+
+    :class:`~airflow.providers.common.ai.operators.agent.AgentOperator` 
resolves the concrete hook
+    from the Airflow connection ``conn_type`` (for example ``pydanticai`` or 
``pydanticai-bedrock``).
+
+    :param llm_conn_id: Optional connection ID override (subclasses may apply 
a default).
+    :param model_id: Optional model override; not all backends use this 
parameter.
+
+    Subclasses implement :meth:`get_model`, :meth:`create_agent`, 
:meth:`run_agent`, and
+    :meth:`_tool_spec_to_native`.
+
+    Shared helpers :meth:`_init_durable`, :meth:`_resolve_tools`, 
:meth:`_logged_callable`, and
+    :meth:`_cached_callable` are provided for all hooks.
+    """
+
+    conn_name_attr = "llm_conn_id"
+
+    supports_toolsets: ClassVar[bool] = False
+    supports_durable: ClassVar[bool] = False
+    supports_usage_limits: ClassVar[bool] = False
+
+    def __init__(
+        self,
+        llm_conn_id: str | None = None,
+        model_id: str | None = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.llm_conn_id = llm_conn_id
+        self.model_id = model_id
+
+    @classmethod
+    def get_agent_hook(cls, conn_id: str, *, hook_params: dict[str, Any] | 
None = None) -> BaseAIHook:
+        """
+        Return an agent hook for *conn_id*, verifying it implements this 
contract.
+
+        Uses the connection's ``conn_type`` to select the hook class 
registered in
+        ``provider.yaml``.
+        """
+        hook = cls.get_hook(conn_id, hook_params=hook_params)
+        if not isinstance(hook, BaseAIHook):
+            raise TypeError(
+                f"Connection {conn_id!r} resolved to {type(hook).__name__}, 
which is not a BaseAIHook. "
+                "Use a connection type registered for agent frameworks (e.g. 
pydanticai, pydanticai-bedrock)."
+            )
+        return hook
+
+    @abstractmethod
+    def get_model(self) -> Any:
+        """Return the backend model/client used to construct agents."""
+
+    def get_conn(self) -> Any:
+        """Return the backend model/client. Delegates to :meth:`get_model`."""
+        return self.get_model()
+
+    @abstractmethod
+    def create_agent(self, request: AgentRunRequest) -> Any:
+        """
+        Build (but do not run) the agent described by *request*.
+
+        Responsible for resolving :attr:`AgentRunRequest.toolsets` via
+        :meth:`_resolve_tools` and constructing the framework-native agent 
object
+        with the model, tools, instructions, and output type from *request*.
+
+        When :attr:`AgentRunRequest.durable_context` is set, implementations
+        should call :meth:`_init_durable` and bind the returned storage/counter
+        to the agent via :meth:`_bind_agent_durable` so that :meth:`run_agent`
+        can retrieve and clean them up.
+
+        Implementations must call :meth:`validate_run_request` at the start of
+        this method before any agent construction or durable initialisation.
+
+        :param request: All parameters needed to configure the agent.
+        :returns: Framework-native agent object, ready to be passed to 
:meth:`run_agent`.
+        """
+
+    @abstractmethod
+    def run_agent(self, agent: Any, request: AgentRunRequest) -> 
AgentRunResult:
+        """
+        Execute *agent* for *request* and return a normalized 
:class:`AgentRunResult`.
+
+        Implementations with durable execution should pop durable state via
+        :meth:`_pop_agent_durable`, apply it during the run, and call
+        ``storage.cleanup()`` only after a successful run (keep the cache file
+        when the run raises so Airflow retries can replay cached steps).
+
+        :param agent: Framework-native agent produced by :meth:`create_agent`.
+        :param request: The same request used to create the agent (prompt, 
usage
+            limits, message history, etc.).
+        """
+
+    @abstractmethod
+    def _tool_spec_to_native(self, spec: ToolSpec) -> Any:
+        """
+        Convert a :class:`ToolSpec` to the agent framework's native tool 
representation.
+
+        Called once per tool inside :meth:`_resolve_tools`. The returned object
+        is collected into a list and passed to the underlying agent 
constructor.
+
+        :param spec: Universal tool descriptor, with the callable already 
wrapped
+            by any enabled logging / caching shims.
+        """
+
+    def validate_run_request(self, request: AgentRunRequest) -> None:
+        """
+        Raise if *request* uses features this hook implementation does not 
support.
+
+        Hook implementations call this at the start of :meth:`create_agent`.
+        """
+        hook_name = type(self).__name__
+        conn_id = self.llm_conn_id or "unknown"
+        if request.toolsets and not self.supports_toolsets:
+            raise ValueError(
+                f"toolsets are not supported for connection {conn_id!r} 
(conn_type resolves to {hook_name}). "
+            )
+        if request.usage_limits is not None and not self.supports_usage_limits:
+            raise ValueError(
+                f"usage_limits are not supported for connection {conn_id!r} "
+                f"(conn_type resolves to {hook_name})."
+            )
+        if request.durable_context is not None and not self.supports_durable:
+            raise ValueError(
+                f"durable execution requires a hook that supports durable 
caching; "
+                f"got {hook_name} for connection {conn_id!r}."
+            )
+
+    def _init_durable(self, ctx: DurableContext) -> tuple[Any, Any]:
+        """
+        Create and return a ``DurableStorage`` / ``DurableStepCounter`` pair 
for *ctx*.
+
+        Hooks call this inside :meth:`create_agent` when
+        :attr:`AgentRunRequest.durable_context` is set.
+        """
+        from airflow.providers.common.ai.durable.step_counter import 
DurableStepCounter
+        from airflow.providers.common.ai.durable.storage import DurableStorage
+
+        storage = DurableStorage(
+            dag_id=ctx.dag_id,
+            task_id=ctx.task_id,
+            run_id=ctx.run_id,
+            map_index=ctx.map_index,
+        )
+        counter = DurableStepCounter()
+        return storage, counter
+
+    @staticmethod
+    def _bind_agent_durable(agent: Any, storage: Any, counter: Any) -> None:
+        """Associate *storage* and *counter* with *agent* until 
:meth:`run_agent` completes."""
+        setattr(agent, _AIRFLOW_DURABLE_ATTR, (storage, counter))

Review Comment:
   The two-phase split hands durable state from `create_agent` to `run_agent` 
by `setattr`-ing a private attribute onto the framework agent object (popped at 
L316-319). That makes the real contract "call `run_agent` with the exact object 
`create_agent` returned": if a caller ever passes a reused or different agent, 
`_pop_agent_durable` returns `None` and the run silently executes non-durably 
with no error. A backend whose agent is frozen / `__slots__`-based can't use 
`_bind_agent_durable` at all (`setattr` raises). The split also doesn't map 
onto CrewAI, which needs the prompt (carried on `request`) to build the crew, 
so its `create_agent` would be an empty shell and all work would land in 
`run_agent`. Two options worth considering: collapse to a single `run(request) 
-> AgentRunResult` (the only cross-call reuse today is the durable path, which 
is pydantic-ai-only anyway), or make the base `Generic[AgentT]` so the agent 
handle is typed -- that turns "wrong agent passed to `run_agent`" into a type 
error -- and return the durable state explicitly instead of 
through a mutated attribute.



##########
providers/common/ai/src/airflow/providers/common/ai/hooks/base.py:
##########
@@ -0,0 +1,427 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Shared contract for agent-framework hooks used by 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`."""
+
+from __future__ import annotations
+
+import functools
+import json
+import time
+from abc import ABCMeta, abstractmethod
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass, field
+from typing import Any, ClassVar
+
+from airflow.providers.common.ai.utils.function_schema import 
callable_to_tool_spec
+from airflow.providers.common.compat.sdk import BaseHook
+
+# Attribute name for durable storage/counter bound to a framework agent 
instance.
+_AIRFLOW_DURABLE_ATTR = "_airflow_durable_state"
+
+
+@dataclass
+class AgentUsage:
+    """Token and request usage from an agent run, when the backend exposes 
it."""
+
+    requests: int = 0
+    tool_calls: int = 0
+    input_tokens: int = 0
+    output_tokens: int = 0
+    total_tokens: int = 0
+
+
+@dataclass
+class DurableStats:
+    """Step-level cache statistics from a durable agent run."""
+
+    replayed_model: int = 0
+    replayed_tool: int = 0
+    cached_model: int = 0
+    cached_tool: int = 0
+
+
+@dataclass
+class AgentRunResult:
+    """
+    Backend-neutral result from :meth:`BaseAIHook.run_agent`.
+
+    :param output: Final agent output (``str``, Pydantic model instance, etc.).
+    :param message_history: Opaque conversation state for HITL regeneration; 
only pass back to the
+        same hook implementation that produced it.
+    :param model_name: Resolved model identifier, when available.
+    :param usage: Usage counters when the backend exposes them.
+    :param tool_names: Ordered tool names invoked during the run, when known.
+    :param durable_stats: Durable step-cache statistics, populated when 
durable execution is enabled.
+    """
+
+    output: Any
+    message_history: Any = None
+    model_name: str | None = None
+    usage: AgentUsage | None = None
+    tool_names: list[str] | None = None
+    durable_stats: DurableStats | None = None
+
+
+@dataclass
+class ToolSpec:
+    """
+    Framework-neutral tool descriptor.
+
+    Toolsets produce :class:`ToolSpec` objects; each hook converts them to its
+    native tool representation via :meth:`BaseAIHook._tool_spec_to_native`.
+
+    :param name: Tool name exposed to the LLM.
+    :param description: Human-readable description used by the LLM to decide 
when to call this tool.
+    :param parameters: JSON Schema ``object`` describing the tool's parameters.
+    :param fn: Callable that implements the tool. Must accept keyword 
arguments matching *parameters*.
+    :param sequential: When ``True``, the backend must not invoke this tool 
concurrently with others
+        in the same turn (for example when tools share a non-thread-safe 
connection).
+    """
+
+    name: str
+    description: str
+    parameters: dict[str, Any]
+    fn: Callable[..., Any]
+    sequential: bool = False
+
+
+@dataclass
+class DurableContext:
+    """Framework-neutral identity of the running task, used to locate the 
durable cache file."""
+
+    dag_id: str
+    task_id: str
+    run_id: str
+    map_index: int = -1
+
+
+@dataclass
+class AgentRunRequest:
+    """
+    Parameter object passed to :meth:`BaseAIHook.create_agent` and 
:meth:`BaseAIHook.run_agent`.
+
+    Encapsulates everything the hook needs to build and run an agent in a 
single
+    framework-neutral structure, so that 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`
+    has zero framework-specific imports.
+
+    :param prompt: User prompt for this invocation (plain ``str`` or a 
multimodal
+        ``Sequence`` accepted by the backend agent's run API).
+    :param output_type: Expected structured output type (default: ``str``).
+    :param instructions: System-level instructions for the agent.
+    :param toolsets: List of :class:`BaseToolset` instances the agent may call.
+    :param usage_limits: Backend-specific usage limits; ignored if the hook 
does not support them.
+    :param message_history: Prior conversation state from a previous 
:class:`AgentRunResult`.
+    :param enable_tool_logging: When ``True`` (default), wraps 
Airflow-resolved tool callables with
+        a logging shim. Backend-native tool objects may be passed through 
unchanged by the concrete
+        hook and might not receive this wrapper.
+    :param durable_context: When set, enables step-level durable caching for 
the run.
+    :param agent_params: Extra keyword arguments forwarded to the underlying 
agent constructor.
+        Use this escape hatch for framework-specific options.
+    """
+
+    prompt: str | Sequence[Any]
+    output_type: type[Any] = str

Review Comment:
   `output_type: type[Any]` is passed straight to `Agent(output_type=...)` in 
`pydantic_ai.py`, which is pydantic-ai's one-agent-one-output-type model. It 
doesn't map onto two of the target frameworks: CrewAI types output per `Task` 
(`output_pydantic` is a Task kwarg), so a single `request.output_type` can't 
express a multi-task crew's outputs; LangChain supports a single schema but 
also accepts a raw JSON Schema / TypedDict via `with_structured_output`, which 
a bare Python `type` rejects. Either document this as single-output-only behind 
a capability flag, or widen to `type | dict[str, Any] | None` so 
JSON-schema-based backends fit.



##########
providers/common/ai/src/airflow/providers/common/ai/hooks/base.py:
##########
@@ -0,0 +1,427 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Shared contract for agent-framework hooks used by 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`."""
+
+from __future__ import annotations
+
+import functools
+import json
+import time
+from abc import ABCMeta, abstractmethod
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass, field
+from typing import Any, ClassVar
+
+from airflow.providers.common.ai.utils.function_schema import 
callable_to_tool_spec
+from airflow.providers.common.compat.sdk import BaseHook
+
+# Attribute name for durable storage/counter bound to a framework agent 
instance.
+_AIRFLOW_DURABLE_ATTR = "_airflow_durable_state"
+
+
+@dataclass
+class AgentUsage:
+    """Token and request usage from an agent run, when the backend exposes 
it."""
+
+    requests: int = 0
+    tool_calls: int = 0
+    input_tokens: int = 0
+    output_tokens: int = 0
+    total_tokens: int = 0
+
+
+@dataclass
+class DurableStats:
+    """Step-level cache statistics from a durable agent run."""
+
+    replayed_model: int = 0
+    replayed_tool: int = 0
+    cached_model: int = 0
+    cached_tool: int = 0
+
+
+@dataclass
+class AgentRunResult:
+    """
+    Backend-neutral result from :meth:`BaseAIHook.run_agent`.
+
+    :param output: Final agent output (``str``, Pydantic model instance, etc.).
+    :param message_history: Opaque conversation state for HITL regeneration; 
only pass back to the
+        same hook implementation that produced it.
+    :param model_name: Resolved model identifier, when available.
+    :param usage: Usage counters when the backend exposes them.
+    :param tool_names: Ordered tool names invoked during the run, when known.
+    :param durable_stats: Durable step-cache statistics, populated when 
durable execution is enabled.
+    """
+
+    output: Any
+    message_history: Any = None
+    model_name: str | None = None
+    usage: AgentUsage | None = None
+    tool_names: list[str] | None = None
+    durable_stats: DurableStats | None = None
+
+
+@dataclass
+class ToolSpec:
+    """
+    Framework-neutral tool descriptor.
+
+    Toolsets produce :class:`ToolSpec` objects; each hook converts them to its
+    native tool representation via :meth:`BaseAIHook._tool_spec_to_native`.
+
+    :param name: Tool name exposed to the LLM.
+    :param description: Human-readable description used by the LLM to decide 
when to call this tool.
+    :param parameters: JSON Schema ``object`` describing the tool's parameters.
+    :param fn: Callable that implements the tool. Must accept keyword 
arguments matching *parameters*.
+    :param sequential: When ``True``, the backend must not invoke this tool 
concurrently with others
+        in the same turn (for example when tools share a non-thread-safe 
connection).
+    """
+
+    name: str
+    description: str
+    parameters: dict[str, Any]
+    fn: Callable[..., Any]
+    sequential: bool = False
+
+
+@dataclass
+class DurableContext:
+    """Framework-neutral identity of the running task, used to locate the 
durable cache file."""
+
+    dag_id: str
+    task_id: str
+    run_id: str
+    map_index: int = -1
+
+
+@dataclass
+class AgentRunRequest:
+    """
+    Parameter object passed to :meth:`BaseAIHook.create_agent` and 
:meth:`BaseAIHook.run_agent`.
+
+    Encapsulates everything the hook needs to build and run an agent in a 
single
+    framework-neutral structure, so that 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`
+    has zero framework-specific imports.
+
+    :param prompt: User prompt for this invocation (plain ``str`` or a 
multimodal
+        ``Sequence`` accepted by the backend agent's run API).
+    :param output_type: Expected structured output type (default: ``str``).
+    :param instructions: System-level instructions for the agent.
+    :param toolsets: List of :class:`BaseToolset` instances the agent may call.
+    :param usage_limits: Backend-specific usage limits; ignored if the hook 
does not support them.
+    :param message_history: Prior conversation state from a previous 
:class:`AgentRunResult`.
+    :param enable_tool_logging: When ``True`` (default), wraps 
Airflow-resolved tool callables with
+        a logging shim. Backend-native tool objects may be passed through 
unchanged by the concrete
+        hook and might not receive this wrapper.
+    :param durable_context: When set, enables step-level durable caching for 
the run.
+    :param agent_params: Extra keyword arguments forwarded to the underlying 
agent constructor.
+        Use this escape hatch for framework-specific options.
+    """
+
+    prompt: str | Sequence[Any]
+    output_type: type[Any] = str
+    instructions: str = ""
+    toolsets: list[Any] | None = None
+    usage_limits: Any = None
+    message_history: Any = None
+    enable_tool_logging: bool = True
+    durable_context: DurableContext | None = None
+    agent_params: dict[str, Any] = field(default_factory=dict)
+
+
+class BaseToolset(metaclass=ABCMeta):
+    """
+    Abstract base for framework-agnostic toolsets.
+
+    Subclasses implement :meth:`as_tools` to return a list of :class:`ToolSpec`
+    objects.  Each hook converts those specs to its native tool representation
+    via :meth:`BaseAIHook._tool_spec_to_native`.
+    """
+
+    @abstractmethod
+    def as_tools(self) -> list[ToolSpec]:
+        """Return the list of tools this toolset exposes."""
+
+
+class BaseAIHook(BaseHook, metaclass=ABCMeta):
+    """
+    Abstract hook for multi-turn LLM agents.
+
+    :class:`~airflow.providers.common.ai.operators.agent.AgentOperator` 
resolves the concrete hook
+    from the Airflow connection ``conn_type`` (for example ``pydanticai`` or 
``pydanticai-bedrock``).
+
+    :param llm_conn_id: Optional connection ID override (subclasses may apply 
a default).
+    :param model_id: Optional model override; not all backends use this 
parameter.
+
+    Subclasses implement :meth:`get_model`, :meth:`create_agent`, 
:meth:`run_agent`, and
+    :meth:`_tool_spec_to_native`.
+
+    Shared helpers :meth:`_init_durable`, :meth:`_resolve_tools`, 
:meth:`_logged_callable`, and
+    :meth:`_cached_callable` are provided for all hooks.
+    """
+
+    conn_name_attr = "llm_conn_id"
+
+    supports_toolsets: ClassVar[bool] = False
+    supports_durable: ClassVar[bool] = False

Review Comment:
   `supports_durable` (and `_init_durable` below at L289) sit on the base as a 
first-class capability, but the actual replay mechanism in `pydantic_ai.py` is 
`with agent.override(model=CachingModel(...))` plus a process-wide step 
counter. That depends on three pydantic-ai-only facts: an 
`agent.override(model=)` context manager, a single swappable `Model` object on 
the agent, and discrete ordered interceptable steps a monotonic counter can key 
on. LangChain (LangGraph), LlamaIndex (async event workflow), and CrewAI 
(`Crew.kickoff()` over a task graph) expose no swap-the-model-and-count-steps 
seam, so a second backend can't satisfy this contract. Putting it in the base 
signals to the next implementer that durable is something they must implement. 
Consider moving `supports_durable` / `_init_durable` / `_bind_agent_durable` / 
`_pop_agent_durable` / `_cached_callable` into a `DurableAgentMixin` that 
`PydanticAIHook` opts into, and keeping the base to the genuinely portable 
surface (create / run / tool conversion).



##########
providers/common/ai/src/airflow/providers/common/ai/hooks/base.py:
##########
@@ -0,0 +1,427 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Shared contract for agent-framework hooks used by 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`."""
+
+from __future__ import annotations
+
+import functools
+import json
+import time
+from abc import ABCMeta, abstractmethod
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass, field
+from typing import Any, ClassVar
+
+from airflow.providers.common.ai.utils.function_schema import 
callable_to_tool_spec
+from airflow.providers.common.compat.sdk import BaseHook
+
+# Attribute name for durable storage/counter bound to a framework agent 
instance.
+_AIRFLOW_DURABLE_ATTR = "_airflow_durable_state"
+
+
+@dataclass
+class AgentUsage:
+    """Token and request usage from an agent run, when the backend exposes 
it."""
+
+    requests: int = 0
+    tool_calls: int = 0
+    input_tokens: int = 0
+    output_tokens: int = 0
+    total_tokens: int = 0
+
+
+@dataclass
+class DurableStats:
+    """Step-level cache statistics from a durable agent run."""
+
+    replayed_model: int = 0
+    replayed_tool: int = 0
+    cached_model: int = 0
+    cached_tool: int = 0
+
+
+@dataclass
+class AgentRunResult:
+    """
+    Backend-neutral result from :meth:`BaseAIHook.run_agent`.
+
+    :param output: Final agent output (``str``, Pydantic model instance, etc.).
+    :param message_history: Opaque conversation state for HITL regeneration; 
only pass back to the
+        same hook implementation that produced it.
+    :param model_name: Resolved model identifier, when available.
+    :param usage: Usage counters when the backend exposes them.
+    :param tool_names: Ordered tool names invoked during the run, when known.
+    :param durable_stats: Durable step-cache statistics, populated when 
durable execution is enabled.
+    """
+
+    output: Any
+    message_history: Any = None
+    model_name: str | None = None
+    usage: AgentUsage | None = None
+    tool_names: list[str] | None = None
+    durable_stats: DurableStats | None = None
+
+
+@dataclass
+class ToolSpec:
+    """
+    Framework-neutral tool descriptor.
+
+    Toolsets produce :class:`ToolSpec` objects; each hook converts them to its
+    native tool representation via :meth:`BaseAIHook._tool_spec_to_native`.
+
+    :param name: Tool name exposed to the LLM.
+    :param description: Human-readable description used by the LLM to decide 
when to call this tool.
+    :param parameters: JSON Schema ``object`` describing the tool's parameters.
+    :param fn: Callable that implements the tool. Must accept keyword 
arguments matching *parameters*.
+    :param sequential: When ``True``, the backend must not invoke this tool 
concurrently with others
+        in the same turn (for example when tools share a non-thread-safe 
connection).
+    """
+
+    name: str
+    description: str
+    parameters: dict[str, Any]
+    fn: Callable[..., Any]
+    sequential: bool = False
+
+
+@dataclass
+class DurableContext:
+    """Framework-neutral identity of the running task, used to locate the 
durable cache file."""
+
+    dag_id: str
+    task_id: str
+    run_id: str
+    map_index: int = -1
+
+
+@dataclass
+class AgentRunRequest:
+    """
+    Parameter object passed to :meth:`BaseAIHook.create_agent` and 
:meth:`BaseAIHook.run_agent`.
+
+    Encapsulates everything the hook needs to build and run an agent in a 
single
+    framework-neutral structure, so that 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`
+    has zero framework-specific imports.
+
+    :param prompt: User prompt for this invocation (plain ``str`` or a 
multimodal
+        ``Sequence`` accepted by the backend agent's run API).
+    :param output_type: Expected structured output type (default: ``str``).
+    :param instructions: System-level instructions for the agent.
+    :param toolsets: List of :class:`BaseToolset` instances the agent may call.
+    :param usage_limits: Backend-specific usage limits; ignored if the hook 
does not support them.
+    :param message_history: Prior conversation state from a previous 
:class:`AgentRunResult`.
+    :param enable_tool_logging: When ``True`` (default), wraps 
Airflow-resolved tool callables with
+        a logging shim. Backend-native tool objects may be passed through 
unchanged by the concrete
+        hook and might not receive this wrapper.
+    :param durable_context: When set, enables step-level durable caching for 
the run.
+    :param agent_params: Extra keyword arguments forwarded to the underlying 
agent constructor.
+        Use this escape hatch for framework-specific options.
+    """
+
+    prompt: str | Sequence[Any]
+    output_type: type[Any] = str
+    instructions: str = ""
+    toolsets: list[Any] | None = None
+    usage_limits: Any = None
+    message_history: Any = None
+    enable_tool_logging: bool = True
+    durable_context: DurableContext | None = None
+    agent_params: dict[str, Any] = field(default_factory=dict)
+
+
+class BaseToolset(metaclass=ABCMeta):
+    """
+    Abstract base for framework-agnostic toolsets.
+
+    Subclasses implement :meth:`as_tools` to return a list of :class:`ToolSpec`
+    objects.  Each hook converts those specs to its native tool representation
+    via :meth:`BaseAIHook._tool_spec_to_native`.
+    """
+
+    @abstractmethod
+    def as_tools(self) -> list[ToolSpec]:
+        """Return the list of tools this toolset exposes."""
+
+
+class BaseAIHook(BaseHook, metaclass=ABCMeta):

Review Comment:
   Worth being explicit that this "framework-neutral" agent contract currently 
has exactly one implementation family (`PydanticAIHook` + the Azure/Bedrock 
subclasses). The provider already ships `LangChainHook` / `LlamaIndexHook` / 
`MCPHook`, but they extend plain `BaseHook` and don't implement this contract, 
and `MCPHook.get_conn()` returns `pydantic_ai.mcp.*` objects directly -- so 
even the tool-server layer is pydantic-ai-bound today. An abstraction validated 
against a single backend tends to bake in that backend's shape (the durable 
mechanism, the create/run split, and `output_type` above are where it shows). 
It would be worth writing one thin second backend -- even a non-durable, 
no-cache `LangChainAgentHook` over `AgentExecutor.invoke` -- to let a real 
second implementation drive what belongs in the base vs a pydantic-ai mixin, or 
scoping this base as experimental until a second backend lands.



##########
providers/common/ai/src/airflow/providers/common/ai/hooks/base.py:
##########
@@ -0,0 +1,427 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Shared contract for agent-framework hooks used by 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`."""
+
+from __future__ import annotations
+
+import functools
+import json
+import time
+from abc import ABCMeta, abstractmethod
+from collections.abc import Callable, Sequence
+from dataclasses import dataclass, field
+from typing import Any, ClassVar
+
+from airflow.providers.common.ai.utils.function_schema import 
callable_to_tool_spec
+from airflow.providers.common.compat.sdk import BaseHook
+
+# Attribute name for durable storage/counter bound to a framework agent 
instance.
+_AIRFLOW_DURABLE_ATTR = "_airflow_durable_state"
+
+
+@dataclass
+class AgentUsage:
+    """Token and request usage from an agent run, when the backend exposes 
it."""
+
+    requests: int = 0
+    tool_calls: int = 0
+    input_tokens: int = 0
+    output_tokens: int = 0
+    total_tokens: int = 0
+
+
+@dataclass
+class DurableStats:
+    """Step-level cache statistics from a durable agent run."""
+
+    replayed_model: int = 0
+    replayed_tool: int = 0
+    cached_model: int = 0
+    cached_tool: int = 0
+
+
+@dataclass
+class AgentRunResult:
+    """
+    Backend-neutral result from :meth:`BaseAIHook.run_agent`.
+
+    :param output: Final agent output (``str``, Pydantic model instance, etc.).
+    :param message_history: Opaque conversation state for HITL regeneration; 
only pass back to the
+        same hook implementation that produced it.
+    :param model_name: Resolved model identifier, when available.
+    :param usage: Usage counters when the backend exposes them.
+    :param tool_names: Ordered tool names invoked during the run, when known.
+    :param durable_stats: Durable step-cache statistics, populated when 
durable execution is enabled.
+    """
+
+    output: Any
+    message_history: Any = None
+    model_name: str | None = None
+    usage: AgentUsage | None = None
+    tool_names: list[str] | None = None
+    durable_stats: DurableStats | None = None
+
+
+@dataclass
+class ToolSpec:
+    """
+    Framework-neutral tool descriptor.
+
+    Toolsets produce :class:`ToolSpec` objects; each hook converts them to its
+    native tool representation via :meth:`BaseAIHook._tool_spec_to_native`.
+
+    :param name: Tool name exposed to the LLM.
+    :param description: Human-readable description used by the LLM to decide 
when to call this tool.
+    :param parameters: JSON Schema ``object`` describing the tool's parameters.
+    :param fn: Callable that implements the tool. Must accept keyword 
arguments matching *parameters*.
+    :param sequential: When ``True``, the backend must not invoke this tool 
concurrently with others
+        in the same turn (for example when tools share a non-thread-safe 
connection).
+    """
+
+    name: str
+    description: str
+    parameters: dict[str, Any]
+    fn: Callable[..., Any]
+    sequential: bool = False
+
+
+@dataclass
+class DurableContext:
+    """Framework-neutral identity of the running task, used to locate the 
durable cache file."""
+
+    dag_id: str
+    task_id: str
+    run_id: str
+    map_index: int = -1
+
+
+@dataclass
+class AgentRunRequest:
+    """
+    Parameter object passed to :meth:`BaseAIHook.create_agent` and 
:meth:`BaseAIHook.run_agent`.
+
+    Encapsulates everything the hook needs to build and run an agent in a 
single
+    framework-neutral structure, so that 
:class:`~airflow.providers.common.ai.operators.agent.AgentOperator`
+    has zero framework-specific imports.
+
+    :param prompt: User prompt for this invocation (plain ``str`` or a 
multimodal
+        ``Sequence`` accepted by the backend agent's run API).
+    :param output_type: Expected structured output type (default: ``str``).
+    :param instructions: System-level instructions for the agent.
+    :param toolsets: List of :class:`BaseToolset` instances the agent may call.
+    :param usage_limits: Backend-specific usage limits; ignored if the hook 
does not support them.
+    :param message_history: Prior conversation state from a previous 
:class:`AgentRunResult`.
+    :param enable_tool_logging: When ``True`` (default), wraps 
Airflow-resolved tool callables with
+        a logging shim. Backend-native tool objects may be passed through 
unchanged by the concrete
+        hook and might not receive this wrapper.
+    :param durable_context: When set, enables step-level durable caching for 
the run.
+    :param agent_params: Extra keyword arguments forwarded to the underlying 
agent constructor.
+        Use this escape hatch for framework-specific options.
+    """
+
+    prompt: str | Sequence[Any]
+    output_type: type[Any] = str
+    instructions: str = ""
+    toolsets: list[Any] | None = None
+    usage_limits: Any = None
+    message_history: Any = None
+    enable_tool_logging: bool = True
+    durable_context: DurableContext | None = None
+    agent_params: dict[str, Any] = field(default_factory=dict)
+
+
+class BaseToolset(metaclass=ABCMeta):
+    """
+    Abstract base for framework-agnostic toolsets.
+
+    Subclasses implement :meth:`as_tools` to return a list of :class:`ToolSpec`
+    objects.  Each hook converts those specs to its native tool representation
+    via :meth:`BaseAIHook._tool_spec_to_native`.
+    """
+
+    @abstractmethod
+    def as_tools(self) -> list[ToolSpec]:
+        """Return the list of tools this toolset exposes."""
+
+
+class BaseAIHook(BaseHook, metaclass=ABCMeta):
+    """
+    Abstract hook for multi-turn LLM agents.
+
+    :class:`~airflow.providers.common.ai.operators.agent.AgentOperator` 
resolves the concrete hook
+    from the Airflow connection ``conn_type`` (for example ``pydanticai`` or 
``pydanticai-bedrock``).
+
+    :param llm_conn_id: Optional connection ID override (subclasses may apply 
a default).
+    :param model_id: Optional model override; not all backends use this 
parameter.
+
+    Subclasses implement :meth:`get_model`, :meth:`create_agent`, 
:meth:`run_agent`, and
+    :meth:`_tool_spec_to_native`.
+
+    Shared helpers :meth:`_init_durable`, :meth:`_resolve_tools`, 
:meth:`_logged_callable`, and
+    :meth:`_cached_callable` are provided for all hooks.
+    """
+
+    conn_name_attr = "llm_conn_id"
+
+    supports_toolsets: ClassVar[bool] = False
+    supports_durable: ClassVar[bool] = False
+    supports_usage_limits: ClassVar[bool] = False
+
+    def __init__(
+        self,
+        llm_conn_id: str | None = None,
+        model_id: str | None = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.llm_conn_id = llm_conn_id
+        self.model_id = model_id
+
+    @classmethod
+    def get_agent_hook(cls, conn_id: str, *, hook_params: dict[str, Any] | 
None = None) -> BaseAIHook:
+        """
+        Return an agent hook for *conn_id*, verifying it implements this 
contract.
+
+        Uses the connection's ``conn_type`` to select the hook class 
registered in
+        ``provider.yaml``.
+        """
+        hook = cls.get_hook(conn_id, hook_params=hook_params)
+        if not isinstance(hook, BaseAIHook):
+            raise TypeError(
+                f"Connection {conn_id!r} resolved to {type(hook).__name__}, 
which is not a BaseAIHook. "
+                "Use a connection type registered for agent frameworks (e.g. 
pydanticai, pydanticai-bedrock)."
+            )
+        return hook
+
+    @abstractmethod
+    def get_model(self) -> Any:
+        """Return the backend model/client used to construct agents."""
+
+    def get_conn(self) -> Any:

Review Comment:
   `get_conn()` is a pure pass-through to `get_model()`, which adds a second 
name for one concept and a second method a backend author wonders whether to 
override. If it only exists to satisfy `BaseHook`'s required `get_conn`, a 
one-line comment saying so would help; otherwise pick one name (the rest of the 
provider's hooks use `get_conn` as the primary method).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to