Hi all,

Currently, tasks can get stuck in the `queued` state, where they are neither
scheduled by the scheduler nor picked up by a worker. This can happen when a
failure occurs after a task is marked as `queued` but before the executor
marks it as `running`.

There is a fix (https://github.com/apache/airflow/pull/19769/files) for the
Celery executor. However, it targets only the CeleryExecutor, and it leaks a
scheduler responsibility into the executor.

We propose to move the state reconciliation logic to the scheduler. The
proposed change:

1. When selecting task instances in _executable_task_instances_to_queued (
https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job.py#L287),
also include the QUEUED state, for example:
   .filter(TI.state.in_([TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED]))
This way, the scheduler can process queued TIs.

2. In the queue_command method of the base executor, conditionally queue the
TI even if its state is already QUEUED. The condition is based on the last
time the executor enqueued the TI, and it is configurable.
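
As a rough illustration of the check in step 2, here is a minimal standalone
sketch. It does not use Airflow's classes: RequeueTracker, should_queue,
record_enqueue and requeue_after are hypothetical names, and treating a QUEUED
TI that this executor has never seen as eligible is an assumption on my part.

```python
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple

# Hypothetical key type for this sketch: (dag_id, task_id, run_id).
TaskKey = Tuple[str, str, str]


class RequeueTracker:
    """Remembers when each TI was last handed to the executor (illustrative only)."""

    def __init__(self, requeue_after: timedelta) -> None:
        # Hypothetical configurable threshold before a QUEUED TI is sent again.
        self.requeue_after = requeue_after
        self._last_enqueued: Dict[TaskKey, datetime] = {}

    def should_queue(self, key: TaskKey, state: str, now: datetime) -> bool:
        if state == "scheduled":
            # Normal path: a SCHEDULED TI is always eligible.
            return True
        if state != "queued":
            # Any other state is not handled by this check.
            return False
        last: Optional[datetime] = self._last_enqueued.get(key)
        # Assumption: a QUEUED TI with no enqueue record, or with a stale one,
        # is sent to the executor again.
        return last is None or now - last >= self.requeue_after

    def record_enqueue(self, key: TaskKey, now: datetime) -> None:
        # Called whenever the TI is actually handed to the executor.
        self._last_enqueued[key] = now
```

With a 10 minute threshold, a QUEUED TI enqueued at 00:00 is skipped at 00:05
and becomes eligible again at 00:10.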

This could potentially send the same task to the executor twice. However,
since the worker side already checks whether a task can run, this won't
cause issues.
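
To show why a duplicate delivery can be harmless, here is a small in-memory
sketch of a worker-side guard. It is a stand-in, not Airflow's actual worker
code: TaskStateStore, claim and handle_delivery are hypothetical names, and
the only condition modelled is that the task is still `queued`.

```python
import threading
from typing import Dict, List


class TaskStateStore:
    """In-memory stand-in for the task state kept in the metadata database."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._state: Dict[str, str] = {}

    def set_state(self, key: str, state: str) -> None:
        with self._lock:
            self._state[key] = state

    def claim(self, key: str) -> bool:
        """Atomically move a task from queued to running; False if it is not queued."""
        with self._lock:
            if self._state.get(key) != "queued":
                return False
            self._state[key] = "running"
            return True


def handle_delivery(store: TaskStateStore, key: str, run_log: List[str]) -> bool:
    """Run the task only if this delivery wins the claim; otherwise drop it."""
    if not store.claim(key):
        # Another delivery already started the task, so this one is a no-op.
        return False
    run_log.append(key)  # placeholder for actually executing the task
    return True
```

If the same task is delivered twice, only the first call to handle_delivery
runs it; the second sees the `running` state and returns without doing work.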

We have been running this in production for a while and would love to
contribute it.

Please let me know your thoughts.

Thanks,

Ping
