Hi all,

Currently, task instances can get stuck in the `queued` state, where they are neither scheduled by the scheduler nor picked up by a worker. This can happen when a failure occurs after a task is marked as `queued` but before the executor marks it as `running`.
There is an existing fix, https://github.com/apache/airflow/pull/19769/files, but it only targets the CeleryExecutor, and it leaks scheduler responsibility into the executor. We propose moving the state reconciliation logic into the scheduler instead.

The proposed change (rough sketches follow below):

1. In _executable_task_instances_to_queued (https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job.py#L287), also include the QUEUED state in the filter, e.g. .filter(TI.state.in_([TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED])). This way the scheduler can process queued task instances again.

2. In the queue_command method of the base executor, conditionally re-queue the task instance even if its state is already QUEUED, based on the time it was last enqueued by the executor (the threshold is configurable). This can send the same task to the executor twice, but that is harmless: the worker side already checks whether a task can run before executing it.

We have been running this in production for a while and would love to contribute it.
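For concreteness, here is a minimal sketch of change 1. The helper name executable_states_filter is made up purely for illustration; in an actual patch this would just be the filter expression inside _executable_task_instances_to_queued:

    # Illustrative sketch only; the surrounding query logic in
    # SchedulerJob._executable_task_instances_to_queued is elided.
    from airflow.models.taskinstance import TaskInstance as TI
    from airflow.utils.state import TaskInstanceState

    def executable_states_filter(query):
        # Before: only SCHEDULED task instances were considered, i.e.
        #   query.filter(TI.state == TaskInstanceState.SCHEDULED)
        # After: QUEUED is included as well, so the scheduler can
        # re-examine task instances that were queued but never started.
        return query.filter(
            TI.state.in_([TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED])
        )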
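And a rough sketch of change 2. The _last_enqueued_at bookkeeping and the 600-second threshold are hypothetical stand-ins for the configurable "last enqueued time" check; the real change would live in BaseExecutor.queue_command:

    import time

    REQUEUE_AFTER_SECONDS = 600  # hypothetical; would be read from airflow.cfg

    class ReconcilingExecutorSketch:
        def __init__(self):
            self.queued_tasks = {}       # TI key -> (command, priority, queue, ti)
            self.running = set()         # TI keys currently running
            self._last_enqueued_at = {}  # TI key -> epoch seconds of last enqueue

        def queue_command(self, task_instance, command, priority=1, queue=None):
            key = task_instance.key
            now = time.time()
            stale = now - self._last_enqueued_at.get(key, 0) > REQUEUE_AFTER_SECONDS
            if key not in self.running and (key not in self.queued_tasks or stale):
                # Enqueue (or re-enqueue) the command. A duplicate send is safe
                # because the worker re-checks whether the task can run before
                # executing it.
                self.queued_tasks[key] = (command, priority, queue, task_instance)
                self._last_enqueued_at[key] = now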
Please let me know your thoughts.

Thanks,
Ping