I think we all very much agree that we first need to define responsibilities for those :). How about someone starts a nice design doc with graphs etc. that we can discuss :)?
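One way to write those responsibilities down is as an explicit ownership map, where exactly one component may move a task instance out of each state. Below is a minimal, hypothetical sketch. The `Component` and `TIState` enums, `STATE_OWNER`, `OwnershipError` and `transition()` are illustrative names, not Airflow APIs. Only the QUEUED → executor assignment follows a proposal in the thread below; the other owners are placeholders for the design doc to settle.

```python
from enum import Enum


class Component(Enum):
    """Components named in this thread (hypothetical enum, not an Airflow API)."""
    SCHEDULER = "scheduler"
    EXECUTOR = "executor"
    LOCAL_TASK_JOB = "airflow run --local"
    RAW_TASK = "airflow run --raw"
    TRIGGERER = "triggerer"


class TIState(Enum):
    """A few task-instance states, redefined locally for illustration."""
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    RUNNING = "running"
    DEFERRED = "deferred"
    SUCCESS = "success"
    FAILED = "failed"


# Exactly one component may move a task instance *out of* each state.
# Only QUEUED -> EXECUTOR follows a proposal in this thread; the other
# assignments are placeholders, not a settled design.
STATE_OWNER = {
    TIState.SCHEDULED: Component.SCHEDULER,
    TIState.QUEUED: Component.EXECUTOR,
    TIState.RUNNING: Component.RAW_TASK,
    TIState.DEFERRED: Component.TRIGGERER,
}


class OwnershipError(RuntimeError):
    """Raised when a component tries to leave a state it does not own."""


def transition(current: TIState, new: TIState, actor: Component) -> TIState:
    """Return `new` if `actor` owns `current`, otherwise raise OwnershipError."""
    owner = STATE_OWNER.get(current)
    if owner is None:
        raise OwnershipError(f"{current.value} has no owner (terminal state)")
    if owner is not actor:
        raise OwnershipError(
            f"{actor.value} may not leave {current.value}; owner is {owner.value}"
        )
    return new
```

With such a map, a stuck QUEUED task can only be reset by the executor: `transition(TIState.QUEUED, TIState.SCHEDULED, Component.EXECUTOR)` succeeds, while the same call with `Component.SCHEDULER` raises. That is the property Andrew describes: a bug observed in a given state points at exactly one component.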
On Thu, Mar 31, 2022 at 11:11 PM Ping Zhang <[email protected]> wrote:
>>
>> Airflow would benefit overall from a clearer distinction of what part
>> (scheduler, executor, task handler, local task job, triggerer, etc.) is
>> responsible for updates to each state so we start getting a clearer picture
>
>
> Andrew, this is exactly the point. Currently, there is no clear interface
> for state change handling in each component. The main reason I was
> proposing that the scheduler deal with the stuck queued state is that the
> executor currently does not handle TI state changes, and executor events
> are handled by the scheduler. I think we should first define a clear
> interface and responsibility for each component (scheduler, executor,
> airflow run --local process, airflow run --raw process, triggerer) before
> we decide on the best approach.
>
> Thanks,
>
> Ping
>
>
> On Fri, Mar 25, 2022 at 9:19 AM Andrew Godwin
> <[email protected]> wrote:
>>
>> Yes, that was roughly the idea, Jarek: if the executor is only running
>> inside the scheduler and has no external process component, it'd be nice to
>> have a part of the "executor interface" that got called periodically for
>> cleanup (QUEUED or otherwise). In my internal executor experiments, we've
>> had to use a separate process for this, though that has its own advantages.
>>
>> I think one good thing to establish, though, would be that only executor
>> code touches task instances in that state (as part of a general overall rule
>> that only one component is responsible for each state). I think Airflow
>> would benefit overall from a clearer distinction of what part (scheduler,
>> executor, task handler, local task job, triggerer, etc.) is responsible for
>> updates to each state so we start getting a clearer picture of where any
>> bugs could be in distributed state machine terms.
>>
>> Andrew
>>
>> On Thu, Mar 24, 2022 at 7:12 AM Jarek Potiuk <[email protected]> wrote:
>>>
>>> > 2. scheduler invokes that method periodically.
>>>
>>> I think this is not the right approach. I think I see what Andrew
>>> means here, but we should not assume that the scheduler will
>>> periodically call some method. Depending on the executor
>>> implementation (say, a future Fargate Executor or Cloud Run
>>> executor), cleaning queued tasks might actually be done differently
>>> (there might be notifications in the executor itself for tasks that
>>> are queued and stuck, and the Scheduler might not need to query it
>>> periodically).
>>>
>>> I'd say a better approach (and possibly, Andrew, that's what you had in
>>> mind) is to have a separate method in the "executor" protocol:
>>> "start_cleanup_of_queued_tasks()". One implementation of it (the
>>> one in BaseExecutor now) could do periodic cleanup, but the future
>>> Fargate Executor could implement it differently.
>>>
>>> I think we already have a few methods like that in BaseExecutor whose
>>> implementations will not really be useful in other executors, so
>>> deriving a new executor from BaseExecutor means inheriting code that
>>> will likely need to be overridden. I think we should start with what
>>> Andrew proposed (I think): take the existing executors, actually
>>> extract an "ExecutorProtocol", possibly add an ExecutorMixin (or even
>>> a few) for some common executor behaviour, and make sure we got it
>>> right, probably at the time we (or someone else) write a new executor.
>>> That way we make sure we are not trying to make "common" code for
>>> something that is not really "common".
>>>
>>> But maybe I am misinterpreting the intentions :)
>>>
>>> J.
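Jarek's suggestion can be sketched roughly as follows. Only the names `ExecutorProtocol` and `start_cleanup_of_queued_tasks()` come from the thread. `queue_task()`, `PeriodicQueuedCleanupMixin`, `PollingExecutor`, `NotifiedExecutor`, `sweep()`, `on_backend_event()` and the `reset` callback are hypothetical, and none of this is Airflow's actual BaseExecutor. The idea shown is that the scheduler calls `start_cleanup_of_queued_tasks()` once; whether cleanup then polls on a timer or reacts to backend notifications is the executor's business.

```python
import threading
import time
from typing import Callable, Dict, List, Protocol, Set, runtime_checkable

# Hypothetical callback that hands a stuck task back to the scheduler,
# e.g. by resetting it to SCHEDULED. Not an Airflow API.
ResetFn = Callable[[str], None]


@runtime_checkable
class ExecutorProtocol(Protocol):
    """Hypothetical minimal protocol; queue_task() is illustrative only."""

    def queue_task(self, key: str) -> None: ...

    def start_cleanup_of_queued_tasks(self) -> None: ...


class PeriodicQueuedCleanupMixin:
    """Polling strategy: the executor runs its own sweep thread, so the
    scheduler never has to call a cleanup method on a timer."""

    def __init__(self, reset: ResetFn, timeout: float = 600.0,
                 interval: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._reset, self._timeout, self._interval = reset, timeout, interval
        self._clock = clock
        self._queued_at: Dict[str, float] = {}
        self._lock = threading.Lock()
        self._stop = threading.Event()

    def queue_task(self, key: str) -> None:
        with self._lock:
            self._queued_at[key] = self._clock()

    def sweep(self) -> List[str]:
        """Reset every task that has stayed queued longer than `timeout`."""
        now = self._clock()
        with self._lock:
            stuck = [k for k, t in self._queued_at.items() if now - t > self._timeout]
            for key in stuck:
                del self._queued_at[key]
        for key in stuck:
            self._reset(key)
        return stuck

    def start_cleanup_of_queued_tasks(self) -> None:
        def loop() -> None:
            # Event.wait returns False on timeout and True once stop is set.
            while not self._stop.wait(self._interval):
                self.sweep()
        threading.Thread(target=loop, daemon=True).start()

    def stop_cleanup(self) -> None:
        self._stop.set()


class PollingExecutor(PeriodicQueuedCleanupMixin):
    """Executor that opts into the shared polling behaviour."""


class NotifiedExecutor:
    """Push strategy: the backend reports stuck tasks itself, so nothing polls."""

    def __init__(self, reset: ResetFn) -> None:
        self._reset = reset
        self._queued: Set[str] = set()
        self._listening = False

    def queue_task(self, key: str) -> None:
        self._queued.add(key)

    def start_cleanup_of_queued_tasks(self) -> None:
        # A real executor would subscribe to backend notifications here.
        self._listening = True

    def on_backend_event(self, key: str, stuck: bool) -> None:
        if self._listening and stuck and key in self._queued:
            self._queued.discard(key)
            self._reset(key)
```

Both classes satisfy the same protocol, but only the polling one carries a timer. That is the argument for making the protocol, rather than a base-class implementation, the shared contract, with the polling logic offered as an optional mixin.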
