[
https://issues.apache.org/jira/browse/AIRFLOW-137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15641891#comment-15641891
]
Gerard Toonstra edited comment on AIRFLOW-137 at 11/6/16 2:36 PM:
------------------------------------------------------------------
When dagruns already exist and their tasks are cleared, the dagruns are reset
to the "RUNNING" state by "clear_task_instances". Because the scheduler does
not check "max_active_runs" when it schedules tasks for existing dagruns, it
violates that parameter setting.
--- in depth analysis ---
SchedulerJob.create_dag_run will create a dag run for a dag only as long as it
does not exceed the max_active_runs parameter. It enforces this by querying
for dagruns in the "RUNNING" state and comparing the count against the
parameter.
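In simplified form, the gate works roughly like the sketch below (hypothetical, self-contained names; the real code queries the DagRun table for the RUNNING count instead of taking it as an argument):

```python
# Sketch of the max_active_runs gate in SchedulerJob.create_dag_run
# (simplified; not the actual Airflow source).

RUNNING = "RUNNING"

def create_dag_run(dag, running_dagrun_count):
    """Create a new dag run only if the RUNNING count is below the cap."""
    if running_dagrun_count >= dag["max_active_runs"]:
        return None  # at capacity: no new dag run is created
    return {"dag_id": dag["dag_id"], "state": RUNNING}

dag = {"dag_id": "example_dag", "max_active_runs": 2}
print(create_dag_run(dag, 1) is not None)  # below the cap: run created
print(create_dag_run(dag, 2) is None)      # cap reached: nothing created
```

Note that this check only guards the *creation* of new dagruns; nothing re-checks the cap for runs that already exist.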
"create_dag_run" is called by the FileProcessor, which runs every n seconds
and calls "process_dags" for every dag it has found and processed. These
functions are the actual scheduler logic that picks up new work to perform.
"process_dags" calls "process_task_instances", which only looks at active
dagruns when scheduling or queuing new task instances for execution. It does
not consult the max_active_runs parameter at all: the way the code is
structured, that parameter is only checked when new dagruns are created, not
when they are executed or have already executed.
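This is the core of the bug. A minimal sketch (hypothetical names and data structures, not the real Airflow code) of how active runs are processed without any cap check:

```python
# Sketch of process_task_instances: it iterates over every active (RUNNING)
# dag run and schedules work for each, never consulting max_active_runs.

def process_task_instances(active_dag_runs):
    scheduled = []
    for run in active_dag_runs:
        # No max_active_runs check here: whatever is RUNNING gets tasks.
        for task_id in run["unfinished_tasks"]:
            scheduled.append((run["execution_date"], task_id))
    return scheduled

runs = [
    {"execution_date": "2016-04-07", "unfinished_tasks": ["t1"]},
    {"execution_date": "2016-04-08", "unfinished_tasks": ["t1"]},
    {"execution_date": "2016-04-09", "unfinished_tasks": ["t1"]},
]
print(len(process_task_instances(runs)))  # one task per active run, cap ignored
```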
When tasks are cleared in the UI, this goes through the "clear" subroutine in
"views.py". That in turn calls "dag.clear", which has the default parameter
setting "reset_dag_runs=True"; the clear subroutine does not override that
default from the UI.
"dag.clear" gets the list of task instances that should be cleared. It then
calls "clear_task_instances" in models.py, which removes those task instance
records and also collects the execution_dates for which task instances were
removed. That list of execution dates is used to reset the corresponding
dagruns to RUNNING.
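The reset step can be sketched as follows (simplified, hypothetical structures; the real code works against ORM records in a session):

```python
# Sketch of the dagrun reset in clear_task_instances: collect the
# execution_dates of the cleared task instances, then flip the matching
# dag runs back to RUNNING, regardless of how many are already running.

RUNNING = "RUNNING"

def clear_task_instances(task_instances, dag_runs):
    cleared_dates = {ti["execution_date"] for ti in task_instances}
    for dr in dag_runs:
        if dr["execution_date"] in cleared_dates:
            dr["state"] = RUNNING  # run is re-opened with no cap check
    return dag_runs

tis = [{"execution_date": "2016-04-07"},
       {"execution_date": "2016-04-08"}]
runs = [{"execution_date": "2016-04-07", "state": "SUCCESS"},
        {"execution_date": "2016-04-08", "state": "SUCCESS"},
        {"execution_date": "2016-04-09", "state": "SUCCESS"}]
clear_task_instances(tis, runs)
print(sum(dr["state"] == RUNNING for dr in runs))  # two runs re-opened
```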
The "reset_dag_runs=True" parameter eventually causes a call to
"set_dag_runs_state" (default RUNNING), which updates DagStat records on the
basis of the dirty flags in a dag.
When the scheduler's next cycle comes around, the dagruns whose tasks were
cleared are found in the RUNNING state, with missing task instance entries in
each of those dagruns. The result is that task instances for all of those
execution dates are scheduled in at once, regardless of max_active_runs.
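Putting the pieces together, the whole interaction can be simulated in a few lines (hypothetical, simplified data structures; only meant to illustrate the sequence described above):

```python
# Simulation of the bug: clearing re-opens many dag runs at once, and the
# per-run scheduling path never re-checks max_active_runs.

max_active_runs = 2
dag_runs = [{"execution_date": d, "state": "SUCCESS"} for d in range(5)]

# "clear" resets every affected dag run to RUNNING:
for dr in dag_runs:
    dr["state"] = "RUNNING"

# create_dag_run would now refuse to create NEW runs (5 >= 2) ...
running = [dr for dr in dag_runs if dr["state"] == "RUNNING"]
assert len(running) > max_active_runs  # the limit is already violated

# ... but the task-instance path schedules work for ALL active runs:
scheduled = [dr["execution_date"] for dr in running]
print(len(scheduled))  # every re-opened run gets task instances scheduled
```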
> Airflow does not respect 'max_active_runs' when task from multiple dag runs
> cleared
> -----------------------------------------------------------------------------------
>
> Key: AIRFLOW-137
> URL: https://issues.apache.org/jira/browse/AIRFLOW-137
> Project: Apache Airflow
> Issue Type: Bug
> Reporter: Tomasz Bartczak
> Priority: Minor
>
> Also requested at https://github.com/apache/incubator-airflow/issues/1442
> Dear Airflow Maintainers,
> Environment
> Before I tell you about my issue, let me describe my Airflow environment:
> Please fill out any appropriate fields:
> Airflow version: 1.7.0
> Airflow components: webserver, mysql, scheduler with celery executor
> Python Version: 2.7.6
> Operating System: Linux Ubuntu 3.19.0-26-generic Scheduler runs with
> --num-runs and get restarted around every minute or so
> Description of Issue
> Now that you know a little about me, let me tell you about the issue I am
> having:
> What did you expect to happen?
> After running 'airflow clear -t spark_final_observations2csv -s
> 2016-04-07T01:00:00 -e 2016-04-11T01:00:00 MODELLING_V6' I expected this
> task to be executed in all dag-runs in the given time-range, respecting
> 'max_active_runs'.
> Dag configuration:
> concurrency= 3,
> max_active_runs = 2,
> What happened instead?
> Airflow at first started executing 3 of those tasks, which already
> violates 'max_active_runs'; it looks like 'concurrency' was the limit
> applied here.
> [screenshot: 3_running_2_pending]
> After the first task was done, airflow scheduled all the other tasks,
> making 5 dag runs active at the same time, which violates all the
> specified limits. In the GUI we saw the red warning (5/2 Dags running ;-) ).
> Reproducing the Issue
> max_active_runs is respected on a day-to-day basis - when one of the tasks
> was stuck, airflow didn't start more than 2 dags concurrently.
> [screenshots in the original issue:
> https://github.com/apache/incubator-airflow/issues/1442]
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)