This is an automated email from the ASF dual-hosted git repository.

amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6714142dba1 Make core API routes for task and asset states only 
interact with DB (#67835)
6714142dba1 is described below

commit 6714142dba1588394af5ba6cf9858b75caa7361c
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Jun 3 09:15:10 2026 +0530

    Make core API routes for task and asset states only interact with DB 
(#67835)
---
 .../core_api/routes/public/asset_store.py          |  8 +++----
 .../core_api/routes/public/task_store.py           | 12 +++++------
 .../core_api/services/public/task_instances.py     |  4 ++--
 airflow-core/src/airflow/state/metastore.py        |  7 ++++++
 .../core_api/routes/public/test_asset_store.py     | 24 +++++++++++++++++++++
 .../core_api/routes/public/test_task_store.py      | 25 ++++++++++++++++++++++
 6 files changed, 67 insertions(+), 13 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py
index e01e73f8cb0..00f02bb664e 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py
@@ -35,7 +35,7 @@ from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_
 from airflow.api_fastapi.core_api.security import requires_access_asset
 from airflow.models.asset import AssetModel
 from airflow.models.asset_store import AssetStoreModel
-from airflow.state import get_state_backend
+from airflow.state.metastore import _get_db_backend
 
 asset_store_router = AirflowRouter(
     tags=["Asset Store"],
@@ -134,7 +134,7 @@ def set_asset_store(
     session: SessionDep,
 ) -> None:
     """Set an asset store value. Creates or overwrites the key."""
-    get_state_backend().set(AssetScope(asset_id=asset_id), key, 
json.dumps(body.value), session=session)
+    _get_db_backend().set(AssetScope(asset_id=asset_id), key, 
json.dumps(body.value), session=session)
 
 
 @asset_store_router.delete(
@@ -149,7 +149,7 @@ def delete_asset_store(
     session: SessionDep,
 ) -> None:
     """Delete a single asset store key. No-op if the key does not exist."""
-    get_state_backend().delete(AssetScope(asset_id=asset_id), key, 
session=session)
+    _get_db_backend().delete(AssetScope(asset_id=asset_id), key, 
session=session)
 
 
 @asset_store_router.delete(
@@ -163,4 +163,4 @@ def clear_asset_store(
     session: SessionDep,
 ) -> None:
     """Delete all store keys for an asset."""
-    get_state_backend().clear(AssetScope(asset_id=asset_id), session=session)
+    _get_db_backend().clear(AssetScope(asset_id=asset_id), session=session)
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_store.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_store.py
index 9319d654fe0..32c16241dfd 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_store.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_store.py
@@ -39,7 +39,7 @@ from airflow.api_fastapi.core_api.security import 
requires_access_dag
 from airflow.configuration import conf
 from airflow.models.task_store import TaskStoreModel
 from airflow.models.taskinstance import TaskInstance as TI
-from airflow.state import get_state_backend
+from airflow.state.metastore import _get_db_backend
 
 task_store_router = AirflowRouter(
     tags=["Task Store"],
@@ -187,7 +187,7 @@ def set_task_store(
     expires_at = _resolve_expires_at(body.expires_at)
     scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
     try:
-        get_state_backend().set(scope, key, json.dumps(body.value), 
expires_at=expires_at, session=session)
+        _get_db_backend().set(scope, key, json.dumps(body.value), 
expires_at=expires_at, session=session)
     except ValueError as e:
         raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
detail=str(e)) from e
 
@@ -227,9 +227,7 @@ def patch_task_store(
         )
 
     scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
-    get_state_backend().set(
-        scope, key, json.dumps(body.value), expires_at=existing.expires_at, 
session=session
-    )
+    _get_db_backend().set(scope, key, json.dumps(body.value), 
expires_at=existing.expires_at, session=session)
 
 
 @task_store_router.delete(
@@ -248,7 +246,7 @@ def delete_task_store(
 ) -> None:
     """Delete a single task store key. No-op if the key does not exist."""
     scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
-    get_state_backend().delete(scope, key, session=session)
+    _get_db_backend().delete(scope, key, session=session)
 
 
 @task_store_router.delete(
@@ -272,4 +270,4 @@ def clear_task_store(
     the ``map_index`` parameter is ignored.
     """
     scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
-    get_state_backend().clear(scope, all_map_indices=all_map_indices, 
session=session)
+    _get_db_backend().clear(scope, all_map_indices=all_map_indices, 
session=session)
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
index bde91e8d8b4..efd4735959c 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
@@ -53,7 +53,7 @@ from airflow.listeners.listener import get_listener_manager
 from airflow.models.dag import DagModel
 from airflow.models.taskinstance import TaskInstance as TI
 from airflow.serialization.definitions.dag import SerializedDAG
-from airflow.state import get_state_backend
+from airflow.state.metastore import _get_db_backend
 from airflow.utils.state import TaskInstanceState
 
 log = structlog.get_logger(__name__)
@@ -63,7 +63,7 @@ def _clear_task_store_on_success(tis: Sequence[TI], session: 
Session) -> None:
     """Clear task store rows for each TI if clear_on_success is enabled."""
     if not conf.getboolean("state_store", "clear_on_success", fallback=False):
         return
-    backend = get_state_backend()
+    backend = _get_db_backend()
     for ti in tis:
         scope = TaskScope(
             dag_id=ti.dag_id,
diff --git a/airflow-core/src/airflow/state/metastore.py 
b/airflow-core/src/airflow/state/metastore.py
index 24c5214727d..137fce09a75 100644
--- a/airflow-core/src/airflow/state/metastore.py
+++ b/airflow-core/src/airflow/state/metastore.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import functools
 from collections.abc import AsyncGenerator
 from contextlib import asynccontextmanager
 from datetime import datetime
@@ -462,3 +463,9 @@ class MetastoreStoreBackend(BaseStoreBackend):
                 AssetStoreModel.asset_id == scope.asset_id,
             )
         )
+
+
+@functools.cache
+def _get_db_backend() -> MetastoreStoreBackend:
+    """Return a cached MetastoreStoreBackend instance for DB-direct access."""
+    return MetastoreStoreBackend()
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_store.py
 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_store.py
index 09fc2813f5c..8345386b2fd 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_store.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_store.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import json
+from unittest.mock import patch
 
 import pytest
 from pydantic import ValidationError
@@ -299,3 +300,26 @@ class TestClearAssetState(TestAssetStateEndpoint):
 
     def test_unauthorized_returns_401(self, unauthenticated_test_client):
         assert unauthenticated_test_client.delete(self._base_url).status_code 
== 401
+
+
+class TestRoutesNeverCallCustomBackend(TestAssetStateEndpoint):
+    """Tests to validate that core API routes must use MetastoreStoreBackend 
directly."""
+
+    @pytest.mark.parametrize(
+        ("method", "path_suffix", "kwargs"),
+        [
+            ("get", "", {}),
+            ("get", "/watermark", {}),
+            ("put", "/watermark", {"json": {"value": "v2"}}),
+            ("delete", "/watermark", {}),
+            ("delete", "", {}),
+        ],
+    )
+    def test_route_never_calls_get_state_backend(self, test_client, method, 
path_suffix, kwargs):
+        _create_asset_state(self._session, self.asset.id, "watermark", "v1")
+        self._session.commit()
+
+        with patch("airflow.state.get_state_backend") as mock_get_backend:
+            getattr(test_client, method)(f"{self._base_url}{path_suffix}", 
**kwargs)
+
+        mock_get_backend.assert_not_called()
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py
index beb7c0cbce0..30184d09c00 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import json
+from unittest.mock import patch
 
 import pytest
 from pydantic import ValidationError
@@ -433,3 +434,27 @@ class TestClearTaskState(TestTaskStateEndpoint):
 
     def test_unauthorized_returns_401(self, unauthenticated_test_client):
         assert unauthenticated_test_client.delete(BASE_URL).status_code == 401
+
+
+class TestRoutesNeverCallCustomBackend(TestTaskStateEndpoint):
+    """Tests to validate that core API routes must use MetastoreStoreBackend 
directly."""
+
+    @pytest.mark.parametrize(
+        ("method", "path", "kwargs"),
+        [
+            ("get", BASE_URL, {}),
+            ("get", f"{BASE_URL}/job_id", {}),
+            ("put", f"{BASE_URL}/job_id", {"json": {"value": "v2"}}),
+            ("patch", f"{BASE_URL}/job_id", {"json": {"value": "v3"}}),
+            ("delete", f"{BASE_URL}/job_id", {}),
+            ("delete", BASE_URL, {}),
+        ],
+    )
+    def test_route_never_calls_get_state_backend(self, test_client, method, 
path, kwargs):
+        _create_task_state(self._session, "job_id", "v1", self.dag_run)
+        self._session.commit()
+
+        with patch("airflow.state.get_state_backend") as mock_get_backend:
+            getattr(test_client, method)(path, **kwargs)
+
+        mock_get_backend.assert_not_called()

Reply via email to