Greetings Airflowers.  I'm evaluating Airflow 1.9.0 for our distributed 
orchestration needs (using CeleryExecutor and RabbitMQ), and I am seeing 
something strange.

I made a DAG that has three main stages: 1) start, 2) fan out and run N tasks 
concurrently, 3) finish.

N can be large, maybe up to 10K.  I would expect to see all N tasks dumped onto 
the Rabbit queue when stage 2 begins.  Instead I am seeing only a few hundred 
added at a time; as the workers process them and the queue drains, more get 
added to Celery/Rabbit.  Eventually it does finish, but I would really prefer 
that it dump ALL the work (all 10K tasks) into Celery immediately, for two 
reasons:


  1.  The current behavior makes the scheduler long-lived and stateful.  The 
scheduler might die after only 5K tasks have completed, in which case the 
remaining 5K would never get queued (I verified this).
  2.  I want to use the size of the Rabbit queue as a metric to trigger 
autoscaling events that add more workers, so I need a true picture of how much 
outstanding work remains (10K, not a few hundred); see the sketch below.
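
For #2, here is a sketch of how I plan to read the queue depth, using the 
RabbitMQ management HTTP API.  The host, credentials, vhost, and queue name 
below are placeholders for illustration, not our real values.

import requests

# Assumes the management plugin is enabled on port 15672, guest/guest
# credentials, the "/" vhost, and that Airflow's Celery tasks land on a
# queue named "default".
RABBIT_API = 'http://rabbitmq-host:15672/api/queues/%2F/default'

def outstanding_tasks():
    resp = requests.get(RABBIT_API, auth=('guest', 'guest'))
    resp.raise_for_status()
    body = resp.json()
    # messages_ready = queued but not yet delivered to a worker
    # messages_unacknowledged = delivered and currently being worked on
    return body['messages_ready'], body['messages_unacknowledged']

ready, running = outstanding_tasks()
print('ready=%d running=%d' % (ready, running))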

I assume the scheduler has some kind of throttle that keeps it from dumping all 
10K messages onto the queue at once?  If so, is it configurable?

FYI, I have already set “parallelism” to 10K in airflow.cfg.
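
In case it matters, these are the other concurrency-related settings I am aware 
of in 1.9.0 (the values here are only illustrative, and I have not confirmed 
which of them, if any, gates the queueing):

[core]
parallelism = 10000
dag_concurrency = 10000
non_pooled_task_slot_count = 10000
max_active_runs_per_dag = 16

[celery]
celeryd_concurrency = 16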

Here is my test DAG:

# This DAG tests how well Airflow fans out

from airflow import DAG
from datetime import datetime, timedelta

from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airf...@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('fan_out', default_args=default_args, schedule_interval=None)

num_tasks = 1000

starting = BashOperator(
    task_id='starting',
    bash_command='echo starting',
    dag=dag)

all_done = BashOperator(
    task_id='all_done',
    bash_command='echo all done',
    dag=dag)

# Fan out: every say_hello task runs between 'starting' and 'all_done'
for i in range(num_tasks):
    task = BashOperator(
        task_id='say_hello_' + str(i),
        bash_command='echo hello world',
        dag=dag)
    task.set_upstream(starting)
    task.set_downstream(all_done)
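
(With schedule_interval=None the DAG only runs when triggered manually, e.g. 
"airflow trigger_dag fan_out".)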



--
Regards,
Kevin Pauli

