>
> Airflow would benefit overall from a clearer distinction of what part
> (scheduler, executor, task handler, local task job, triggerer, etc.) is
> responsible for updates to each state so we start getting a clearer picture


Andrew, this is exactly the point. Currently, there is no clear
interface for state change handling in each component. The main reason
I proposed using the scheduler to deal with the stuck queued state is
that the executor currently does not handle ti state changes, and executor
events are handled by the scheduler. I think we should first define a clear
interface and responsibility for each component (scheduler, executor,
airflow run --local process, airflow run --raw process, triggerer)
before we decide on the best approach.
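
To make the idea concrete, here is a minimal sketch of what a "one owner per
state" table could look like. The component names, the particular
state-to-owner assignments, and the check_owner helper are all assumptions
for discussion, not how Airflow works today.

```python
from enum import Enum


class Component(Enum):
    SCHEDULER = "scheduler"
    EXECUTOR = "executor"
    LOCAL_TASK_JOB = "local_task_job"  # the "airflow run --local" process
    RAW_TASK = "raw_task"              # the "airflow run --raw" process
    TRIGGERER = "triggerer"


# Hypothetical assignment: exactly one owning component per ti state.
STATE_OWNER = {
    "scheduled": Component.SCHEDULER,
    "queued": Component.EXECUTOR,
    "running": Component.LOCAL_TASK_JOB,
    "deferred": Component.TRIGGERER,
}


def check_owner(component, state):
    """Raise if `component` tries to update a ti in a state it does not own."""
    owner = STATE_OWNER[state]
    if owner is not component:
        raise PermissionError(
            f"{component.value} may not update '{state}' tasks; "
            f"owner is {owner.value}"
        )
```

With a table like this, the question of who handles stuck queued tasks
becomes a question of which entry we put next to "queued".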

Thanks,

Ping


On Fri, Mar 25, 2022 at 9:19 AM Andrew Godwin
<[email protected]> wrote:

> Yes, that was roughly the idea, Jarek - if the executor is only running
> inside the scheduler and has no external process component, it'd be nice to
> have a part of the "executor interface" that got called periodically for
> cleanup (QUEUED or otherwise). In my internal executor experiments, we've
> had to use a separate process for this, though that has its own advantages.
>
> I think one good thing to establish, though, would be that only executor
> code touches task instances in that state (as part of a general overall
> rule that only one component is responsible for each state) - I think
> Airflow would benefit overall from a clearer distinction of what part
> (scheduler, executor, task handler, local task job, triggerer, etc.) is
> responsible for updates to each state so we start getting a clearer picture
> of where any bugs could be in distributed state machine terms.
>
> Andrew
>
> On Thu, Mar 24, 2022 at 7:12 AM Jarek Potiuk <[email protected]> wrote:
>
>> > 2. scheduler invokes that method periodically.
>>
>> I think this is not the right approach. I think I see what Andrew
>> means here, but I think we should not assume that the scheduler will
>> periodically call some method. Depending on the executor
>> implementation (say, for example, a future Fargate Executor or Cloud Run
>> executor), cleaning queued tasks might actually be done differently
>> (there might be a notification in the executor itself for the tasks that
>> are queued and stuck, and the Scheduler might not need to periodically
>> query it).
>>
>> I'd say a better approach (and possibly, Andrew, that's what you had in
>> mind) is to have a separate method in the "executor" protocol -
>> "start_cleanup_of_queued_tasks()". One implementation of it (the
>> one in BaseExecutor now) could do periodic cleanup, but the future
>> Fargate Executor could have it implemented differently.
>>
>> I think we already have a few methods like that in BaseExecutor that
>> also have some implementation that will not really be useful in other
>> executors, so deriving an executor from BaseExecutor means inheriting some
>> implementation that will likely need to be overridden in other
>> executors. I think we should start with what Andrew proposed (I
>> think). Take the existing executors, really extract an
>> "ExecutorProtocol", possibly add an ExecutorMixin (or even a few) to add
>> some common behaviour for executors, and make sure we got it right -
>> probably at the time we (or someone else) write a new executor. Just
>> to make sure we are not trying to make "common" code for something
>> that is not really "common".
>>
>> But maybe I am misinterpreting the intentions :)
>>
>> J.
>>
>
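
The "executor protocol" idea from Jarek's message above could be sketched
roughly as follows. Only the method name start_cleanup_of_queued_tasks comes
from the thread; the two executor classes, tick(), on_stuck_notification(),
and the find_stuck callback are hypothetical names used for illustration and
are not part of Airflow.

```python
from typing import Callable, List, Protocol


class ExecutorProtocol(Protocol):
    """Hypothetical protocol: how cleanup happens is up to the executor."""

    def start_cleanup_of_queued_tasks(self) -> None:
        ...


class PeriodicCleanupExecutor:
    """Polls for stuck queued tasks from its own loop (assumed: tick())."""

    def __init__(self, find_stuck: Callable[[], List[str]]) -> None:
        self._find_stuck = find_stuck
        self._enabled = False
        self.cleaned: List[str] = []

    def start_cleanup_of_queued_tasks(self) -> None:
        self._enabled = True

    def tick(self) -> None:
        # Does nothing until cleanup has been started.
        if self._enabled:
            self.cleaned.extend(self._find_stuck())


class NotifiedCleanupExecutor:
    """Reacts to backend notifications instead of polling."""

    def __init__(self) -> None:
        self._enabled = False
        self.cleaned: List[str] = []

    def start_cleanup_of_queued_tasks(self) -> None:
        self._enabled = True

    def on_stuck_notification(self, task_key: str) -> None:
        if self._enabled:
            self.cleaned.append(task_key)


def start(executor: ExecutorProtocol) -> None:
    # The caller only asks for cleanup to start; it never polls.
    executor.start_cleanup_of_queued_tasks()
```

In this shape the scheduler would call start() once and would not need to
know whether a given executor polls or is notified.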
