[ https://issues.apache.org/jira/browse/AIRFLOW-6454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
t oo updated AIRFLOW-6454: -------------------------- Description: *LUIGI* vs *AIRFLOW* 200 sequential tasks: LUIGI: mkdir -p test_output8 #*8.3 secs* total time for all 200 (run from the Downloads folder) time python3 -m luigi --module cloop --local-scheduler ManyMany AIRFLOW: #*1032 secs* total time for all 200: 0.16 s per task, but a 5 s gap between tasks #the intention was for the tasks in the DAG to be completely sequential, i.e. task 3 must wait for task 2, which must wait for task 1, etc., but chain() is not working as intended: chain() takes the tasks as separate positional arguments, so passing it a single list creates no dependencies at all (use chain(*[...]) instead) airflow trigger_dag looper2 #compare the DAG run's start and end times cloop.py `import os #import time import luigi # To run: # cd ~/luigi_workflows # pythonpath=.. luigi --module=luigi_workflows.test_resources ManyMany --workers=100 class Sleep(luigi.Task): #resources = {'foo': 10} fname = luigi.Parameter() def requires(self): #print(self) zin=self.fname ii=int(zin.split('_')[1]) if ii > 1: return Sleep(fname='marker_{}'.format(ii-1)) else: [] def full_path(self): return os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test_output8', self.fname) def run(self): #time.sleep(1) with open(self.full_path(), 'w') as f: print('', file=f) def output(self): return luigi.LocalTarget(self.full_path()) class Many(luigi.WrapperTask): n = luigi.IntParameter() def requires(self): for i in range(self.n): yield Sleep(fname='marker_{}'.format(i)) class ManyMany(luigi.WrapperTask): n = luigi.IntParameter(default=200) def requires(self): for i in range(self.n): yield Many(n=self.n) ` looper2.py `import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.dummy_operator import DummyOperator from airflow.utils.helpers import chain args = { 'owner': 'airflow', 'retries': 3, 'start_date': airflow.utils.dates.days_ago(2) } dag = DAG( dag_id='looper2', default_args=args, schedule_interval=None) chain([DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 201)]) if __name__ == "__main__": dag.cli() ` > add test for
time taken by scheduler to run dag of diff num of tasks (2 vs 20 > vs 200 vs 2000 vs 20000 simple 1 line print tasks) > -------------------------------------------------------------------------------------------------------------------------------- > > Key: AIRFLOW-6454 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6454 > Project: Apache Airflow > Issue Type: Improvement > Components: tests > Affects Versions: 1.10.7 > Reporter: t oo > Priority: Major > > *LUIGI* vs *AIRFLOW* > 200 sequential tasks: > LUIGI: > mkdir -p test_output8 > #*8.3secs* total time for all 200 (from Downloads folder) > time python3 -m luigi --module cloop --local-scheduler ManyMany > AIRFLOW: > #*1032 sec* total time for all 200, .16s per task but 5sec gap between tasks > #intention was for tasks in the DAG to be completely sequential ie task 3 > must wait for task 2 which must wait for task 1..etc but chain() not working > as intended?? > airflow trigger_dag looper2 > #look at dagrun start-endtime > cloop.py > `import os > #import time > import luigi > # To run: > # cd ~/luigi_workflows > # pythonpath=.. 
luigi --module=luigi_workflows.test_resources ManyMany > --workers=100 > class Sleep(luigi.Task): > #resources = {'foo': 10} > fname = luigi.Parameter() > def requires(self): > #print(self) > zin=self.fname > ii=int(zin.split('_')[1]) > if ii > 1: > return Sleep(fname='marker_{}'.format(ii-1)) > else: > [] > def full_path(self): > return os.path.join(os.path.dirname(os.path.realpath(__file__)), > 'test_output8', self.fname) > def run(self): > #time.sleep(1) > with open(self.full_path(), 'w') as f: > print('', file=f) > def output(self): > return luigi.LocalTarget(self.full_path()) > class Many(luigi.WrapperTask): > n = luigi.IntParameter() > def requires(self): > for i in range(self.n): > yield Sleep(fname='marker_{}'.format(i)) > class ManyMany(luigi.WrapperTask): > n = luigi.IntParameter(default=200) > def requires(self): > for i in range(self.n): > yield Many(n=self.n) > ` > looper2.py > `import airflow > from airflow.models import DAG > from airflow.operators.bash_operator import BashOperator > from airflow.operators.dummy_operator import DummyOperator > from airflow.utils.helpers import chain > args = { > 'owner': 'airflow', > 'retries': 3, > 'start_date': airflow.utils.dates.days_ago(2) > } > dag = DAG( > dag_id='looper2', default_args=args, > schedule_interval=None) > chain([DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 201)]) > if __name__ == "__main__": > dag.cli() > ` -- This message was sent by Atlassian Jira (v8.3.4#803005)