karenbraganz commented on issue #51301:
URL: https://github.com/apache/airflow/issues/51301#issuecomment-2932821660
I was able to reproduce this issue as well. When a task gets stuck in the
queued state and fails, 2.10.0 logs the following for `on_failure_callback`,
but 2.10.5 logs nothing:
```
[2025-06-02T21:01:54.409+0000] {processor.py:791} INFO - Executed failure callback for <TaskInstance: dag_1.task_1 scheduled__2025-06-01T00:00:00+00:00 [failed]> in state failed
```
I believe this is caused by the changes introduced in
[PR #43520](https://github.com/apache/airflow/pull/43520). Since that PR, tasks
don't fail immediately when `task_queued_timeout` is exceeded. Instead, they
can be requeued and re-attempted up to two more times, even if no retries are
set. Once a task instance has still failed to run after those two requeue
attempts, its state is [set to failed
directly](https://github.com/apache/airflow/blob/b93c3db6b1641b0840bd15ac7d05bc58ff2cccbf/airflow/jobs/scheduler_job_runner.py#L1859)
with the [`TaskInstance.set_state`
method](https://github.com/apache/airflow/blob/b93c3db6b1641b0840bd15ac7d05bc58ff2cccbf/airflow/models/taskinstance.py#L2435).
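To make this control flow concrete, here is a toy model of the requeue-then-fail behavior described above. It is a simplified sketch, not Airflow's code: `ToyTaskInstance`, `handle_queued_timeout`, and the `MAX_REQUEUE_ATTEMPTS = 2` constant are hypothetical names chosen only to mirror the "up to two more times" behavior, and the real scheduler does not necessarily track requeue attempts this way.

```python
from dataclasses import dataclass

# Hypothetical limit mirroring "up to two more times" from the comment above.
MAX_REQUEUE_ATTEMPTS = 2


@dataclass
class ToyTaskInstance:
    """Hypothetical stand-in for a task instance; not Airflow's TaskInstance."""

    key: str
    state: str = "queued"
    requeue_attempts: int = 0

    def set_state(self, state: str) -> None:
        # Only the stored state changes: no executor event is recorded,
        # so nothing downstream learns that the task failed.
        self.state = state


def handle_queued_timeout(ti: ToyTaskInstance) -> str:
    """Called each time `ti` exceeds the queued timeout (simplified model)."""
    if ti.requeue_attempts < MAX_REQUEUE_ATTEMPTS:
        ti.requeue_attempts += 1
        # Assumption: the task is re-attempted; how it is requeued is not modeled.
        return "requeued"
    # Requeue attempts exhausted: fail the task directly, like the new code path.
    ti.set_state("failed")
    return "failed"


ti = ToyTaskInstance("dag_1.task_1")
outcomes = [handle_queued_timeout(ti) for _ in range(3)]
print(outcomes)  # ['requeued', 'requeued', 'failed']
```

The point the model isolates is the last branch: the task ends up `failed`, but only through a plain state update.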
Previously, tasks were failed by calling the
[`cleanup_stuck_queued_tasks`](https://github.com/apache/airflow/blob/c99887ec11ce3e1a43f2794fcf36d27555140f00/airflow/jobs/scheduler_job_runner.py#L1797)
method. That path eventually [called `change_state`, which adds the task
instance's failed state to the executor's event
buffer](https://github.com/apache/airflow/blob/2624df691a08463271017430f30fd4d17492b1ff/airflow-core/src/airflow/executors/base_executor.py#L390).
I think this step may be what allowed the scheduler to process the failure as
an executor event and call
[`ti.handle_failure()`](https://github.com/apache/airflow/blob/b93c3db6b1641b0840bd15ac7d05bc58ff2cccbf/airflow/jobs/scheduler_job_runner.py#L943),
which is what executes the callback.
A possible fix would be to fail the task through the [executor's `fail`
method, which calls
`change_state`](https://github.com/apache/airflow/blob/2624df691a08463271017430f30fd4d17492b1ff/airflow-core/src/airflow/executors/base_executor.py#L392),
as the old code path did, instead of using `set_state`.
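The difference between the two failure paths can be sketched with a toy model of an executor event buffer. Everything here is hypothetical and heavily simplified: the classes only borrow the method names mentioned in this comment (`set_state`, `change_state`, `fail`, `handle_failure`) and do not reproduce Airflow's real signatures or the checks the scheduler performs when it processes executor events. Under those assumptions, failing through `fail()` leaves an event that the scheduler step turns into a `handle_failure()` call and therefore a callback, while `set_state()` leaves nothing for the scheduler to act on.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional


@dataclass
class ToyTaskInstance:
    """Hypothetical stand-in for a task instance."""

    key: str
    state: str = "queued"
    # Simplified: real callbacks receive a context dict, not a key.
    on_failure_callback: Optional[Callable[[str], None]] = None

    def set_state(self, state: str) -> None:
        # Updates the state only; no executor event, so no callback.
        self.state = state

    def handle_failure(self) -> None:
        # Simplified: mark the task failed and run its failure callback.
        self.state = "failed"
        if self.on_failure_callback is not None:
            self.on_failure_callback(self.key)


@dataclass
class ToyExecutor:
    """Hypothetical stand-in for an executor with an event buffer."""

    event_buffer: Dict[str, str] = field(default_factory=dict)

    def change_state(self, key: str, state: str) -> None:
        # Record the state change as an event for the scheduler to process.
        self.event_buffer[key] = state

    def fail(self, key: str) -> None:
        self.change_state(key, "failed")


def process_executor_events(executor: ToyExecutor, tis: Dict[str, ToyTaskInstance]) -> None:
    """Simplified scheduler step: drain the buffer and handle reported failures."""
    events, executor.event_buffer = executor.event_buffer, {}
    for key, state in events.items():
        if state == "failed":
            tis[key].handle_failure()


callback_log: List[str] = []
executor = ToyExecutor()

# Current path: set_state() only, so the scheduler sees no event.
ti_new = ToyTaskInstance("dag_1.task_1", on_failure_callback=callback_log.append)
ti_new.set_state("failed")
process_executor_events(executor, {ti_new.key: ti_new})

# Old path / proposed fix: fail through the executor.
ti_fix = ToyTaskInstance("dag_1.task_2", on_failure_callback=callback_log.append)
executor.fail(ti_fix.key)
process_executor_events(executor, {ti_fix.key: ti_fix})

print(callback_log)  # ['dag_1.task_2']
```

Both task instances end up `failed`, but only the one failed through the executor triggers its callback, which matches the 2.10.0 vs 2.10.5 difference reported above.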
@dimberman or @RNHTTR, is my understanding correct? I'd be happy to submit a
PR to fix it.