[
https://issues.apache.org/jira/browse/AIRFLOW-584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15642435#comment-15642435
]
Gerard Toonstra commented on AIRFLOW-584:
-----------------------------------------
I'm writing up a detailed story on why the race condition exists and how
specific settings would influence it in AIRFLOW-72. It comes down to the way
how pool availability is calculated.
This calculation revolves around checking the config setting and looking at the
number of task instances in RUNNING state that are using that pool. There is a
time window between the scheduler doing these calculations and the task
instances being sent to the executor and actually starting to RUN (which is
when they are actually set into RUNNING state).
In one scheduling loop, airflow wouldn't violate the rule. But if you have
congestion on your workers and your scheduler heartbeat is set low, then each
loop could send N tasks to the executor, because the ones that were sent to the
executor wouldn't have updated their status yet. In your case, there's
apparently 25-30 seconds of congestion on your workers for some reason in a
specific time interval.
I don't know the reason for that. If you're running this DAG besides other dags
on your system, then maybe all workers are tied up and these task instances do
not actually start on the workers yet until after 25-30 seconds. Or it could be
something related to the celery backend you're using.
On the up and coming airflow, this issue will already be somewhat mitigated,
because each task has a dependency check on the pool at the time of starting.
This is also not 100% proof against the race condition (simultaneously running
tasks A and B could still read the state at time X, thereby not seeing the
update of either A or B at the time of reading), but it's an improvement over
the 1.7.1.3 implementation, which doesn't have that check.
> Airflow Pool does not limit running tasks
> -----------------------------------------
>
> Key: AIRFLOW-584
> URL: https://issues.apache.org/jira/browse/AIRFLOW-584
> Project: Apache Airflow
> Issue Type: Bug
> Components: pools
> Affects Versions: Airflow 1.7.1.3
> Environment: Ubuntu 14.04
> Reporter: David
> Attachments: img1.png, img2.png
>
>
> Airflow pools are not limiting the number of running task instances for the
> following dag in 1.7.1.3
> Steps to recreate:
> Create a pool of size 5 through the UI.
> The following dag has 52 tasks with increasing priority corresponding to the
> task number. There should only ever be 5 tasks running at a time however I
> observed 29 'used slots' in a pool with 5 slots
> {code}
> dag_name = 'pools_bug'
> default_args = {
> 'owner': 'airflow',
> 'depends_on_past': False,
> 'start_date': datetime(2016, 10, 20),
> 'email_on_failure': False,
> 'retries': 1
> }
> dag = DAG(dag_name, default_args=default_args, schedule_interval="0 8 * * *")
> start = DummyOperator(task_id='start', dag=dag)
> end = DummyOperator(task_id='end', dag=dag)
> for i in range(50):
> sleep_command = 'sleep 10'
> task_name = 'task-{}'.format(i)
> op = BashOperator(
> task_id=task_name,
> bash_command=sleep_command,
> execution_timeout=timedelta(hours=4),
> priority_weight=i,
> pool=dag_name,
> dag=dag)
> start.set_downstream(op)
> end.set_upstream(op)
> {code}
> Relevant configurations from airflow.cfg:
> {code}
> [core]
> # The executor class that airflow should use. Choices include
> # SequentialExecutor, LocalExecutor, CeleryExecutor
> executor = CeleryExecutor
> # The amount of parallelism as a setting to the executor. This defines
> # the max number of task instances that should run simultaneously
> # on this airflow installation
> parallelism = 64
> # The number of task instances allowed to run concurrently by the scheduler
> dag_concurrency = 64
> # The maximum number of active DAG runs per DAG
> max_active_runs_per_dag = 1
> [celery]
> # This section only applies if you are using the CeleryExecutor in
> # [core] section above
> # The app name that will be used by celery
> celery_app_name = airflow.executors.celery_executor
> # The concurrency that will be used when starting workers with the
> # "airflow worker" command. This defines the number of task instances that
> # a worker will take, so size up your workers based on the resources on
> # your worker box and the nature of your tasks
> celeryd_concurrency = 64
> [scheduler]
> # Task instances listen for external kill signal (when you clear tasks
> # from the CLI or the UI), this defines the frequency at which they should
> # listen (in seconds).
> job_heartbeat_sec = 5
> # The scheduler constantly tries to trigger new tasks (look at the
> # scheduler section in the docs for more information). This defines
> # how often the scheduler should run (in seconds).
> scheduler_heartbeat_sec = 5
> {code}
> !img1.png!
> !img2.png!
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)