I agree this needs a core fix in Airflow, but I'd like to highlight that it fundamentally changes the executor contract: tasks go from at-most-once submission to at-least-once. So not only would it need very careful testing, it would also be a breaking change to some degree, since Airflow lets you plug in third-party executors.
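To make the contract change concrete, here is a minimal Python sketch with two hypothetical executors. `AtMostOnceExecutor`, `AtLeastOnceExecutor` and the `resubmit_stuck` helper are illustrative names, not Airflow APIs. An executor written under the at-most-once assumption may reasonably treat a repeated key as a bug. The same re-send that a tolerant executor absorbs would then break it.

```python
from typing import Dict, Set


class AtMostOnceExecutor:
    """Hypothetical third-party executor written against at-most-once submission."""

    def __init__(self) -> None:
        self.submitted: Set[str] = set()

    def submit(self, key: str) -> None:
        if key in self.submitted:
            # Under the old contract a repeat can only mean a bug, so fail loudly.
            raise RuntimeError(f"duplicate submission for {key}")
        self.submitted.add(key)


class AtLeastOnceExecutor:
    """Hypothetical executor that tolerates repeats and just counts attempts."""

    def __init__(self) -> None:
        self.attempts: Dict[str, int] = {}

    def submit(self, key: str) -> None:
        self.attempts[key] = self.attempts.get(key, 0) + 1


def resubmit_stuck(executor, key: str) -> bool:
    """Simulate the scheduler sending the same task twice; True if both were accepted."""
    try:
        executor.submit(key)
        executor.submit(key)
    except RuntimeError:
        return False
    return True


tolerant = AtLeastOnceExecutor()
strict = AtMostOnceExecutor()
tolerant_ok = resubmit_stuck(tolerant, "example_dag.task_a")  # True
strict_ok = resubmit_stuck(strict, "example_dag.task_a")      # False
```

Any executor shaped like `AtMostOnceExecutor` would need changes, which is why the semver question matters.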
Not that I'm against it, but we'd need a bit of a debate about what level of semver impact it would have. I imagine we could just about justify it as a minor-level change?

Andrew

On Tue, Mar 15, 2022 at 3:33 PM Ping Zhang wrote:
> Hi all,
>
> Currently, tasks can get stuck in the `queued` state, where they are neither
> scheduled by the scheduler nor picked up by a worker. This can happen when a
> failure occurs after a task is marked as `queued` but before the executor
> marks it as `running`.
>
> There is a fix, https://github.com/apache/airflow/pull/19769/files, for the
> CeleryExecutor. However, it only targets the CeleryExecutor, and it leaks
> scheduler responsibility into the executor.
>
> We propose moving the state reconciliation logic to the scheduler. The
> proposed change:
>
> 1. In _executable_task_instances_to_queued
> (https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job.py#L287),
> also include the QUEUED state, e.g.
> .filter(TI.state.in_([TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED])).
> This way, the scheduler can process queued TIs.
>
> 2. In the queue_command method of the base executor, conditionally queue the
> TI even if its state is already QUEUED, based on when the executor last
> enqueued it (the threshold is configurable).
>
> This could potentially send the same task to the executor twice. However,
> since the worker side checks whether a task can run before running it, this
> won't cause issues.
>
> We have been running this in production for a while and would love to
> contribute it.
>
> Please let me know your thoughts.
>
> Thanks,
>
> Ping
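The two proposed steps can be sketched in plain Python as follows. This is a simplified model under stated assumptions, not Airflow's scheduler or `BaseExecutor` code. `TaskInstance`, `SketchExecutor`, `scheduler_pass`, `worker_can_run` and `REQUEUE_AFTER` are hypothetical names. The 10-minute threshold is an arbitrary placeholder, since the proposal only says the threshold is configurable.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Dict, List


class State(str, Enum):
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    RUNNING = "running"


@dataclass
class TaskInstance:
    key: str
    state: State


# Hypothetical setting; the proposal only says the re-queue threshold is configurable.
REQUEUE_AFTER = timedelta(minutes=10)


@dataclass
class SketchExecutor:
    """Stand-in for step 2: conditional re-queue inside queue_command."""
    requeue_after: timedelta = REQUEUE_AFTER
    last_enqueued: Dict[str, datetime] = field(default_factory=dict)

    def queue_command(self, ti: TaskInstance, now: datetime) -> bool:
        last = self.last_enqueued.get(ti.key)
        # A QUEUED TI that this executor enqueued recently is left alone. One it
        # never enqueued (e.g. lost in a crash) or enqueued too long ago is re-sent.
        if ti.state == State.QUEUED and last is not None and now - last < self.requeue_after:
            return False
        self.last_enqueued[ti.key] = now
        return True


def executable_candidates(tis: List[TaskInstance]) -> List[TaskInstance]:
    # Step 1: mirror .filter(TI.state.in_([SCHEDULED, QUEUED])).
    return [ti for ti in tis if ti.state in (State.SCHEDULED, State.QUEUED)]


def scheduler_pass(tis: List[TaskInstance], executor: SketchExecutor, now: datetime) -> List[str]:
    """One scheduler loop iteration; returns the keys sent to the executor."""
    sent = []
    for ti in executable_candidates(tis):
        if executor.queue_command(ti, now):
            ti.state = State.QUEUED  # SCHEDULED -> QUEUED once handed to the executor
            sent.append(ti.key)
    return sent


def worker_can_run(ti: TaskInstance) -> bool:
    # Worker-side guard: a duplicate arriving after the first copy started sees RUNNING.
    return ti.state == State.QUEUED


t0 = datetime(2022, 3, 15, tzinfo=timezone.utc)
tis = [
    TaskInstance("a", State.SCHEDULED),
    TaskInstance("b", State.QUEUED),  # stuck: marked QUEUED but never reached a worker
    TaskInstance("c", State.RUNNING),
]
executor = SketchExecutor()
first = scheduler_pass(tis, executor, t0)                          # ["a", "b"]
second = scheduler_pass(tis, executor, t0 + timedelta(minutes=1))  # []
third = scheduler_pass(tis, executor, t0 + timedelta(minutes=11))  # ["a", "b"]
```

The third pass shows the at-least-once behaviour Andrew raises: "a" is sent a second time even though its first copy may still be in flight. Only the worker-side guard keeps that duplicate from running twice.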
