amoghrajesh commented on code in PR #68900:
URL: https://github.com/apache/airflow/pull/68900#discussion_r3464559601
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state_store.py:
##########
@@ -51,8 +51,8 @@
_TIWriterFields = tuple[str, str, str, int]
-def _fetch_ti_writer_fields(token: TIToken, session: SessionDep) ->
_TIWriterFields:
- """Return (dag_id, run_id, task_id, map_index) for the TI identified by
the token."""
Review Comment:
Looking at this more carefully, returning `None` conflates two different
cases:
- a watcher trigger, which intentionally carries the null UUID sentinel, and
- any non-null token whose TI row happens to be missing. In practice the
second case probably cannot happen today, but I would rather be safe here. An
explicit `token.id == NULL_UUID` check for the watcher branch keeps the 404
intact for everything else. It also makes it obvious from the code why we
skip the TI lookup, instead of inferring it from a missing row. It is a small
change for a meaningful safety net.
##########
task-sdk/tests/task_sdk/execution_time/test_request_handlers.py:
##########
Review Comment:
Most tests in this file follow the same pattern, so you could replace them
with a single parametrized test instead, something like:
```python
@pytest.mark.parametrize("msg,method,call_kwargs,expected_result", [
(
SetAssetStateStoreByName(name="asset_a", key="k", value="v"),
"set", {"key": "k", "value": "v", "name": "asset_a"}, None,
),
...
])
def test_xyz(client, msg, method, call_kwargs, expected_result):
handle = HANDLER_MAP[type(msg)]
result, dump_opts = handle(client, msg)
getattr(client.asset_state_store,
method).assert_called_once_with(**call_kwargs)
assert result is expected_result
assert dump_opts == {}
```
##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -834,6 +845,94 @@ async def
test_create_triggers_asset_state_store_accessor_reads_and_writes(
await runner.cleanup_finished_triggers()
+class TestTriggerSupervisorAssetStateStore:
+ """Supervisor side of the asset-state-store round trip.
+
+ The accessor-only test above asserts the messages a watcher trigger emits,
but never
+ exercises ``TriggerRunnerSupervisor._handle_request``. These drive the
supervisor
+ directly so a broken (or missing) branch can't pass silently.
+ """
+
+ @pytest.fixture
+ def supervisor(self, jobless_supervisor, mocker):
+ jobless_supervisor.client = mocker.MagicMock(spec=Client)
+ mocker.patch.object(TriggerRunnerSupervisor, "send_msg")
+ return jobless_supervisor
+
+ @staticmethod
+ def _handle(supervisor, msg):
+ supervisor._handle_request(msg,
log=MagicMock(spec=FilteringBoundLogger), req_id=7)
+
+ def test_get_by_name_wraps_response_and_replies(self, supervisor):
+ supervisor.client.asset_state_store.get.return_value =
AssetStateStoreResponse(value="2026-01-01")
+
+ self._handle(supervisor, GetAssetStateStoreByName(name="asset_a",
key="watermark"))
+
+
supervisor.client.asset_state_store.get.assert_called_once_with(key="watermark",
name="asset_a")
+ supervisor.send_msg.assert_called_once_with(
+ AssetStateStoreResult(value="2026-01-01"), request_id=7, error=None
+ )
+
+ def test_get_by_uri_wraps_response_and_replies(self, supervisor):
+ supervisor.client.asset_state_store.get.return_value =
AssetStateStoreResponse(value="2026-01-01")
+
+ self._handle(supervisor, GetAssetStateStoreByUri(uri="s3://bucket/a",
key="watermark"))
+
+
supervisor.client.asset_state_store.get.assert_called_once_with(key="watermark",
uri="s3://bucket/a")
+ supervisor.send_msg.assert_called_once_with(
+ AssetStateStoreResult(value="2026-01-01"), request_id=7, error=None
+ )
+
+ def test_get_passes_through_not_found_error(self, supervisor):
+ err = ErrorResponse(error=ErrorType.ASSET_STORE_NOT_FOUND,
detail={"key": "watermark"})
+ supervisor.client.asset_state_store.get.return_value = err
+
+ self._handle(supervisor, GetAssetStateStoreByName(name="asset_a",
key="watermark"))
+
+ supervisor.send_msg.assert_called_once_with(err, request_id=7,
error=None)
+
+ def test_set_by_name(self, supervisor):
+ self._handle(
+ supervisor, SetAssetStateStoreByName(name="asset_a",
key="watermark", value="2026-01-01")
+ )
+
+ supervisor.client.asset_state_store.set.assert_called_once_with(
+ key="watermark", value="2026-01-01", name="asset_a"
+ )
+ supervisor.send_msg.assert_called_once_with(OKResponse(ok=True),
request_id=7, error=None)
+
+ def test_set_by_uri(self, supervisor):
+ self._handle(
+ supervisor, SetAssetStateStoreByUri(uri="s3://bucket/a",
key="watermark", value="2026-01-01")
+ )
+
+ supervisor.client.asset_state_store.set.assert_called_once_with(
+ key="watermark", value="2026-01-01", uri="s3://bucket/a"
+ )
+
+ def test_delete_by_name(self, supervisor):
+ self._handle(supervisor, DeleteAssetStateStoreByName(name="asset_a",
key="watermark"))
+
+
supervisor.client.asset_state_store.delete.assert_called_once_with(key="watermark",
name="asset_a")
+
+ def test_delete_by_uri(self, supervisor):
+ self._handle(supervisor,
DeleteAssetStateStoreByUri(uri="s3://bucket/a", key="watermark"))
+
+ supervisor.client.asset_state_store.delete.assert_called_once_with(
+ key="watermark", uri="s3://bucket/a"
+ )
+
+ def test_clear_by_name(self, supervisor):
+ self._handle(supervisor, ClearAssetStateStoreByName(name="asset_a"))
+
+
supervisor.client.asset_state_store.clear.assert_called_once_with(name="asset_a")
+
+ def test_clear_by_uri(self, supervisor):
+ self._handle(supervisor,
ClearAssetStateStoreByUri(uri="s3://bucket/a"))
+
+
supervisor.client.asset_state_store.clear.assert_called_once_with(uri="s3://bucket/a")
Review Comment:
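These tests are missing assertions that `send_msg` was called with
`OKResponse(ok=True)`. `test_set_by_name` already does this correctly; the
other set/delete/clear tests should follow the same pattern.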
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state_store.py:
##########
@@ -380,3 +380,66 @@ def test_get_inactive_asset_by_name_returns_404(self,
client: TestClient, inacti
def test_get_inactive_asset_by_uri_returns_404(self, client: TestClient,
inactive_asset: AssetModel):
response = client.get(_BY_URI_VALUE, params={"uri":
inactive_asset.uri, "key": "watermark"})
assert response.status_code == 404
+
+
+class TestPutAssetStateWatcherWrite:
+ """PUT routes record WATCHER kind when no TI backs the token
(WATCHER/triggerer path)."""
+
+ @pytest.fixture(autouse=True)
+ def setup_null_uuid_auth(self, exec_app: FastAPI):
+ from uuid import UUID
+
+ async def _watcher_auth(request: Request) -> TIToken:
+ return TIToken(
+ id=UUID("00000000-0000-0000-0000-000000000000"),
claims=TIClaims(scope="execution")
+ )
+
+ exec_app.dependency_overrides[require_auth] = _watcher_auth
+
+ def test_put_by_name_creates_row(self, client: TestClient, asset:
AssetModel, session):
+ response = client.put(
+ _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"},
json={"value": "2026-01-01"}
+ )
+
+ assert response.status_code == 204
+ row = session.scalar(
+ select(AssetStateStoreModel).where(
+ AssetStateStoreModel.asset_id == asset.id,
+ AssetStateStoreModel.key == "watermark",
+ )
+ )
+ assert row is not None
+ assert row.value == '"2026-01-01"'
+
+ def test_put_by_name_records_watcher_kind(self, client: TestClient, asset:
AssetModel, session):
+ client.put(
+ _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"},
json={"value": "2026-01-01"}
+ )
+
+ row = session.scalar(
+ select(AssetStateStoreModel).where(
+ AssetStateStoreModel.asset_id == asset.id,
+ AssetStateStoreModel.key == "watermark",
+ )
+ )
+ assert row is not None
+ assert row.last_updated_by_kind == "watcher"
+ assert row.last_updated_by_dag_id is None
+ assert row.last_updated_by_run_id is None
+ assert row.last_updated_by_task_id is None
+ assert row.last_updated_by_map_index is None
+
+ def test_put_by_uri_records_watcher_kind(self, client: TestClient, asset:
AssetModel, session):
+ response = client.put(
+ _BY_URI_VALUE, params={"uri": asset.uri, "key": "watermark"},
json={"value": "2026-06-01"}
+ )
+
+ assert response.status_code == 204
+ row = session.scalar(
+ select(AssetStateStoreModel).where(
+ AssetStateStoreModel.asset_id == asset.id,
+ AssetStateStoreModel.key == "watermark",
+ )
+ )
+ assert row is not None
+ assert row.last_updated_by_kind == "watcher"
Review Comment:
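We should also assert that the other `last_updated_by_*` fields (`dag_id`,
`run_id`, `task_id`, `map_index`) are `None` so this test is stronger.
`test_put_by_name_records_watcher_kind` above already does this.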
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]