Shouldn't we simply make "_clear_stuck_queued_tasks" part of the executor
interface, and have the scheduler trigger it?  I guess that might solve
it in a "portable" way without changing the "at-most-once" semantics?
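
Roughly what I have in mind, as a sketch only. The class names, the
`clear_stuck_queued_tasks` signature, and the in-memory bookkeeping are all
hypothetical and are not Airflow's actual executor API; the point is just
that the scheduler triggers the check while each executor decides what
"stuck" means for its own backend:

```python
from datetime import datetime, timedelta, timezone


class BaseExecutorSketch:
    """Hypothetical stand-in for the executor interface (not Airflow's)."""

    def clear_stuck_queued_tasks(self, queued_since, now, timeout):
        # Default: an executor that cannot tell reports nothing as stuck.
        return []


class InMemoryExecutorSketch(BaseExecutorSketch):
    """Toy executor that remembers which task keys its backend received."""

    def __init__(self):
        self.known_to_backend = set()

    def clear_stuck_queued_tasks(self, queued_since, now, timeout):
        # Stuck = queued longer than `timeout` and never seen by the backend.
        return [
            key
            for key, since in queued_since.items()
            if key not in self.known_to_backend and now - since > timeout
        ]


def scheduler_reconcile(executor, queued_since, now, timeout):
    """Scheduler-side trigger: map each stuck task key back to 'scheduled'."""
    stuck = executor.clear_stuck_queued_tasks(queued_since, now, timeout)
    return {key: "scheduled" for key in stuck}
```

In this toy version a task is only reset when the backend never received
it, which is the assumption that would have to hold for a real executor to
keep at-most-once submission.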

J.

On Tue, Mar 15, 2022 at 10:40 PM Andrew Godwin
<[email protected]> wrote:
>
> I agree this needs a core fix in Airflow, but I'd like to highlight that this 
> is fundamentally changing the executor contract (as it changes tasks from 
> at-most-once submission to at-least-once) and so not only would it need a 
> very close level of testing, it would also be some level of breaking change - 
> since Airflow allows you to plug in third party executors.
>
> Not that I'm against it, but we'd have to have a bit of a debate about what 
> level of semver impact it would have. I imagine we could just about justify 
> it as a minor-level change?
>
> Andrew
>
> On Tue, Mar 15, 2022 at 3:33 PM Ping Zhang <[email protected]> wrote:
>>
>> Hi all,
>>
>> Currently, tasks can get stuck in the `queued` state, where they are 
>> neither scheduled by the scheduler nor picked up by a worker. This can 
>> happen when a failure occurs after a task is marked as `queued` but before 
>> the executor marks it as `running`.
>>
>> There is a fix https://github.com/apache/airflow/pull/19769/files for the 
>> celery executor. However, it only targets the CeleryExecutor and it leaks 
>> the scheduler responsibility to the executor.
>>
>> We propose to move the state reconciliation logic to the scheduler. The 
>> proposed change:
>>
>> 1. When selecting task instances in _executable_task_instances_to_queued 
>> (https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job.py#L287),
>>  also include the QUEUED state, like:   
>> .filter(TI.state.in_([TaskInstanceState.SCHEDULED, 
>> TaskInstanceState.QUEUED])). This way, the scheduler can process queued 
>> tis.
>>
>> 2. In the queue_command method in the base executor, conditionally queue 
>> the ti even if its state is QUEUED, based on how long ago the executor 
>> last enqueued it (the threshold is configurable).
>>
>> This could potentially send the same task to the executor twice. Since the 
>> worker side already checks whether a task can run or not, this won't cause 
>> issues.
>>
>> We have been running this in our prod for a while and would love to 
>> contribute it.
>>
>> Please let me know your thoughts.
>>
>> Thanks,
>>
>> Ping
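
For reference, the conditional re-queue in step 2 of the proposal above
could look roughly like the following. This is only a sketch under stated
assumptions: `should_enqueue`, its parameters, and the plain string states
are made up for illustration and are not Airflow's API. In particular,
treating a queued task with no recorded enqueue time as eligible is an
assumption, not something the proposal states.

```python
from datetime import datetime, timedelta


def should_enqueue(state, last_enqueued_at, now, requeue_after):
    """Decide whether the executor should accept a task instance.

    state: "scheduled", "queued", or anything else (hypothetical strings).
    last_enqueued_at: when the executor last enqueued this task, or None.
    requeue_after: configurable threshold before a queued task is re-sent.
    """
    if state == "scheduled":
        # Normal path: a freshly scheduled task is always enqueued.
        return True
    if state == "queued":
        # Assumption: no recorded enqueue means the send never happened.
        if last_enqueued_at is None:
            return True
        # Re-send only once the task has sat in the queue long enough.
        return now - last_enqueued_at >= requeue_after
    # Any other state (running, success, ...) is never enqueued here.
    return False
```

A task re-sent this way may reach a worker twice, so the scheme relies on
the worker-side checks mentioned in the proposal to drop the duplicate.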
