amoghrajesh commented on code in PR #68900:
URL: https://github.com/apache/airflow/pull/68900#discussion_r3460007000
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -622,6 +648,25 @@ def _handle_request(self, msg: ToTriggerSupervisor, log:
FilteringBoundLogger, r
resp =
HITLDetailResponseResult.from_api_response(response=api_resp)
elif isinstance(msg, MaskSecret):
handle_mask_secret(msg)
+
+ # Adding AssetStateStore functionality
Review Comment:
```suggestion
```
##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -834,6 +843,94 @@ async def
test_create_triggers_asset_state_store_accessor_reads_and_writes(
await runner.cleanup_finished_triggers()
+class TestTriggerSupervisorAssetStateStore:
+ """Supervisor side of the asset-state-store round trip.
+
+ The accessor-only test above asserts the messages a watcher trigger emits,
but never
+ exercises ``TriggerRunnerSupervisor._handle_request``. These drive the
supervisor
+ directly so a broken (or missing) branch can't pass silently.
+ """
+
+ @pytest.fixture
+ def supervisor(self, jobless_supervisor, mocker):
+ jobless_supervisor.client = mocker.MagicMock()
Review Comment:
Here and elsewhere, can we please use `spec` or `autospec`?
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -622,6 +648,25 @@ def _handle_request(self, msg: ToTriggerSupervisor, log:
FilteringBoundLogger, r
resp =
HITLDetailResponseResult.from_api_response(response=api_resp)
elif isinstance(msg, MaskSecret):
handle_mask_secret(msg)
+
+ # Adding AssetStateStore functionality
+ elif isinstance(msg, ClearAssetStateStoreByName):
+ resp, dump_opts =
handle_clear_asset_state_store_by_name(self.client, msg)
+ elif isinstance(msg, ClearAssetStateStoreByUri):
+ resp, dump_opts =
handle_clear_asset_state_store_by_uri(self.client, msg)
+ elif isinstance(msg, DeleteAssetStateStoreByName):
+ resp, dump_opts =
handle_delete_asset_state_store_by_name(self.client, msg)
+ elif isinstance(msg, DeleteAssetStateStoreByUri):
+ resp, dump_opts =
handle_delete_asset_state_store_by_uri(self.client, msg)
+ elif isinstance(msg, GetAssetStateStoreByName):
+ resp, dump_opts =
handle_get_asset_state_store_by_name(self.client, msg)
+ elif isinstance(msg, GetAssetStateStoreByUri):
+ resp, dump_opts = handle_get_asset_state_store_by_uri(self.client,
msg)
+ elif isinstance(msg, SetAssetStateStoreByName):
+ resp, dump_opts =
handle_set_asset_state_store_by_name(self.client, msg)
+ elif isinstance(msg, SetAssetStateStoreByUri):
+ resp, dump_opts = handle_set_asset_state_store_by_uri(self.client,
msg)
+
Review Comment:
```suggestion
```
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -622,6 +648,25 @@ def _handle_request(self, msg: ToTriggerSupervisor, log:
FilteringBoundLogger, r
resp =
HITLDetailResponseResult.from_api_response(response=api_resp)
elif isinstance(msg, MaskSecret):
handle_mask_secret(msg)
+
+ # Adding AssetStateStore functionality
+ elif isinstance(msg, ClearAssetStateStoreByName):
+ resp, dump_opts =
handle_clear_asset_state_store_by_name(self.client, msg)
+ elif isinstance(msg, ClearAssetStateStoreByUri):
+ resp, dump_opts =
handle_clear_asset_state_store_by_uri(self.client, msg)
+ elif isinstance(msg, DeleteAssetStateStoreByName):
+ resp, dump_opts =
handle_delete_asset_state_store_by_name(self.client, msg)
+ elif isinstance(msg, DeleteAssetStateStoreByUri):
+ resp, dump_opts =
handle_delete_asset_state_store_by_uri(self.client, msg)
+ elif isinstance(msg, GetAssetStateStoreByName):
+ resp, dump_opts =
handle_get_asset_state_store_by_name(self.client, msg)
+ elif isinstance(msg, GetAssetStateStoreByUri):
+ resp, dump_opts = handle_get_asset_state_store_by_uri(self.client,
msg)
+ elif isinstance(msg, SetAssetStateStoreByName):
+ resp, dump_opts =
handle_set_asset_state_store_by_name(self.client, msg)
+ elif isinstance(msg, SetAssetStateStoreByUri):
+ resp, dump_opts = handle_set_asset_state_store_by_uri(self.client,
msg)
+
Review Comment:
```suggestion
elif isinstance(msg, ClearAssetStateStoreByName):
handle_clear_asset_state_store_by_name(self.client, msg)
resp = OKResponse(ok=True)
elif isinstance(msg, ClearAssetStateStoreByUri):
handle_clear_asset_state_store_by_uri(self.client, msg)
resp = OKResponse(ok=True)
elif isinstance(msg, DeleteAssetStateStoreByName):
handle_delete_asset_state_store_by_name(self.client, msg)
resp = OKResponse(ok=True)
elif isinstance(msg, DeleteAssetStateStoreByUri):
handle_delete_asset_state_store_by_uri(self.client, msg)
resp = OKResponse(ok=True)
elif isinstance(msg, GetAssetStateStoreByName):
resp, dump_opts =
handle_get_asset_state_store_by_name(self.client, msg)
elif isinstance(msg, GetAssetStateStoreByUri):
resp, dump_opts =
handle_get_asset_state_store_by_uri(self.client, msg)
elif isinstance(msg, SetAssetStateStoreByName):
handle_set_asset_state_store_by_name(self.client, msg)
resp = OKResponse(ok=True)
elif isinstance(msg, SetAssetStateStoreByUri):
handle_set_asset_state_store_by_uri(self.client, msg)
resp = OKResponse(ok=True)
```
The handlers (`set`, `delete`, `clear`) return `(None, {})`, so `resp` stays
`None` after these calls. `supervisor.py` explicitly sets `resp =
OKResponse(ok=True). Suggest aligning with that
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
Review Comment:
@kaxil, following up on your sync_to_async concern from
https://github.com/apache/airflow/pull/67839#discussion_r3439491205:
`TriggerCommsDecoder.send()` already handles the async case via
`greenback.await_()`, so there's no deadlock under normal conditions as I see
it. Does that make sense or were you pointing at a different gap?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]