t oo created AIRFLOW-6920:
-----------------------------

             Summary: AIRFLOW Feature Parity with LUIGI & CONTROLM 
                 Key: AIRFLOW-6920
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6920
             Project: Apache Airflow
          Issue Type: Improvement
          Components: tests
    Affects Versions: 1.10.7
            Reporter: t oo


*LUIGI* vs *AIRFLOW*

 

200 sequential tasks (so no parallelism):

 

+LUIGI:+
mkdir -p test_output8
pip install luigi
#no need to start a web server, scheduler or meta db
#*8.3 sec* total time for all 200 tasks
time python3 -m luigi --module cloop --local-scheduler ManyMany

 

+AIRFLOW:+
#*1032 sec* total time for all 200: 0.16 sec per task but a 5 sec gap between tasks
#the intention was for the tasks in the DAG to be completely sequential, i.e.
#task 3 must wait for task 2, which must wait for task 1, etc., but chain() did
#not seem to work as intended, so I used default_pool=1
airflow initdb
nohup airflow webserver -p 8080 &
nohup airflow scheduler  &
airflow trigger_dag looper2
 #look at dagrun start-endtime
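
The Airflow total quoted above matches a fixed cost per task: 200 x (0.16 sec + 5 sec) = 1032 sec. A minimal sketch of that arithmetic, using only the figures from this report and assuming one gap per task (all names are illustrative):

```python
# Figures quoted in the report above; names are illustrative only.
TASKS = 200
RUN_SECONDS_PER_TASK = 0.16   # time each task spends executing
GAP_SECONDS_PER_TASK = 5.0    # observed gap between consecutive tasks
LUIGI_TOTAL_SECONDS = 8.3     # Luigi wall-clock time for the same 200 tasks


def airflow_total_seconds(tasks=TASKS, run=RUN_SECONDS_PER_TASK,
                          gap=GAP_SECONDS_PER_TASK):
    """Total wall-clock time if every task pays one run plus one gap."""
    return tasks * (run + gap)


total = airflow_total_seconds()
overhead_share = (TASKS * GAP_SECONDS_PER_TASK) / total
slowdown = total / LUIGI_TOTAL_SECONDS
print(f"total={total:.0f}s overhead={overhead_share:.1%} slowdown={slowdown:.0f}x")
```

On these numbers nearly all of the wall-clock time is the gap between tasks rather than task execution.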

 

cloop.py
{code:python}
import os
#import time

import luigi

# To run:
# cd ~/luigi_workflows
# PYTHONPATH=.. luigi --module=luigi_workflows.test_resources ManyMany --workers=100

class Sleep(luigi.Task):
    #resources = {'foo': 10}

    fname = luigi.Parameter()

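    def requires(self):
        # marker_N requires marker_(N-1) for N > 1, forming a sequential chain
        index = int(self.fname.split('_')[1])
        if index > 1:
            return Sleep(fname='marker_{}'.format(index - 1))
        # no upstream dependency (the original branch evaluated [] without returning it)
        return []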

    def full_path(self):
        return os.path.join(os.path.dirname(os.path.realpath(__file__)),
                            'test_output8', self.fname)

    def run(self):
        #time.sleep(1)
        with open(self.full_path(), 'w') as f:
            print('', file=f)

    def output(self):
        return luigi.LocalTarget(self.full_path())


class Many(luigi.WrapperTask):
    n = luigi.IntParameter()

    def requires(self):
        for i in range(self.n):
            yield Sleep(fname='marker_{}'.format(i))


class ManyMany(luigi.WrapperTask):
    n = luigi.IntParameter(default=200)

    def requires(self):
        for i in range(self.n):
            yield Many(n=self.n)
{code}
looper2.py
{code:python}
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.helpers import chain

args = {
    'owner': 'airflow',
    'retries': 3,
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='looper2', default_args=args,
    schedule_interval=None)

chain([DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 201)])

if __name__ == "__main__":
    dag.cli()
{code}
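
One possible explanation for chain() not serialising the tasks: assuming the 1.10.x helper is defined as chain(*tasks) and links each positional argument to the next, passing a single list gives it only one argument, so no dependencies are set. The sketch below reproduces that behaviour with a stand-in Task class and a local chain(); both are assumptions for illustration, not Airflow's own code.

```python
class Task:
    """Stand-in for an Airflow operator; records downstream task ids only."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other.task_id)


def chain(*tasks):
    # Assumed to mirror airflow.utils.helpers.chain in 1.10.x:
    # link each positional argument to the one that follows it.
    for up_task, down_task in zip(tasks[:-1], tasks[1:]):
        up_task.set_downstream(down_task)


def count_edges(tasks):
    return sum(len(t.downstream) for t in tasks)


as_list = [Task('op' + str(i)) for i in range(1, 201)]
chain(as_list)        # one positional argument (the list): nothing to pair up

unpacked = [Task('op' + str(i)) for i in range(1, 201)]
chain(*unpacked)      # 200 positional arguments: each task linked to the next

print(count_edges(as_list), count_edges(unpacked))
```

If that assumption holds, unpacking the list in looper2.py with a leading * would make the 200 tasks strictly sequential without relying on the pool size.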

I saw a similar test in 
https://github.com/apache/airflow/pull/5096 but it did not seem to be 
sequential or to use the scheduler.



Possible test scenarios:
1. 1 DAG with 200 tasks running sequentially
2. 1 DAG with 200 tasks running all in parallel (200 slots)
3. 1 DAG with 200 tasks running all in parallel (48 slots)
4. 200 DAGs each with 1 task
Then repeat the above, changing 200 to 2000, 20, etc.
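
The scenario list above could be written down as a small test matrix. This is only a sketch: the field names are hypothetical, and the slot count for scenario 4 is left unset because it is not specified above.

```python
def scenarios(n):
    """Return the four scenarios listed above for a given task count n."""
    return [
        {"name": "sequential", "dags": 1, "tasks_per_dag": n, "slots": 1},
        {"name": "parallel_n_slots", "dags": 1, "tasks_per_dag": n, "slots": n},
        {"name": "parallel_48_slots", "dags": 1, "tasks_per_dag": n, "slots": 48},
        # slots not specified for this scenario in the list above
        {"name": "one_task_per_dag", "dags": n, "tasks_per_dag": 1, "slots": None},
    ]


# Repeat the four scenarios for each size mentioned above.
matrix = [dict(s, size=n) for n in (20, 200, 2000) for s in scenarios(n)]

for row in matrix:
    total_tasks = row["dags"] * row["tasks_per_dag"]
    print(row["size"], row["name"], total_tasks, row["slots"])
```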


Qs: 
1. Any plans for an 'in-memory' scheduler like Luigi's? 
2. Is anyone open to a Luigi Operator? 
3. Are there any speedups that would make the existing scheduler faster? Note 
that the tasks here are sequential (so the total time should be similar to 200 
DAGs of 1 task each).


ControlM comparison:
Is it envisioned that Airflow becomes a replacement for 
https://www.bmcsoftware.uk/it-solutions/control-m.html ?
execution_date seems similar to Order Date, a DAG seems similar to a job, and 
the tasks in a DAG seem similar to commands called by a job, but these are some 
of the items I see missing:
  1. integrating public holiday calendars,
  2. ability to specify schedule like 11am on '2nd weekday of the month', 'last 
5 days of the month', 'last business day of the month'
  3. ability to visualise dependencies between dags (there does not seem to be 
a high level way to say at 11am schedule DAGc after DAGa and DAGb, then at 3pm 
schedule DAGd after DAGc only if DAGc was successful )
  4. ability to click 1 to many dags in a UI and change their state to 
killed/success (force ok).etc and have it instantly affect task instances (ie 
stopping them)
  5. ability to set whole DAGs to 'dummy' on certain days of the week. ie DAGb 
(runs 7 days a week and do stuff) must run after DAGa for each execdate (DAGa 
should do stuff on mon-fri but on sat/sun DAGa should 'do' nothing ie entire 
dag is 'dummy' just to satisfy 'IN condition' of DAGb)
  6. ability to change the number of tasks within a DAG for a diff exec date 
without 'stuffing' up the scheduler/metadb
  7. ability to 'order up' any day in the past/future (for all or some dags) 
and keep it on 'hold', visualise which dags 'would' be scheduled, see dag 
dependencies, and choose to run all/some (or just do nothing and delete them) 
of the DAGs while maintaining dependencies between them and optionally 'forcing 
ok' some to skip dependencies.
  8. ability to feed in conf (ie arguments) to a DAG from a UI or change the 
host the dag runs on
  9. ability to rerun an entire 'exec date' and maintain audit trail in the db 
of timings of the 1st run of that exec date, plus allow different conf on 2nd 
run.
  10. faster execution,
  a) it seems if I want 15 different dag ids of 300 tasks each and all should 
run exact same tasks (just with different conf arguments) the dagbag has to 
parse 4500 tasks instead of recognising a single set of 300 differed only by 
conf
  b) 'push' flow of tasks within a dag, rather than gaps between tasks
  c) scheduler does not get overloaded with 100k tasks 
  11. dagrun timeout (without maxruns constraint)
  12. enforce depends on prior exec date of a dag with schedules that may only 
be weekly, certain days a week
  13. multi pools (ie quantitative resources) on a single dag
  14. ability to edit schedules via the UI
  15. audit trail of changes to a DAG (not tasks but things like schedule, 
runas user)
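
To make items 1 and 2 above concrete, here is a minimal sketch of the date logic such schedules need, in plain Python. It assumes 'weekday' means Monday to Friday and that holidays are supplied as a set of dates; the function names are hypothetical and nothing here is an existing Airflow feature.

```python
import calendar
from datetime import date, timedelta


def is_business_day(day, holidays=frozenset()):
    """Monday to Friday and not in the supplied holiday calendar."""
    return day.weekday() < 5 and day not in holidays


def last_business_day(year, month, holidays=frozenset()):
    """Walk back from the last calendar day until a business day is found."""
    day = date(year, month, calendar.monthrange(year, month)[1])
    while not is_business_day(day, holidays):
        day -= timedelta(days=1)
    return day


def nth_business_day(year, month, n, holidays=frozenset()):
    """Return the n-th (1-based) business day of the month."""
    days_in_month = calendar.monthrange(year, month)[1]
    business_days = [
        date(year, month, d)
        for d in range(1, days_in_month + 1)
        if is_business_day(date(year, month, d), holidays)
    ]
    return business_days[n - 1]


print(last_business_day(2020, 2))
print(last_business_day(2020, 2, holidays={date(2020, 2, 28)}))
print(nth_business_day(2020, 2, 2))
```

How such a check would be attached to a DAG's schedule is left open here; the point is only that these rules depend on a calendar lookup rather than a fixed cron expression.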

At the moment:
ControlM=Enterprise features, stability, speed but no python definitions of 
tasks
Luigi=Speed and python definitions of tasks but no scheduling
Airflow=Community momentum and python definitions of tasks but not fast and 
lacking some features of ControlM








--
This message was sent by Atlassian Jira
(v8.3.4#803005)
