o-nikolas commented on PR #54692:
URL: https://github.com/apache/airflow/pull/54692#issuecomment-3251207277
I've done a lot of deep diving and I think I've identified what's happening
here. So the above exception is actually coming from the reentry point when
we're returning from the triggerer. Just before that exception in the logs we
see:
```
[2025-09-03 19:16:03,685] {pod.py:909} ERROR - Trigger emitted an error
event, failing the task: cannot import name 'SUPERVISOR_COMMS' from
'airflow.sdk.execution_time.task_runner'
(/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py)
[2025-09-03T19:16:03.685+0000] {pod.py:909} ERROR - Trigger emitted an error
event, failing the task: cannot import name 'SUPERVISOR_COMMS' from
'airflow.sdk.execution_time.task_runner'
(/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py)
[2025-09-03T19:16:20.896+0000] {pod_manager.py:765} INFO - Pod
run-pod-xxeffbtb has phase Pending
[2025-09-03T19:16:28.631+0000] {pod_manager.py:765} INFO - Pod
run-pod-xxeffbtb has phase Pending
[2025-09-03T19:16:36.345+0000] {pod_manager.py:765} INFO - Pod
run-pod-xxeffbtb has phase Pending
[2025-09-03T19:16:44.060+0000] {pod_manager.py:765} INFO - Pod
run-pod-xxeffbtb has phase Pending
[2025-09-03 19:17:03,348] {pod.py:1181} INFO - Skipping deleting pod:
run-pod-xxeffbtb
[2025-09-03T19:17:03.348+0000] {pod.py:1181} INFO - Skipping deleting pod:
run-pod-xxeffbtb
2025-09-03 19:17:03 [error ] Task failed with exception [task]
```
And I see successful communication with task API comms above that. So it's
actually within the Triggerer scope that we're failing to communicate with task
api to get connection information.
This is because in the dag.test() world, we run triggerers inline
[here](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/dag.py#L1376)
and we do not setup the inprocess supervisor comms like we do for regular task
execution in dag.test()
[here](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/supervisor.py#L1574-L1577).
Triggers definitely need a supervisor comms setup, you can see we do that in
the normal case (i.e. not dag.test())
[here](https://github.com/apache/airflow/blob/f1e9e522fb81894c1e5ab8bf2eba1269011fd757/airflow-core/src/airflow/jobs/triggerer_job_runner.py#L905-L910).
So I believe we need to update the inline execution of triggers in
dag.test() to setup a supervisor comms.
I have a draft PR here with what I think that might look like. But likely
needs refinement: #55236
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]