Hi Rui,

I worked a bit on the scheduler and added some of my comments below.


On Tue, Mar 14, 2017 at 11:08 PM, Rui Wang <rui.w...@airbnb.com.invalid> wrote:

> Hi,
> The design doc below I created is trying to make airflow scheduler more
> centralized. Briefly speaking, I propose moving state change of
> TaskInstance to scheduler. You can see the reasons for this change below.
>
>
> Could you take a look and comment if you see anything does not make sense?
>
> -Rui
>
> --------------------------------------------------------------------------
> Current: The state of a TaskInstance is changed by both the scheduler and
> the worker. On the worker side, the worker monitors the TaskInstance and
> changes its state to RUNNING, then to SUCCESS if the task succeeds, or to
> UP_FOR_RETRY or FAILED if the task fails. The worker also handles the
> failure email logic and the failure callback logic.
> Proposal: The general idea is to make a centralized scheduler and make the
> workers dumb. A worker should not change the state of a TaskInstance, but
> just execute what it is assigned and report the result of the task.
> Instead, the scheduler should make the decision on TaskInstance state
> changes. Ideally, workers should not even handle the failure emails and
> callbacks unless the scheduler asks them to do so.
>

I had a look at the whole pipeline: scheduling, verifying dependencies and
context, forwarding to the task queue, and then receiving and starting the
task at the worker end, where some of the final verifications from the
previous steps are done again.

As things stand, the design is collaborative and relies on the underlying
messaging framework being active: the scheduler eventually kicks off a task,
the "guaranteed" MQ layer forwards it, and at the worker end processing
starts and results are reported immediately.

The benefit of having the task set the state to RUNNING is that you know
some worker actually did pick it up and started working on it. The SLA is
responsible for ensuring the task finishes in time; if that gets violated,
manual intervention is required.
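
Roughly, the worker-side half of that collaboration looks like this today; a
simplified sketch with stand-in types, not the actual Airflow code:

from enum import Enum

class State(Enum):
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    UP_FOR_RETRY = "up_for_retry"
    FAILED = "failed"

def execute_on_worker(ti, run_task, notify_failure):
    # Only the worker knows it has actually picked the task up, so it is the
    # one that flips the state to RUNNING.
    ti.state = State.RUNNING
    try:
        run_task(ti)
        ti.state = State.SUCCESS
    except Exception:
        # Today the worker also decides between retry and failure, and fires
        # the failure email / callback itself.
        if ti.try_number < ti.max_tries:
            ti.state = State.UP_FOR_RETRY
        else:
            ti.state = State.FAILED
            notify_failure(ti)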

In a more independent design, you send a task off for completion, but you
have no idea about the timeframe in which the task should start and finish.
The independent design makes fewer assumptions about the underlying MQ layer
(it assumes it may not be active), but it introduces the really complex and
nasty side effect that the scheduler must now also work out when a task
should have started and when it should have finished.
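
To illustrate the bookkeeping that implies, a rough sketch; the timestamps,
deadlines and verdicts are all assumptions, none of this exists today:

from datetime import datetime, timedelta

START_DEADLINE = timedelta(minutes=10)  # assumption: how long a queued task may wait
RUN_DEADLINE = timedelta(hours=2)       # assumption: how long a running task may take

def check_stalled(ti, now=None):
    # Scheduler-side watchdog for a task it handed off but heard nothing about.
    now = now or datetime.utcnow()
    if ti.started_at is None and now - ti.queued_at > START_DEADLINE:
        return "never_started"   # MQ layer down or flooded, or no worker took it
    if ti.started_at is not None and ti.finished_at is None \
            and now - ti.started_at > RUN_DEADLINE:
        return "never_finished"  # worker may have died mid-task
    return "ok"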



> Why: The worker does not have as much information as the scheduler has.
> Bugs have been observed where a worker gets into trouble but cannot decide
> how to change the task state due to a lack of information. Although there
> is the airflow metadata DB, it is still not easy to share all the
> information the scheduler has with the workers.
>

Can you give some specific examples here, for example a JIRA?


>
> We can also ensure a consistent environment. There are slight differences
> in the chef recipes for the different workers, which can cause strange
> issues when DAGs parse on one worker but not on another.
>

I don't see how this relates to an independent scheduler; it sounds like a
deployment pipeline issue.


>
> In the meantime, moving state changes to the scheduler can reduce the
> complexity of airflow. It especially helps when airflow needs to move to
> distributed schedulers; in that case, state changes made everywhere by
> both schedulers and workers are harder to maintain.
> How to change: After lots of discussion, the following steps will be done:
>
> 1. Add a new column to the TaskInstance table. The worker will fill this
> column with the task process exit code.
>

This introduces at least one more state that has to be inspected.
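
For reference, roughly what I assume is meant, as a sketch against a
stand-in model (the column name task_exit_code is mine, not from the doc):

from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class TaskInstanceSketch(Base):
    # Stand-in for the real TaskInstance model, only to show the proposed column.
    __tablename__ = "task_instance_sketch"
    task_id = Column(String(250), primary_key=True)
    state = Column(String(20))                       # owned by the scheduler
    task_exit_code = Column(Integer, nullable=True)  # proposed: filled in by the worker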


> 2. The worker will only set the TaskInstance state to RUNNING when it is
> ready to run the task. There was debate on moving RUNNING to the scheduler
> as well. If RUNNING moved to the scheduler, either the scheduler marks the
> TaskInstance RUNNING before it gets into the queue, or the scheduler checks
> the status code in the column above, which the worker updates when it is
> ready to run the task. In the former case, from the user's perspective, it
> is bad to mark a TaskInstance as RUNNING when the worker is not ready to
> run it; users could be confused. In the latter case, the scheduler could
> mark the task as RUNNING late due to the scheduling interval, which is
> still not a good user experience. Since only the worker knows when it is
> ready to run the task, the worker should still deliver this message to the
> user by setting the RUNNING state.
>

The scheduler should not set the state to RUNNING, because it has no idea
whether the underlying MQ layer has actually forwarded the task and whether
a worker has successfully parsed and started the DAG. QUEUED is helpful for
identifying issues with flooding of the MQ layer, or inactivity there.

I'd then go for additional messaging on the MQ layer to indicate that a
worker has picked up a task and started working on it. I'm not sure that can
be done on celery; it can probably be done directly on ActiveMQ, but then it
binds us to a specific MQ implementation. Some investigation is required
there.
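
If celery is the executor backend, its signal machinery might already be
close to this; something along these lines, though only a sketch of the idea
(how the hook reports back to the scheduler is made up):

from celery.signals import task_prerun

@task_prerun.connect
def report_pickup(task_id=None, task=None, args=None, kwargs=None, **extra):
    # Hypothetical hook: tell the scheduler (e.g. via the metadata DB or a
    # reply queue) that this task has actually started on a worker.
    print("worker picked up task %s" % task_id)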


>
> 3. In any other case, the worker should not change the state of the
> TaskInstance, but save a defined status code into the column above.
>

I do not understand the functional difference if both columns end up being
inspected by the scheduler anyway.



> 4. The worker still handles failure emails and callbacks because there was
> concern that the scheduler could use too many resources to run failure
> callbacks, given unpredictable callback sizes. (I think ideally the
> scheduler should treat failure callbacks and emails as tasks, and assign
> such tasks to workers after the TaskInstance state changes correspondingly.)
> Eventually this logic will be moved to the workers once there is support
> for multiple distributed schedulers.
>

Yes, the scheduler loop is very time-constrained, and efforts were made to
push functionality out of this loop (for example the DAG parsing) so that
the general scheduling of what was parsed could actually take place. For
that reason we should avoid introducing new work there.

The problem with running "failure" tasks is that there wouldn't be a DAG for
them, so you can't send them through the same pipeline.

I do see potential for a single worker or service that handles all emails
and failure conditions. As you mentioned, though, that's tied in with making
decisions and with the information that's available to do so.

Maybe then the issue is more about trying to transfer all the information to
the worker instead, so it can be dealt with there?



> 5. In the scheduler's loop, the scheduler should check the TaskInstance
> status code, then change the state and retry/fail the TaskInstance
> correspondingly.
>
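
To make sure we are talking about the same thing, this is roughly what I
picture that check looking like, reusing the made-up exit-code column and
the stand-in State from the earlier sketches:

def reconcile_reported_tasks(task_instances):
    # Scheduler-side pass: turn worker-reported exit codes into state changes.
    for ti in task_instances:
        if ti.task_exit_code is None:
            continue                          # worker has not reported yet
        if ti.task_exit_code == 0:
            ti.state = State.SUCCESS
        elif ti.try_number < ti.max_tries:
            ti.state = State.UP_FOR_RETRY     # scheduler re-queues it later
        else:
            ti.state = State.FAILED           # scheduler owns failure handling here
        ti.task_exit_code = None              # mark the report as consumed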
