Greetings Airflowers. I'm evaluating Airflow 1.9.0 for our distributed orchestration needs (using CeleryExecutor and RabbitMQ), and I am seeing something strange.
I made a DAG that has three main stages: 1) start, 2) fan out and run N tasks concurrently, 3) finish. N can be large, maybe up to 10K.

I would expect to see all N tasks get dumped onto the Rabbit queue when stage 2 begins. Instead I am seeing only a few hundred added at a time; as the workers process the tasks and the queue drains, more get added to Celery/Rabbit. Eventually it does finish, but I would really prefer that it dump ALL the work (all 10K tasks) into Celery immediately, for two reasons:

1. The current behavior makes the scheduler long-lived and stateful. The scheduler might die after only 5K tasks have completed, in which case the remaining 5K would never be added (I verified this).

2. I want to use the size of the Rabbit queue as a metric to trigger autoscaling events that add more workers, so I need a true picture of how much outstanding work remains (10K, not a few hundred).

I assume the scheduler has some kind of throttle that keeps it from dumping all 10K messages simultaneously? If so, is this configurable?
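For context, the autoscaling logic I have in mind for reason 2 is roughly the sketch below. The queue name, thresholds, and tasks-per-worker ratio are all placeholders I made up for illustration, not anything from Airflow itself:

```python
import math

def desired_workers(queue_depth, tasks_per_worker=50,
                    min_workers=2, max_workers=200):
    """Pick a worker count proportional to outstanding Celery tasks.

    queue_depth: messages currently sitting in the RabbitMQ queue.
    tasks_per_worker: target backlog per worker (illustrative tuning knob).
    """
    wanted = math.ceil(queue_depth / tasks_per_worker)
    # Clamp to the allowed fleet size.
    return max(min_workers, min(max_workers, wanted))

# Reading queue_depth would require a live broker; with pika it would
# look something like this (queue name is whatever default_queue is set to):
#
#   import pika
#   conn = pika.BlockingConnection(
#       pika.URLParameters('amqp://guest:guest@localhost:5672/'))
#   frame = conn.channel().queue_declare(queue='default', passive=True)
#   depth = frame.method.message_count
```

The problem is that if the scheduler only ever enqueues a few hundred tasks at a time, `queue_depth` never reflects the real 10K backlog, so this kind of policy can never scale out far enough.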
FYI, I have already set "parallelism" to 10K in airflow.cfg. Here is my test DAG:

# This dag tests how well airflow fans out
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airf...@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('fan_out', default_args=default_args, schedule_interval=None)

num_tasks = 1000

starting = BashOperator(
    task_id='starting',
    bash_command='echo starting',
    dag=dag)

all_done = BashOperator(
    task_id='all_done',
    bash_command='echo all done',
    dag=dag)

for i in range(0, num_tasks):
    task = BashOperator(
        task_id='say_hello_' + str(i),
        bash_command='echo hello world',
        dag=dag)
    task.set_upstream(starting)
    task.set_downstream(all_done)

--
Regards,
Kevin Pauli
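P.S. For completeness, these are the concurrency-related knobs I have been experimenting with in airflow.cfg. The values shown are illustrative, and I may be misunderstanding which of these actually gates how many tasks the scheduler hands to Celery at once:

```ini
[core]
# Max task instances that can run concurrently across the whole installation
parallelism = 10000
# Max task instances allowed to run concurrently within a single DAG
dag_concurrency = 10000
# Slots available to tasks that are not assigned to a pool
non_pooled_task_slot_count = 10000

[celery]
# Worker processes per Celery worker node
celeryd_concurrency = 16
```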