Cross-linking this discussion with Stack Overflow, where this question was also posted:
https://stackoverflow.com/questions/51051978/how-to-get-airflow-to-add-thousands-of-tasks-to-celery-at-one-time

The answer there is similar to my reply above.
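For reference, here is a minimal sketch of the airflow.cfg settings in question (option names as of 1.9; the values are illustrative, not a recommendation):

    [core]
    # Max task instances allowed to run at once across the whole installation
    parallelism = 10000

    # Max task instances allowed to run concurrently within a single DAG;
    # the default is small and by itself will throttle a big fan-out
    dag_concurrency = 10000

    # Slots for tasks that are not assigned to a pool (defaults to 128)
    non_pooled_task_slot_count = 10000

    [celery]
    # How many tasks each Celery worker process runs concurrently
    celeryd_concurrency = 16

If the fan-out still trickles onto the queue with all of these raised, then something else is doing the throttling.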
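On the second point below (using queue size to drive autoscaling), it's worth measuring what actually lands in RabbitMQ. A quick sketch, assuming the pika client and the stock "default" queue name from airflow.cfg; adjust host and credentials for your broker:

    # check_queue_depth.py - peek at how many Airflow tasks are waiting in RabbitMQ
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # passive=True only inspects the queue; it won't create or modify it
    queue = channel.queue_declare(queue='default', passive=True)

    # message_count is messages ready for delivery, not ones already
    # picked up by a worker
    print('outstanding tasks:', queue.method.message_count)

    connection.close()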
*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | CV <https://stackoverflow.com/cv/taylor> | LinkedIn <https://www.linkedin.com/in/tedmiston/> | AngelList <https://angel.co/taylor> | Stack Overflow <https://stackoverflow.com/users/149428/taylor-edmiston>

On Wed, Jun 27, 2018 at 5:39 PM, Taylor Edmiston <tedmis...@gmail.com> wrote:

> It sounds like you may be getting bottlenecked by executor concurrency
> settings.
>
> Are you using default values for the other concurrency settings,
> specifically the ones mentioned here
> <https://stackoverflow.com/questions/50737800/how-many-tasks-can-be-scheduled-in-a-single-airflow-dag/50743825#50743825>?
> If you increase the other ones to be very high as well, do you still
> experience the issue?
>
> Taylor
>
> On Mon, Jun 25, 2018 at 1:40 PM, PAULI, KEVIN CHRISTIAN [AG/1000]
> <kevin.christian.pa...@monsanto.com> wrote:
>
>> Greetings Airflowers. I'm evaluating Airflow 1.9.0 for our distributed
>> orchestration needs (using CeleryExecutor and RabbitMQ), and I am seeing
>> something strange.
>>
>> I made a DAG that has three main stages: 1) start, 2) fan out and run N
>> tasks concurrently, 3) finish.
>>
>> N can be large, maybe up to 10K. I would expect to see N tasks get
>> dumped onto the Rabbit queue when stage 2 begins. Instead, I am seeing
>> only a few hundred added at a time; as the workers process the tasks and
>> the queue gets smaller, more get added to Celery/Rabbit. Eventually it
>> does finish, but I would really prefer that it dump ALL the work (all
>> 10K tasks) into Celery immediately, for two reasons:
>>
>> 1. The current way makes the scheduler long-lived and stateful. The
>> scheduler might die after only 5K have completed, in which case the
>> remaining 5K tasks would never get added (I verified this).
>> 2. I want to use the size of the Rabbit queue as a metric to trigger
>> autoscaling events that add more workers, so I need a true picture of
>> how much outstanding work remains (10K, not a few hundred).
>>
>> I assume the scheduler has some kind of throttle that keeps it from
>> dumping all 10K messages simultaneously? If so, is this configurable?
>>
>> FYI, I have already set “parallelism” to 10K in the airflow.cfg.
>>
>> Here is my test DAG:
>>
>> # This DAG tests how well Airflow fans out
>>
>> from airflow import DAG
>> from datetime import datetime, timedelta
>>
>> from airflow.operators.bash_operator import BashOperator
>>
>> default_args = {
>>     'owner': 'airflow',
>>     'depends_on_past': False,
>>     'start_date': datetime(2015, 6, 1),
>>     'email': ['airf...@example.com'],
>>     'email_on_failure': False,
>>     'email_on_retry': False,
>>     'retries': 1,
>>     'retry_delay': timedelta(minutes=5),
>> }
>>
>> dag = DAG('fan_out', default_args=default_args, schedule_interval=None)
>>
>> num_tasks = 1000
>>
>> starting = BashOperator(
>>     task_id='starting',
>>     bash_command='echo starting',
>>     dag=dag)
>>
>> all_done = BashOperator(
>>     task_id='all_done',
>>     bash_command='echo all done',
>>     dag=dag)
>>
>> # Fan out: each say_hello task runs after `starting` and before `all_done`
>> for i in range(num_tasks):
>>     task = BashOperator(
>>         task_id='say_hello_' + str(i),
>>         bash_command='echo hello world',
>>         dag=dag)
>>     task.set_upstream(starting)
>>     task.set_downstream(all_done)
>>
>> --
>> Regards,
>> Kevin Pauli