amoghrajesh commented on code in PR #50773:
URL: https://github.com/apache/airflow/pull/50773#discussion_r2122747650
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -897,7 +897,8 @@ def test_run_with_asset_outlets(
instant = timezone.datetime(2024, 12, 3, 10, 0)
time_machine.move_to(instant, tick=False)
- run(ti, context=ti.get_template_context(), log=mock.MagicMock())
+ with
mock.patch("airflow.sdk.execution_time.task_runner._validate_task_inlets_and_outlets"):
+ run(ti, context=ti.get_template_context(), log=mock.MagicMock())
Review Comment:
Can we please also assert that the mock was called?
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -791,6 +796,22 @@ def _prepare(ti: RuntimeTaskInstance, log: Logger,
context: Context) -> ToSuperv
return None
+def _validate_task_inlets_and_outlets(*, ti: RuntimeTaskInstance, log: Logger)
-> None:
+ if not ti.task.inlets and not ti.task.outlets:
+ return
+
+ SUPERVISOR_COMMS.send_request(msg=ValidateInletsAndOutlets(ti_id=ti.id),
log=log)
+ inactive_assets_resp = SUPERVISOR_COMMS.get_message()
+ if TYPE_CHECKING:
+ assert isinstance(inactive_assets_resp, InactiveAssetsResult)
+ if inactive_assets := inactive_assets_resp.inactive_assets:
+ raise AirflowInactiveAssetInInletOrOutletException(
+ inactive_asset_keys=[
+ AssetUniqueKey.from_profile(asset_profile) for asset_profile
in inactive_assets
+ ]
+ )
Review Comment:
Interesting, I wonder why or how this was removed. As I understand it, this
part checks "before" running whether it's possible to run with the current
asset settings, right?
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -1215,6 +1217,10 @@ def _handle_request(self, msg: ToSupervisor, log:
FilteringBoundLogger):
)
elif isinstance(msg, DeleteVariable):
resp = self.client.variables.delete(msg.key)
+ elif isinstance(msg, ValidateInletsAndOutlets):
Review Comment:
Could you add a test for this under `test_handle_requests` in `test_supervisor.py`?
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -400,12 +407,16 @@ def ti_update_state(
query = TI.duration_expression_update(ti_patch_payload.end_date,
query, session.bind)
updated_state = ti_patch_payload.state
task_instance = session.get(TI, ti_id_str)
- TI.register_asset_changes_in_db(
- task_instance,
- ti_patch_payload.task_outlets, # type: ignore
- ti_patch_payload.outlet_events,
- session,
- )
+ try:
+ TI.register_asset_changes_in_db(
+ task_instance,
+ ti_patch_payload.task_outlets, # type: ignore
+ ti_patch_payload.outlet_events,
+ session,
+ )
+ except AirflowInactiveAssetInInletOrOutletException:
+ log.exception()
Review Comment:
```suggestion
except AirflowInactiveAssetInInletOrOutletException as e:
log.error("Asset registration failed due to conflicting asset: %s", e)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]