Cross-linking this discussion with Stack Overflow, where this question was
also posted.  The answer there is similar to my reply above.

https://stackoverflow.com/questions/51051978/how-to-get-airflow-to-add-thousands-of-tasks-to-celery-at-one-time

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | CV
<https://stackoverflow.com/cv/taylor> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | AngelList
<https://angel.co/taylor> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston>


On Wed, Jun 27, 2018 at 5:39 PM, Taylor Edmiston <tedmis...@gmail.com>
wrote:

> It sounds like you may be getting bottlenecked by executor concurrency
> settings.
>
> Are you using default values for the other concurrency settings,
> specifically the ones mentioned here
> <https://stackoverflow.com/questions/50737800/how-many-tasks-can-be-scheduled-in-a-single-airflow-dag/50743825#50743825>?
> If you increase those to be very high as well, do you still experience the
> issue?
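>
> For reference, I believe these are the relevant knobs in airflow.cfg (the
> key names below are from Airflow 1.9; the values are purely illustrative,
> not recommendations):
>
> [core]
> # Max task instances allowed to run simultaneously across the whole
> # installation
> parallelism = 10000
> # Max task instances the scheduler will run concurrently within one DAG
> dag_concurrency = 10000
> # Task slots available to tasks that don't use a pool
> non_pooled_task_slot_count = 10000
>
> [celery]
> # Number of tasks each Celery worker process runs at a time
> celeryd_concurrency = 16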
>
> Taylor
>
> *Taylor Edmiston*
> Blog <https://blog.tedmiston.com/> | CV
> <https://stackoverflow.com/cv/taylor> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | AngelList
> <https://angel.co/taylor> | Stack Overflow
> <https://stackoverflow.com/users/149428/taylor-edmiston>
>
>
> On Mon, Jun 25, 2018 at 1:40 PM, PAULI, KEVIN CHRISTIAN [AG/1000] <
> kevin.christian.pa...@monsanto.com> wrote:
>
>> Greetings Airflowers.  I'm evaluating Airflow 1.9.0 for our distributed
>> orchestration needs (using CeleryExecutor and RabbitMQ), and I am seeing
>> something strange.
>>
>> I made a DAG that has three main stages: 1) start, 2) fan out and run N
>> tasks concurrently, 3) finish.
>>
>> N can be large, maybe up to 10K.  I would expect to see N tasks get
>> dumped onto the Rabbit queue when stage 2 begins.  Instead I am seeing only
>> a few hundred added at a time.  As the workers process the tasks and the
>> queue gets smaller, more get added to Celery/Rabbit.  Eventually it does
>> finish; however, I would really prefer that it dump ALL the work (all
>> 10K tasks) into Celery immediately, for two reasons:
>>
>>
>>   1.  The current way makes the scheduler long-lived and stateful.  The
>> scheduler might die after only 5K have completed, in which case the
>> remaining 5K tasks would never get added (I verified this).
>>   2.  I want to use the size of the Rabbit queue as a metric to trigger
>> autoscaling events that add more workers (see the sketch below).  So I
>> need a true picture of how much outstanding work remains (10K, not a few
>> hundred).
>>
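>> For example, the autoscaling check I have in mind is roughly the sketch
>> below (the queue name, credentials, and threshold are placeholders for
>> whatever a given setup uses):
>>
>> # Sketch: read the queue depth from the RabbitMQ management API.
>> # Assumes the management plugin is enabled on localhost:15672 and that
>> # Celery publishes to a queue named 'default'.
>> import requests
>>
>> def outstanding_tasks(host='localhost', vhost='%2F', queue='default'):
>>     url = 'http://{}:15672/api/queues/{}/{}'.format(host, vhost, queue)
>>     resp = requests.get(url, auth=('guest', 'guest'))
>>     resp.raise_for_status()
>>     # 'messages' counts both ready and unacknowledged messages
>>     return resp.json()['messages']
>>
>> if outstanding_tasks() > 1000:
>>     print('scale up')  # hook an autoscaling trigger in here
>>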
>> I assume the scheduler has some kind of throttle that keeps it from
>> dumping all 10K messages simultaneously?  If so, is this configurable?
>>
>> FYI, I have already set “parallelism” to 10K in airflow.cfg.
>>
>> Here is my test DAG:
>>
>> # This DAG tests how well Airflow fans out
>>
>> from airflow import DAG
>> from datetime import datetime, timedelta
>>
>> from airflow.operators.bash_operator import BashOperator
>>
>> default_args = {
>>     'owner': 'airflow',
>>     'depends_on_past': False,
>>     'start_date': datetime(2015, 6, 1),
>>     'email': ['airf...@example.com'],
>>     'email_on_failure': False,
>>     'email_on_retry': False,
>>     'retries': 1,
>>     'retry_delay': timedelta(minutes=5),
>> }
>>
>> dag = DAG('fan_out', default_args=default_args, schedule_interval=None)
>>
>> num_tasks = 1000
>>
>> starting = BashOperator(
>>     task_id='starting',
>>     bash_command='echo starting',
>>     dag=dag
>> )
>>
>> all_done = BashOperator(
>>     task_id='all_done',
>>     bash_command='echo all done',
>>     dag=dag)
>>
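>> # Stage 2: fan out num_tasks parallel tasks between starting and all_done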
>> for i in range(num_tasks):
>>     task = BashOperator(
>>         task_id='say_hello_' + str(i),
>>         bash_command='echo hello world',
>>         dag=dag)
>>     task.set_upstream(starting)
>>     task.set_downstream(all_done)
>>
>>
>>
>> --
>> Regards,
>> Kevin Pauli
>>
>>
>
>
