jroachgolf84 commented on code in PR #68900:
URL: https://github.com/apache/airflow/pull/68900#discussion_r3467060754
##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -834,6 +845,94 @@ async def test_create_triggers_asset_state_store_accessor_reads_and_writes(
await runner.cleanup_finished_triggers()
+class TestTriggerSupervisorAssetStateStore:
+ """Supervisor side of the asset-state-store round trip.
+
+ The accessor-only test above asserts the messages a watcher trigger emits, but never
+ exercises ``TriggerRunnerSupervisor._handle_request``. These drive the supervisor
+ directly so a broken (or missing) branch can't pass silently.
+ """
+
+ @pytest.fixture
+ def supervisor(self, jobless_supervisor, mocker):
+ jobless_supervisor.client = mocker.MagicMock(spec=Client)
+ mocker.patch.object(TriggerRunnerSupervisor, "send_msg")
+ return jobless_supervisor
+
+ @staticmethod
+ def _handle(supervisor, msg):
+ supervisor._handle_request(msg,
log=MagicMock(spec=FilteringBoundLogger), req_id=7)
+
+ def test_get_by_name_wraps_response_and_replies(self, supervisor):
+ supervisor.client.asset_state_store.get.return_value =
AssetStateStoreResponse(value="2026-01-01")
+
+ self._handle(supervisor, GetAssetStateStoreByName(name="asset_a",
key="watermark"))
+
+
supervisor.client.asset_state_store.get.assert_called_once_with(key="watermark",
name="asset_a")
+ supervisor.send_msg.assert_called_once_with(
+ AssetStateStoreResult(value="2026-01-01"), request_id=7, error=None
+ )
+
+ def test_get_by_uri_wraps_response_and_replies(self, supervisor):
+ supervisor.client.asset_state_store.get.return_value =
AssetStateStoreResponse(value="2026-01-01")
+
+ self._handle(supervisor, GetAssetStateStoreByUri(uri="s3://bucket/a",
key="watermark"))
+
+
supervisor.client.asset_state_store.get.assert_called_once_with(key="watermark",
uri="s3://bucket/a")
+ supervisor.send_msg.assert_called_once_with(
+ AssetStateStoreResult(value="2026-01-01"), request_id=7, error=None
+ )
+
+ def test_get_passes_through_not_found_error(self, supervisor):
+ err = ErrorResponse(error=ErrorType.ASSET_STORE_NOT_FOUND,
detail={"key": "watermark"})
+ supervisor.client.asset_state_store.get.return_value = err
+
+ self._handle(supervisor, GetAssetStateStoreByName(name="asset_a",
key="watermark"))
+
+ supervisor.send_msg.assert_called_once_with(err, request_id=7,
error=None)
+
+ def test_set_by_name(self, supervisor):
+ self._handle(
+ supervisor, SetAssetStateStoreByName(name="asset_a",
key="watermark", value="2026-01-01")
+ )
+
+ supervisor.client.asset_state_store.set.assert_called_once_with(
+ key="watermark", value="2026-01-01", name="asset_a"
+ )
+ supervisor.send_msg.assert_called_once_with(OKResponse(ok=True),
request_id=7, error=None)
+
+ def test_set_by_uri(self, supervisor):
+ self._handle(
+ supervisor, SetAssetStateStoreByUri(uri="s3://bucket/a",
key="watermark", value="2026-01-01")
+ )
+
+ supervisor.client.asset_state_store.set.assert_called_once_with(
+ key="watermark", value="2026-01-01", uri="s3://bucket/a"
+ )
+
+ def test_delete_by_name(self, supervisor):
+ self._handle(supervisor, DeleteAssetStateStoreByName(name="asset_a",
key="watermark"))
+
+
supervisor.client.asset_state_store.delete.assert_called_once_with(key="watermark",
name="asset_a")
+
+ def test_delete_by_uri(self, supervisor):
+ self._handle(supervisor,
DeleteAssetStateStoreByUri(uri="s3://bucket/a", key="watermark"))
+
+ supervisor.client.asset_state_store.delete.assert_called_once_with(
+ key="watermark", uri="s3://bucket/a"
+ )
+
+ def test_clear_by_name(self, supervisor):
+ self._handle(supervisor, ClearAssetStateStoreByName(name="asset_a"))
+
+
supervisor.client.asset_state_store.clear.assert_called_once_with(name="asset_a")
+
+ def test_clear_by_uri(self, supervisor):
+ self._handle(supervisor,
ClearAssetStateStoreByUri(uri="s3://bucket/a"))
+
+
supervisor.client.asset_state_store.clear.assert_called_once_with(uri="s3://bucket/a")
Review Comment:
Resolved in next commit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]