This is an automated email from the ASF dual-hosted git repository.
amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6714142dba1 Make core API routes for task and asset states only
interact with DB (#67835)
6714142dba1 is described below
commit 6714142dba1588394af5ba6cf9858b75caa7361c
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Jun 3 09:15:10 2026 +0530
Make core API routes for task and asset states only interact with DB
(#67835)
---
.../core_api/routes/public/asset_store.py | 8 +++----
.../core_api/routes/public/task_store.py | 12 +++++------
.../core_api/services/public/task_instances.py | 4 ++--
airflow-core/src/airflow/state/metastore.py | 7 ++++++
.../core_api/routes/public/test_asset_store.py | 24 +++++++++++++++++++++
.../core_api/routes/public/test_task_store.py | 25 ++++++++++++++++++++++
6 files changed, 67 insertions(+), 13 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py
index e01e73f8cb0..00f02bb664e 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py
@@ -35,7 +35,7 @@ from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_
from airflow.api_fastapi.core_api.security import requires_access_asset
from airflow.models.asset import AssetModel
from airflow.models.asset_store import AssetStoreModel
-from airflow.state import get_state_backend
+from airflow.state.metastore import _get_db_backend
asset_store_router = AirflowRouter(
tags=["Asset Store"],
@@ -134,7 +134,7 @@ def set_asset_store(
session: SessionDep,
) -> None:
"""Set an asset store value. Creates or overwrites the key."""
- get_state_backend().set(AssetScope(asset_id=asset_id), key,
json.dumps(body.value), session=session)
+ _get_db_backend().set(AssetScope(asset_id=asset_id), key,
json.dumps(body.value), session=session)
@asset_store_router.delete(
@@ -149,7 +149,7 @@ def delete_asset_store(
session: SessionDep,
) -> None:
"""Delete a single asset store key. No-op if the key does not exist."""
- get_state_backend().delete(AssetScope(asset_id=asset_id), key,
session=session)
+ _get_db_backend().delete(AssetScope(asset_id=asset_id), key,
session=session)
@asset_store_router.delete(
@@ -163,4 +163,4 @@ def clear_asset_store(
session: SessionDep,
) -> None:
"""Delete all store keys for an asset."""
- get_state_backend().clear(AssetScope(asset_id=asset_id), session=session)
+ _get_db_backend().clear(AssetScope(asset_id=asset_id), session=session)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_store.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_store.py
index 9319d654fe0..32c16241dfd 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_store.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_store.py
@@ -39,7 +39,7 @@ from airflow.api_fastapi.core_api.security import
requires_access_dag
from airflow.configuration import conf
from airflow.models.task_store import TaskStoreModel
from airflow.models.taskinstance import TaskInstance as TI
-from airflow.state import get_state_backend
+from airflow.state.metastore import _get_db_backend
task_store_router = AirflowRouter(
tags=["Task Store"],
@@ -187,7 +187,7 @@ def set_task_store(
expires_at = _resolve_expires_at(body.expires_at)
scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
try:
- get_state_backend().set(scope, key, json.dumps(body.value),
expires_at=expires_at, session=session)
+ _get_db_backend().set(scope, key, json.dumps(body.value),
expires_at=expires_at, session=session)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=str(e)) from e
@@ -227,9 +227,7 @@ def patch_task_store(
)
scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
- get_state_backend().set(
- scope, key, json.dumps(body.value), expires_at=existing.expires_at,
session=session
- )
+ _get_db_backend().set(scope, key, json.dumps(body.value),
expires_at=existing.expires_at, session=session)
@task_store_router.delete(
@@ -248,7 +246,7 @@ def delete_task_store(
) -> None:
"""Delete a single task store key. No-op if the key does not exist."""
scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
- get_state_backend().delete(scope, key, session=session)
+ _get_db_backend().delete(scope, key, session=session)
@task_store_router.delete(
@@ -272,4 +270,4 @@ def clear_task_store(
the ``map_index`` parameter is ignored.
"""
scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
- get_state_backend().clear(scope, all_map_indices=all_map_indices,
session=session)
+ _get_db_backend().clear(scope, all_map_indices=all_map_indices,
session=session)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
index bde91e8d8b4..efd4735959c 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
@@ -53,7 +53,7 @@ from airflow.listeners.listener import get_listener_manager
from airflow.models.dag import DagModel
from airflow.models.taskinstance import TaskInstance as TI
from airflow.serialization.definitions.dag import SerializedDAG
-from airflow.state import get_state_backend
+from airflow.state.metastore import _get_db_backend
from airflow.utils.state import TaskInstanceState
log = structlog.get_logger(__name__)
@@ -63,7 +63,7 @@ def _clear_task_store_on_success(tis: Sequence[TI], session:
Session) -> None:
"""Clear task store rows for each TI if clear_on_success is enabled."""
if not conf.getboolean("state_store", "clear_on_success", fallback=False):
return
- backend = get_state_backend()
+ backend = _get_db_backend()
for ti in tis:
scope = TaskScope(
dag_id=ti.dag_id,
diff --git a/airflow-core/src/airflow/state/metastore.py
b/airflow-core/src/airflow/state/metastore.py
index 24c5214727d..137fce09a75 100644
--- a/airflow-core/src/airflow/state/metastore.py
+++ b/airflow-core/src/airflow/state/metastore.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+import functools
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from datetime import datetime
@@ -462,3 +463,9 @@ class MetastoreStoreBackend(BaseStoreBackend):
AssetStoreModel.asset_id == scope.asset_id,
)
)
+
+
+@functools.cache
+def _get_db_backend() -> MetastoreStoreBackend:
+ """Return a cached MetastoreStoreBackend instance for DB-direct access."""
+ return MetastoreStoreBackend()
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_store.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_store.py
index 09fc2813f5c..8345386b2fd 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_store.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_store.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import json
+from unittest.mock import patch
import pytest
from pydantic import ValidationError
@@ -299,3 +300,26 @@ class TestClearAssetState(TestAssetStateEndpoint):
def test_unauthorized_returns_401(self, unauthenticated_test_client):
assert unauthenticated_test_client.delete(self._base_url).status_code
== 401
+
+
+class TestRoutesNeverCallCustomBackend(TestAssetStateEndpoint):
+ """Tests to validate that core API routes must use MetastoreStateBackend
directly."""
+
+ @pytest.mark.parametrize(
+ ("method", "path_suffix", "kwargs"),
+ [
+ ("get", "", {}),
+ ("get", "/watermark", {}),
+ ("put", "/watermark", {"json": {"value": "v2"}}),
+ ("delete", "/watermark", {}),
+ ("delete", "", {}),
+ ],
+ )
+ def test_route_never_calls_get_state_backend(self, test_client, method,
path_suffix, kwargs):
+ _create_asset_state(self._session, self.asset.id, "watermark", "v1")
+ self._session.commit()
+
+ with patch("airflow.state.get_state_backend") as mock_get_backend:
+ getattr(test_client, method)(f"{self._base_url}{path_suffix}",
**kwargs)
+
+ mock_get_backend.assert_not_called()
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py
index beb7c0cbce0..30184d09c00 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import json
+from unittest.mock import patch
import pytest
from pydantic import ValidationError
@@ -433,3 +434,27 @@ class TestClearTaskState(TestTaskStateEndpoint):
def test_unauthorized_returns_401(self, unauthenticated_test_client):
assert unauthenticated_test_client.delete(BASE_URL).status_code == 401
+
+
+class TestRoutesNeverCallCustomBackend(TestTaskStateEndpoint):
+ """Tests to validate that core API routes must use MetastoreStateBackend
directly."""
+
+ @pytest.mark.parametrize(
+ ("method", "path", "kwargs"),
+ [
+ ("get", BASE_URL, {}),
+ ("get", f"{BASE_URL}/job_id", {}),
+ ("put", f"{BASE_URL}/job_id", {"json": {"value": "v2"}}),
+ ("patch", f"{BASE_URL}/job_id", {"json": {"value": "v3"}}),
+ ("delete", f"{BASE_URL}/job_id", {}),
+ ("delete", BASE_URL, {}),
+ ],
+ )
+ def test_route_never_calls_get_state_backend(self, test_client, method,
path, kwargs):
+ _create_task_state(self._session, "job_id", "v1", self.dag_run)
+ self._session.commit()
+
+ with patch("airflow.state.get_state_backend") as mock_get_backend:
+ getattr(test_client, method)(path, **kwargs)
+
+ mock_get_backend.assert_not_called()