Hi,
The design doc below, which I created, aims to make the Airflow scheduler
more centralized. Briefly speaking, I propose moving TaskInstance state
changes to the scheduler. The reasons for this change are explained below.


Could you take a look and comment if you see anything that does not make sense?

-Rui

--------------------------------------------------------------------------------------------------
Current

The state of a TaskInstance is changed by both the scheduler and the worker.
On the worker side, the worker monitors the TaskInstance and changes its
state to RUNNING, then to SUCCESS if the task succeeds, or to UP_FOR_RETRY
or FAILED if the task fails. The worker also handles the failure email and
failure callback logic.
Proposal

The general idea is to make the scheduler centralized and the workers dumb.
A worker should not change the state of a TaskInstance; it should just
execute what it is assigned and report the result of the task. Instead, the
scheduler should make the decisions about TaskInstance state changes.
Ideally, workers should not even handle the failure emails and callbacks
unless the scheduler asks them to do so.
Why

A worker does not have as much information as the scheduler has. We have
observed bugs where a worker gets into trouble but cannot decide how to
change the task state due to a lack of information. Although there is the
Airflow metadata DB, it is still not easy to share all the information that
the scheduler has with the workers.

We can also ensure a consistent environment. There are slight differences
in the Chef recipes for the different workers, which can cause strange
issues when DAGs parse on one machine but not another.

At the same time, moving state changes to the scheduler reduces the
complexity of Airflow. It especially helps when Airflow moves to
distributed schedulers; in that case, state changes made everywhere by both
schedulers and workers would be harder to maintain.
How to change

After lots of discussion, the following steps will be taken:

1. Add a new column to the TaskInstance table. The worker will fill this
column with the task process exit code (see the worker-side sketch after
this list).

2. The worker will only set the TaskInstance state to RUNNING when it is
ready to run the task. There was debate about moving RUNNING to the
scheduler as well. If RUNNING moved to the scheduler, either the scheduler
would mark the TaskInstance RUNNING before it goes into the queue, or the
scheduler would check the status code in the column above, which the worker
updates when it is ready to run the task. In the former case, from the
user's perspective, it is misleading to mark a TaskInstance as RUNNING when
the worker is not yet ready to run it; users could be confused. In the
latter case, the scheduler could mark the task as RUNNING late due to its
scheduling interval, which is still not a good user experience. Since only
the worker knows when it is ready to run the task, the worker should still
deliver this message to the user by setting the RUNNING state.

3. In all other cases, the worker should not change the state of the
TaskInstance, but should save a defined status code into the column above.

4. The worker will still handle failure emails and callbacks, because there
were concerns that the scheduler could use too many resources running
failure callbacks, given unpredictable callback sizes. (I think the
scheduler should ideally treat failure callbacks and emails as tasks, and
assign such tasks to workers after the TaskInstance state changes
correspondingly.) Eventually this logic will be moved to the workers once
there is support for multiple distributed schedulers.

5. In its loop, the scheduler should check the TaskInstance status code,
then change the state and retry or fail the TaskInstance accordingly (see
the scheduler-side sketch below).
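
Below is a minimal worker-side sketch of steps 1-3. It is illustrative
only, not actual Airflow code; the column name exit_status, the run_task
wrapper, and the session handling are assumptions made for the example.

    # Worker-side sketch: mark RUNNING, run the task, report the exit code.
    import subprocess

    RUNNING = "running"  # the only state the worker still sets itself (step 2)

    def run_task(ti, session, command):
        # Only the worker knows when the task process is actually about to
        # start, so it sets RUNNING itself (step 2).
        ti.state = RUNNING
        session.commit()

        # Run the task and capture its exit code instead of deciding
        # success/failure locally (steps 1 and 3).
        proc = subprocess.run(command)

        # Report the raw result; the scheduler translates it into a state
        # change in its own loop (step 5).
        ti.exit_status = proc.returncode
        session.commit()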
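
And a rough scheduler-side sketch of step 5, again with assumed names
(exit_status, try_number, max_tries) and a plain SQLAlchemy-style query; the
real logic would live inside the scheduler loop.

    # Scheduler-side sketch: turn reported exit codes into state changes.
    SUCCESS, UP_FOR_RETRY, FAILED = "success", "up_for_retry", "failed"

    def reconcile_finished_tasks(session, TaskInstance):
        # RUNNING task instances whose worker has already written an exit
        # status into the new column.
        finished = (
            session.query(TaskInstance)
            .filter(TaskInstance.state == "running",
                    TaskInstance.exit_status.isnot(None))
            .all()
        )
        for ti in finished:
            if ti.exit_status == 0:
                ti.state = SUCCESS
            elif ti.try_number <= ti.max_tries:  # retries left -> retry
                ti.state = UP_FOR_RETRY
            else:
                ti.state = FAILED
            # Failure emails/callbacks stay on the worker for now (step 4).
        session.commit()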
