Sounds good. Happy to hear this answer helped. Concurrency settings seem to be a commonly confusing topic.
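For anyone who finds this thread later, here is roughly how those three
environment variables map onto airflow.cfg (AIRFLOW__CORE__* variables
override the file; the 10000 values are just Kevin's example from below, and
the defaults in the comments are what I recall from the 1.9.0 config
template, so double-check your version):

[core]
# Hard cap on task instances running concurrently across the whole
# installation (default 32)
parallelism = 10000
# Cap on task instances allowed to run concurrently per DAG (default 16)
dag_concurrency = 10000
# Slots available to tasks that are not assigned to a pool (default 128,
# which would line up with the 128-per-burst stair-step below)
non_pooled_task_slot_count = 10000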
*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | CV <https://stackoverflow.com/cv/taylor> | LinkedIn <https://www.linkedin.com/in/tedmiston/> | AngelList <https://angel.co/taylor> | Stack Overflow <https://stackoverflow.com/users/149428/taylor-edmiston>

On Thu, Jun 28, 2018 at 2:40 PM, PAULI, KEVIN CHRISTIAN [AG/1000] <kevin.christian.pa...@monsanto.com> wrote:

> Thanks. Through trial and error I learned that I need to set all three of
> these:
>
> - AIRFLOW__CORE__PARALLELISM=10000
> - AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000
> - AIRFLOW__CORE__DAG_CONCURRENCY=10000
>
> With only these two enabled, I can eventually get to 10K, but it is very
> slow, adding only 100 new tasks in bursts every 30 seconds, in a
> stair-step fashion:
>
> - AIRFLOW__CORE__PARALLELISM=10000
> - AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000
>
> If I enable only these two instead, it is the same stair-step pattern,
> with 128 added every 30 seconds:
>
> - AIRFLOW__CORE__PARALLELISM=10000
> - AIRFLOW__CORE__DAG_CONCURRENCY=10000
>
> But if I set all three, it does add 10K to the queue in one shot.
>
> --
> Regards,
> Kevin Pauli
>
> On 6/27/18, 4:41 PM, "Taylor Edmiston" <tedmis...@gmail.com> wrote:
>
> It sounds like you may be getting bottlenecked by executor concurrency
> settings.
>
> Are you using default values for the other concurrency settings,
> specifically the ones mentioned here
> <https://stackoverflow.com/questions/50737800/how-many-tasks-can-be-scheduled-in-a-single-airflow-dag/50743825#50743825>?
> If you increase the other ones to be very high as well, do you still
> experience the issue?
>
> Taylor
>
> *Taylor Edmiston*
> Blog <https://blog.tedmiston.com/> | CV <https://stackoverflow.com/cv/taylor> | LinkedIn <https://www.linkedin.com/in/tedmiston/> | AngelList <https://angel.co/taylor> | Stack Overflow <https://stackoverflow.com/users/149428/taylor-edmiston>
>
> On Mon, Jun 25, 2018 at 1:40 PM, PAULI, KEVIN CHRISTIAN [AG/1000] <kevin.christian.pa...@monsanto.com> wrote:
>
> > Greetings, Airflowers. I'm evaluating Airflow 1.9.0 for our distributed
> > orchestration needs (using CeleryExecutor and RabbitMQ), and I am seeing
> > something strange.
> >
> > I made a dag that has three main stages: 1) start, 2) fan out and run N
> > tasks concurrently, 3) finish.
> >
> > N can be large, maybe up to 10K. I would expect to see N tasks get
> > dumped onto the Rabbit queue when stage 2 begins. Instead I am seeing
> > only a few hundred added at a time. As the workers process the tasks and
> > the queue gets smaller, more get added to Celery/Rabbit. Eventually it
> > does finish; however, I would really prefer that it dump ALL the work
> > (all 10K tasks) into Celery immediately, for two reasons:
> >
> > 1. The current way makes the scheduler long-lived and stateful. The
> >    scheduler might die after only 5K have completed, in which case the
> >    remaining 5K tasks would never get added (I verified this).
> > 2. I want to use the size of the Rabbit queue as a metric to trigger
> >    autoscaling events that add more workers, so I need a true picture
> >    of how much outstanding work remains (10K, not a few hundred); see
> >    the sketch just after this list.
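> >
> > For reference, the autoscaling check I have in mind is just a periodic
> > poll of the queue depth through the RabbitMQ management API. A rough
> > sketch (the host, queue name, credentials, and threshold below are
> > placeholders for whatever your broker actually uses):
> >
> > import requests
> >
> > def queue_depth(host='localhost', queue='default',
> >                 user='guest', password='guest'):
> >     # Ask the RabbitMQ management plugin (port 15672) how many
> >     # messages are sitting in the Celery queue; %2F is the default
> >     # vhost "/".
> >     url = 'http://%s:15672/api/queues/%%2F/%s' % (host, queue)
> >     resp = requests.get(url, auth=(user, password))
> >     resp.raise_for_status()
> >     return resp.json()['messages']
> >
> > # e.g. scale out when the backlog is deep:
> > # if queue_depth() > 1000: add_worker()
> >
> > That check is only useful if the queue depth reflects all outstanding
> > work, though, not just the slice the scheduler has handed over so far.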
> >
> > I assume the scheduler has some kind of throttle that keeps it from
> > dumping all 10K messages simultaneously? If so, is this configurable?
> >
> > FYI, I have already set “parallelism” to 10K in the airflow.cfg.
> >
> > Here is my test dag:
> >
> > # This dag tests how well airflow fans out
> >
> > from datetime import datetime, timedelta
> >
> > from airflow import DAG
> > from airflow.operators.bash_operator import BashOperator
> >
> > default_args = {
> >     'owner': 'airflow',
> >     'depends_on_past': False,
> >     'start_date': datetime(2015, 6, 1),
> >     'email': ['airf...@example.com'],
> >     'email_on_failure': False,
> >     'email_on_retry': False,
> >     'retries': 1,
> >     'retry_delay': timedelta(minutes=5),
> > }
> >
> > dag = DAG('fan_out', default_args=default_args, schedule_interval=None)
> >
> > num_tasks = 1000
> >
> > # Single entry point before the fan-out
> > starting = BashOperator(
> >     task_id='starting',
> >     bash_command='echo starting',
> >     dag=dag)
> >
> > # Single exit point after the fan-in
> > all_done = BashOperator(
> >     task_id='all_done',
> >     bash_command='echo all done',
> >     dag=dag)
> >
> > # Fan out: num_tasks parallel tasks between starting and all_done
> > for i in range(0, num_tasks):
> >     task = BashOperator(
> >         task_id='say_hello_' + str(i),
> >         bash_command='echo hello world',
> >         dag=dag)
> >     task.set_upstream(starting)
> >     task.set_downstream(all_done)
> >
> > --
> > Regards,
> > Kevin Pauli