[ 
https://issues.apache.org/jira/browse/AIRFLOW-6454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17012370#comment-17012370
 ] 

Kamil Bregula commented on AIRFLOW-6454:
----------------------------------------

My team wants to start working on improving performance in the near future,
and this case will definitely be one that we consider. One of the reasons for
the long gap between consecutive tasks is the time spent starting processes.
We must isolate tasks in separate processes. Unfortunately, previous versions
spawned new processes instead of forking, which meant that all modules were
reloaded. This is an expensive operation.
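
The cost described above is easy to observe outside Airflow. A minimal sketch
(plain Python 3 multiprocessing, not Airflow code; the 'fork' start method is
Unix-only) that times process creation under both start methods:

{code:python}
import time
import multiprocessing as mp

def child():
    pass  # trivial target; we only time process startup

def startup_time(method, n=20):
    # 'spawn' boots a fresh interpreter and re-imports all modules;
    # 'fork' copies the parent, inheriting already-loaded modules.
    ctx = mp.get_context(method)
    start = time.time()
    for _ in range(n):
        p = ctx.Process(target=child)
        p.start()
        p.join()
    return (time.time() - start) / n

if __name__ == '__main__':
    for method in ('fork', 'spawn'):
        print('{}: {:.3f}s per process'.format(method, startup_time(method)))
{code}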

> add test for time taken by scheduler to run dag of diff num of tasks (2 vs 20 
> vs 200 vs 2000 vs 20000 simple 1 line print tasks)
> --------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-6454
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6454
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: tests
>    Affects Versions: 1.10.7
>            Reporter: t oo
>            Priority: Major
>
> *LUIGI* vs *AIRFLOW*
>  
> 200 sequential tasks (so no parallelism):
>  
> +LUIGI:+
>  mkdir -p test_output8
> pip install luigi
> #no need to start a web server, scheduler, or metadata DB
>  #*8.3 secs* total time for all 200
>  time python3 -m luigi --module cloop --local-scheduler ManyMany
>  
> +AIRFLOW:+
>  #*1032 sec* total time for all 200: ~0.16s per task, but a ~5 sec gap between tasks
>  #the intention was for the tasks in the DAG to run strictly sequentially,
>  #i.e. task 3 must wait for task 2, which must wait for task 1, and so on;
>  #chain() did not seem to enforce this, so default_pool=1 was used instead
>  #(an alternative wiring with explicit dependencies is sketched after
>  #looper2.py below)
> airflow initdb
> nohup airflow webserver -p 8080 &
> nohup airflow scheduler  &
> airflow trigger_dag looper2
>  #look at dagrun start-endtime
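>
> Rather than eyeballing the UI, the run duration can be read from the metadata
> DB. A minimal sketch (assuming Airflow 1.10 with the metadata DB initialised;
> the file name measure_run.py is made up):
> {code:python}
> # measure_run.py -- print the wall-clock duration of each run of looper2
> from airflow.models import DagRun
>
> # DagRun.find() queries the dag_run table in the metadata database
> for run in DagRun.find(dag_id='looper2'):
>     if run.start_date and run.end_date:
>         elapsed = (run.end_date - run.start_date).total_seconds()
>         print('{}: {:.1f}s'.format(run.run_id, elapsed))
> {code}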
>  
> cloop.py
> {code:python}
> import os
> #import time
> import luigi
> # To run:
> # cd ~/luigi_workflows
> # PYTHONPATH=.. luigi --module=luigi_workflows.test_resources ManyMany --workers=100
> class Sleep(luigi.Task):
>     #resources = {'foo': 10}
>     fname = luigi.Parameter()
>     def requires(self):
>         #print(self)
>         zin = self.fname
>         ii = int(zin.split('_')[1])
>         if ii > 1:
>             # each marker depends on the previous one, forming a chain
>             return Sleep(fname='marker_{}'.format(ii - 1))
>         else:
>             # the first marker has no dependencies
>             return []
>     def full_path(self):
>         return os.path.join(os.path.dirname(os.path.realpath(__file__)),
>                             'test_output8', self.fname)
>     def run(self):
>         #time.sleep(1)
>         with open(self.full_path(), 'w') as f:
>             print('', file=f)
>     def output(self):
>         return luigi.LocalTarget(self.full_path())
> class Many(luigi.WrapperTask):
>     n = luigi.IntParameter()
>     def requires(self):
>         for i in range(self.n):
>             yield Sleep(fname='marker_{}'.format(i))
> class ManyMany(luigi.WrapperTask):
>     n = luigi.IntParameter(default=200)
>     def requires(self):
>         for i in range(self.n):
>             yield Many(n=self.n)
> {code}
> looper2.py
> {code:python}
> import airflow
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.utils.helpers import chain
> args = {
>     'owner': 'airflow',
>     'retries': 3,
>     'start_date': airflow.utils.dates.days_ago(2)
> }
> dag = DAG(
>     dag_id='looper2', default_args=args,
>     schedule_interval=None)
> # chain() takes the tasks as separate arguments, so the list must be
> # unpacked; passing the list itself leaves the tasks with no dependencies
> chain(*[DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 201)])
> if __name__ == "__main__":
>     dag.cli()
> {code}
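> An alternative wiring that avoids chain() entirely is to connect each task to
> its successor with the >> operator. A minimal sketch (same Airflow 1.10 APIs
> as looper2.py; the dag_id 'looper3' is made up):
> {code:python}
> import airflow
> from airflow.models import DAG
> from airflow.operators.dummy_operator import DummyOperator
>
> args = {
>     'owner': 'airflow',
>     'retries': 3,
>     'start_date': airflow.utils.dates.days_ago(2)
> }
> dag = DAG(dag_id='looper3', default_args=args, schedule_interval=None)
>
> ops = [DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 201)]
> # op1 >> op2 >> ... >> op200, one explicit edge per pair
> for upstream, downstream in zip(ops, ops[1:]):
>     upstream >> downstream
> {code}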
> I saw a similar test in https://github.com/apache/airflow/pull/5096, but it
> did not seem to be sequential or to use the scheduler.
> Possible test scenarios:
> 1. 1 DAG with 200 tasks running sequentially
> 2. 1 DAG with 200 tasks running all in parallel (200 slots)
> 3. 1 DAG with 200 tasks running all in parallel (48 slots)
> 4. 200 DAGs each with 1 task
> Then repeat the above, changing 200 to 2000 or 20, etc. (a generator sketch
> for scenario 4 follows below).
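>
> Scenario 4 can be generated from a single DAG file by registering each DAG in
> the module's globals(), which is how the DagBag discovers dynamically built
> DAGs. A minimal sketch (assuming Airflow 1.10; the dag_id prefix 'single_' is
> made up):
> {code:python}
> import airflow
> from airflow.models import DAG
> from airflow.operators.dummy_operator import DummyOperator
>
> # scenario 4: 200 DAGs, each with exactly one task
> for i in range(200):
>     dag = DAG(
>         dag_id='single_{}'.format(i),
>         default_args={'owner': 'airflow',
>                       'start_date': airflow.utils.dates.days_ago(2)},
>         schedule_interval=None)
>     DummyOperator(task_id='only_task', dag=dag)
>     # DAGs must be reachable from module globals to be picked up
>     globals()[dag.dag_id] = dag
> {code}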



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
