amoghrajesh commented on code in PR #68926: URL: https://github.com/apache/airflow/pull/68926#discussion_r3464883532
########## providers/common/ai/src/airflow/providers/common/ai/durable/task_state_store.py: ########## @@ -0,0 +1,160 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Task-state-store-backed durable storage for pydantic-ai agent step caching. + +Available on Airflow >= 3.3, where the AIP-103 task state store provides a +per-task-instance key/value store that survives retries within a run and is +cleared when the run is removed. Each cached step is written under its own key +(``model_step_{N}`` / ``tool_step_{N}``); the store handles persistence and, +when ``[workers] state_store_backend`` is configured, transparently offloads +large values to external storage. No ``[common.ai] durable_cache_path`` is +needed. + +This module is imported only on Airflow >= 3.3 (see +``AgentOperator._build_durable_storage``); ``NEVER_EXPIRE`` does not exist on +older cores. Review Comment: ```suggestion older airflow versions. 
``` ########## providers/common/ai/tests/unit/common/ai/operators/test_agent.py: ########## @@ -557,17 +557,44 @@ def test_durable_default_false(self): op = AgentOperator(task_id="test", prompt="test", llm_conn_id="my_llm") assert op.durable is False + @patch("airflow.providers.common.ai.operators.agent.AIRFLOW_V_3_3_PLUS", True) + def test_build_durable_storage_uses_task_state_store_on_3_3(self): + """On Airflow >= 3.3 the cache lives in the task state store -- no durable_cache_path needed.""" + from airflow.providers.common.ai.durable.task_state_store import TaskStateStoreDurableStorage + + accessor = MagicMock() + op = AgentOperator(task_id="t", prompt="p", llm_conn_id="c", durable=True) + + storage = op._build_durable_storage({"task_state_store": accessor}) + + assert isinstance(storage, TaskStateStoreDurableStorage) + assert storage._store is accessor + + @patch("airflow.providers.common.ai.operators.agent.AIRFLOW_V_3_3_PLUS", False) + def test_build_durable_storage_falls_back_to_object_storage_below_3_3(self): + """On Airflow < 3.3 the cache falls back to the ObjectStorage backend.""" + from airflow.providers.common.ai.durable.storage import DurableStorage + + ti = MagicMock(dag_id="d", task_id="t", run_id="r", map_index=-1) + op = AgentOperator(task_id="t", prompt="p", llm_conn_id="c", durable=True) + + storage = op._build_durable_storage({"task_instance": ti}) + + assert isinstance(storage, DurableStorage) + assert storage._cache_id == "d_t_r" Review Comment: Isn't it better to just run the respective tests on their versions? i.e., skip `test_build_durable_storage_uses_task_state_store_on_3_3` for < 3.3? 
########## providers/common/ai/tests/unit/common/ai/operators/test_agent.py: ########## @@ -557,17 +557,44 @@ def test_durable_default_false(self): op = AgentOperator(task_id="test", prompt="test", llm_conn_id="my_llm") assert op.durable is False + @patch("airflow.providers.common.ai.operators.agent.AIRFLOW_V_3_3_PLUS", True) + def test_build_durable_storage_uses_task_state_store_on_3_3(self): + """On Airflow >= 3.3 the cache lives in the task state store -- no durable_cache_path needed.""" + from airflow.providers.common.ai.durable.task_state_store import TaskStateStoreDurableStorage + + accessor = MagicMock() Review Comment: nit: add a `TaskStateStoreAccessor` spec ########## providers/common/ai/tests/unit/common/ai/durable/test_base.py: ########## Review Comment: This file verifies the protocol with two artificial backends (`_CompleteBackend`, `_PartialBackend`) but never checks that the real `DurableStorage` (the ObjectStorage backend) satisfies `DurableStorageProtocol`. Since it's @runtime_checkable, we could do that to catch any future method signature drift. 
########## providers/common/ai/tests/unit/common/ai/operators/test_agent.py: ########## @@ -557,17 +557,44 @@ def test_durable_default_false(self): op = AgentOperator(task_id="test", prompt="test", llm_conn_id="my_llm") assert op.durable is False + @patch("airflow.providers.common.ai.operators.agent.AIRFLOW_V_3_3_PLUS", True) + def test_build_durable_storage_uses_task_state_store_on_3_3(self): + """On Airflow >= 3.3 the cache lives in the task state store -- no durable_cache_path needed.""" + from airflow.providers.common.ai.durable.task_state_store import TaskStateStoreDurableStorage + + accessor = MagicMock() + op = AgentOperator(task_id="t", prompt="p", llm_conn_id="c", durable=True) + + storage = op._build_durable_storage({"task_state_store": accessor}) + + assert isinstance(storage, TaskStateStoreDurableStorage) + assert storage._store is accessor + + @patch("airflow.providers.common.ai.operators.agent.AIRFLOW_V_3_3_PLUS", False) + def test_build_durable_storage_falls_back_to_object_storage_below_3_3(self): + """On Airflow < 3.3 the cache falls back to the ObjectStorage backend.""" + from airflow.providers.common.ai.durable.storage import DurableStorage + + ti = MagicMock(dag_id="d", task_id="t", run_id="r", map_index=-1) Review Comment: nit: add a spec here and elsewhere where it's used like this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
