kaxil closed pull request #4071: [AIRFLOW-3237] Refactor example DAGs URL: https://github.com/apache/incubator-airflow/pull/4071
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py index a5ff651d6a..68accc6317 100644 --- a/airflow/example_dags/example_bash_operator.py +++ b/airflow/example_dags/example_bash_operator.py @@ -17,48 +17,57 @@ # specific language governing permissions and limitations # under the License. -import airflow from builtins import range -from airflow.operators.bash_operator import BashOperator -from airflow.operators.dummy_operator import DummyOperator -from airflow.models import DAG from datetime import timedelta +import airflow +from airflow.models import DAG +from airflow.operators.bash_operator import BashOperator +from airflow.operators.dummy_operator import DummyOperator args = { 'owner': 'airflow', - 'start_date': airflow.utils.dates.days_ago(2) + 'start_date': airflow.utils.dates.days_ago(2), } dag = DAG( - dag_id='example_bash_operator', default_args=args, + dag_id='example_bash_operator', + default_args=args, schedule_interval='0 0 * * *', - dagrun_timeout=timedelta(minutes=60)) + dagrun_timeout=timedelta(minutes=60), +) -cmd = 'ls -l' -run_this_last = DummyOperator(task_id='run_this_last', dag=dag) +run_this_last = DummyOperator( + task_id='run_this_last', + dag=dag, +) # [START howto_operator_bash] run_this = BashOperator( - task_id='run_after_loop', bash_command='echo 1', dag=dag) + task_id='run_after_loop', + bash_command='echo 1', + dag=dag, +) # [END howto_operator_bash] -run_this.set_downstream(run_this_last) + +run_this >> run_this_last for i in range(3): - i = str(i) task = BashOperator( - task_id='runme_' + i, + task_id='runme_' + str(i), bash_command='echo "{{ task_instance_key_str }}" && sleep 1', - dag=dag) - task.set_downstream(run_this) + dag=dag, + ) + task >> run_this # [START howto_operator_bash_template] -task = BashOperator( +also_run_this = BashOperator( task_id='also_run_this', bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"', - dag=dag) + dag=dag, +) # [END howto_operator_bash_template] -task.set_downstream(run_this_last) +also_run_this >> run_this_last if __name__ == "__main__": dag.cli() diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index 45bf11f301..197d7d7a73 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -17,43 +17,53 @@ # specific language governing permissions and limitations # under the License. -import airflow -from airflow.operators.python_operator import BranchPythonOperator -from airflow.operators.dummy_operator import DummyOperator -from airflow.models import DAG import random +import airflow +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.python_operator import BranchPythonOperator args = { 'owner': 'airflow', - 'start_date': airflow.utils.dates.days_ago(2) + 'start_date': airflow.utils.dates.days_ago(2), } dag = DAG( dag_id='example_branch_operator', default_args=args, - schedule_interval="@daily") + schedule_interval="@daily", +) -cmd = 'ls -l' -run_this_first = DummyOperator(task_id='run_this_first', dag=dag) +run_this_first = DummyOperator( + task_id='run_this_first', + dag=dag, +) options = ['branch_a', 'branch_b', 'branch_c', 'branch_d'] branching = BranchPythonOperator( task_id='branching', python_callable=lambda: random.choice(options), - dag=dag) -branching.set_upstream(run_this_first) + dag=dag, +) +run_this_first >> branching join = DummyOperator( task_id='join', trigger_rule='one_success', - dag=dag + dag=dag, ) for option in options: - t = DummyOperator(task_id=option, dag=dag) - t.set_upstream(branching) - dummy_follow = DummyOperator(task_id='follow_' + option, dag=dag) - t.set_downstream(dummy_follow) - dummy_follow.set_downstream(join) + t = DummyOperator( + task_id=option, + dag=dag, + ) + + dummy_follow = DummyOperator( + task_id='follow_' + option, + dag=dag, + ) + + branching >> t >> dummy_follow >> join diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py index 7be55a5f36..950d5c1ae2 100644 --- a/airflow/example_dags/example_branch_python_dop_operator_3.py +++ b/airflow/example_dags/example_branch_python_dop_operator_3.py @@ -18,9 +18,9 @@ # under the License. import airflow -from airflow.operators.python_operator import BranchPythonOperator -from airflow.operators.dummy_operator import DummyOperator from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.python_operator import BranchPythonOperator args = { 'owner': 'airflow', @@ -31,12 +31,14 @@ # BranchPython operator that depends on past # and where tasks may run or be skipped on # alternating runs -dag = DAG(dag_id='example_branch_dop_operator_v3', - schedule_interval='*/1 * * * *', default_args=args) - +dag = DAG( + dag_id='example_branch_dop_operator_v3', + schedule_interval='*/1 * * * *', + default_args=args, +) -def should_run(ds, **kwargs): +def should_run(**kwargs): print('------------- exec dttm = {} and minute = {}'. format(kwargs['execution_date'], kwargs['execution_date'].minute)) if kwargs['execution_date'].minute % 2 == 0: @@ -49,14 +51,9 @@ def should_run(ds, **kwargs): task_id='condition', provide_context=True, python_callable=should_run, - dag=dag) - -oper_1 = DummyOperator( - task_id='oper_1', - dag=dag) -oper_1.set_upstream(cond) + dag=dag, +) -oper_2 = DummyOperator( - task_id='oper_2', - dag=dag) -oper_2.set_upstream(cond) +dummy_task_1 = DummyOperator(task_id='dummy_task_1', dag=dag) +dummy_task_2 = DummyOperator(task_id='dummy_task_2', dag=dag) +cond >> [dummy_task_1, dummy_task_2] diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py index da7ea3f218..eca528e899 100644 --- a/airflow/example_dags/example_http_operator.py +++ b/airflow/example_dags/example_http_operator.py @@ -49,14 +49,16 @@ data=json.dumps({"priority": 5}), headers={"Content-Type": "application/json"}, response_check=lambda response: True if len(response.json()) == 0 else False, - dag=dag) + dag=dag, +) t5 = SimpleHttpOperator( task_id='post_op_formenc', endpoint='nodes/url', data="name=Joe", headers={"Content-Type": "application/x-www-form-urlencoded"}, - dag=dag) + dag=dag, +) t2 = SimpleHttpOperator( task_id='get_op', @@ -64,7 +66,8 @@ endpoint='api/v1.0/nodes', data={"param1": "value1", "param2": "value2"}, headers={}, - dag=dag) + dag=dag, +) t3 = SimpleHttpOperator( task_id='put_op', @@ -72,7 +75,8 @@ endpoint='api/v1.0/nodes', data=json.dumps({"priority": 5}), headers={"Content-Type": "application/json"}, - dag=dag) + dag=dag, +) t4 = SimpleHttpOperator( task_id='del_op', @@ -80,7 +84,8 @@ endpoint='api/v1.0/nodes', data="some=data", headers={"Content-Type": "application/x-www-form-urlencoded"}, - dag=dag) + dag=dag, +) sensor = HttpSensor( task_id='http_sensor_check', @@ -89,10 +94,7 @@ request_params={}, response_check=lambda response: True if "Google" in response.content else False, poke_interval=5, - dag=dag) + dag=dag, +) -t1.set_upstream(sensor) -t2.set_upstream(t1) -t3.set_upstream(t2) -t4.set_upstream(t3) -t5.set_upstream(t4) +sensor >> t1 >> t2 >> t3 >> t4 >> t5 diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py index fdb2dca490..635a764198 100644 --- a/airflow/example_dags/example_latest_only.py +++ b/airflow/example_dags/example_latest_only.py @@ -33,6 +33,6 @@ ) latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) - task1 = DummyOperator(task_id='task1', dag=dag) -task1.set_upstream(latest_only) + +latest_only >> task1 diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py index b8f4811c1a..3559afb0c8 100644 --- a/airflow/example_dags/example_latest_only_with_trigger.py +++ b/airflow/example_dags/example_latest_only_with_trigger.py @@ -34,15 +34,10 @@ ) latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) - task1 = DummyOperator(task_id='task1', dag=dag) -task1.set_upstream(latest_only) - task2 = DummyOperator(task_id='task2', dag=dag) - task3 = DummyOperator(task_id='task3', dag=dag) -task3.set_upstream([task1, task2]) +task4 = DummyOperator(task_id='task4', dag=dag, trigger_rule=TriggerRule.ALL_DONE) -task4 = DummyOperator(task_id='task4', dag=dag, - trigger_rule=TriggerRule.ALL_DONE) -task4.set_upstream([task1, task2]) +latest_only >> task1 >> [task3, task4] +task2 >> [task3, task4] diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py index 7efca2f3b0..2aef593abf 100644 --- a/airflow/example_dags/example_passing_params_via_test_command.py +++ b/airflow/example_dags/example_passing_params_via_test_command.py @@ -18,18 +18,21 @@ # under the License. from datetime import timedelta + import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator - -dag = DAG("example_passing_params_via_test_command", - default_args={"owner": "airflow", - "start_date": airflow.utils.dates.days_ago(1)}, - schedule_interval='*/1 * * * *', - dagrun_timeout=timedelta(minutes=4) - ) +dag = DAG( + "example_passing_params_via_test_command", + default_args={ + "owner": "airflow", + "start_date": airflow.utils.dates.days_ago(1), + }, + schedule_interval='*/1 * * * *', + dagrun_timeout=timedelta(minutes=4), +) def my_py_command(ds, **kwargs): @@ -54,12 +57,14 @@ def my_py_command(ds, **kwargs): provide_context=True, python_callable=my_py_command, params={"miff": "agg"}, - dag=dag) - + dag=dag, +) also_run_this = BashOperator( task_id='also_run_this', bash_command=my_templated_command, params={"miff": "agg"}, - dag=dag) -also_run_this.set_upstream(run_this) + dag=dag, +) + +run_this >> also_run_this diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index 6c4874fc87..254fb3e26c 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -18,23 +18,25 @@ # under the License. from __future__ import print_function -from builtins import range -import airflow -from airflow.operators.python_operator import PythonOperator -from airflow.models import DAG import time +from builtins import range from pprint import pprint +import airflow +from airflow.models import DAG +from airflow.operators.python_operator import PythonOperator args = { 'owner': 'airflow', - 'start_date': airflow.utils.dates.days_ago(2) + 'start_date': airflow.utils.dates.days_ago(2), } dag = DAG( - dag_id='example_python_operator', default_args=args, - schedule_interval=None) + dag_id='example_python_operator', + default_args=args, + schedule_interval=None, +) # [START howto_operator_python] @@ -48,7 +50,8 @@ def print_context(ds, **kwargs): task_id='print_the_context', provide_context=True, python_callable=print_context, - dag=dag) + dag=dag, +) # [END howto_operator_python] @@ -64,7 +67,8 @@ def my_sleeping_function(random_base): task_id='sleep_for_' + str(i), python_callable=my_sleeping_function, op_kwargs={'random_base': float(i) / 10}, - dag=dag) + dag=dag, + ) - task.set_upstream(run_this) + run_this >> task # [END howto_operator_python_kwargs] diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py index 8e9565df91..1093dab616 100644 --- a/airflow/example_dags/example_short_circuit_operator.py +++ b/airflow/example_dags/example_short_circuit_operator.py @@ -17,25 +17,29 @@ # specific language governing permissions and limitations # under the License. -import airflow -from airflow.operators.python_operator import ShortCircuitOperator -from airflow.operators.dummy_operator import DummyOperator -from airflow.models import DAG import airflow.utils.helpers - +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.python_operator import ShortCircuitOperator args = { 'owner': 'airflow', - 'start_date': airflow.utils.dates.days_ago(2) + 'start_date': airflow.utils.dates.days_ago(2), } dag = DAG(dag_id='example_short_circuit_operator', default_args=args) cond_true = ShortCircuitOperator( - task_id='condition_is_True', python_callable=lambda: True, dag=dag) + task_id='condition_is_True', + python_callable=lambda: True, + dag=dag, +) cond_false = ShortCircuitOperator( - task_id='condition_is_False', python_callable=lambda: False, dag=dag) + task_id='condition_is_False', + python_callable=lambda: False, + dag=dag, +) ds_true = [DummyOperator(task_id='true_' + str(i), dag=dag) for i in [1, 2]] ds_false = [DummyOperator(task_id='false_' + str(i), dag=dag) for i in [1, 2]] diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py index f11ca59338..456eb911dc 100644 --- a/airflow/example_dags/example_skip_dag.py +++ b/airflow/example_dags/example_skip_dag.py @@ -18,14 +18,13 @@ # under the License. import airflow -from airflow.operators.dummy_operator import DummyOperator -from airflow.models import DAG from airflow.exceptions import AirflowSkipException - +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator args = { 'owner': 'airflow', - 'start_date': airflow.utils.dates.days_ago(2) + 'start_date': airflow.utils.dates.days_ago(2), } @@ -37,23 +36,17 @@ def execute(self, context): raise AirflowSkipException -dag = DAG(dag_id='example_skip_dag', default_args=args) - - def create_test_pipeline(suffix, trigger_rule, dag): - skip_operator = DummySkipOperator(task_id='skip_operator_{}'.format(suffix), dag=dag) - always_true = DummyOperator(task_id='always_true_{}'.format(suffix), dag=dag) - join = DummyOperator(task_id=trigger_rule, dag=dag, trigger_rule=trigger_rule) - - join.set_upstream(skip_operator) - join.set_upstream(always_true) - final = DummyOperator(task_id='final_{}'.format(suffix), dag=dag) - final.set_upstream(join) + + skip_operator >> join + always_true >> join + join >> final +dag = DAG(dag_id='example_skip_dag', default_args=args) create_test_pipeline('1', 'all_success', dag) create_test_pipeline('2', 'one_success', dag) diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py index ffd254b19a..98386ba454 100644 --- a/airflow/example_dags/example_subdag_operator.py +++ b/airflow/example_dags/example_subdag_operator.py @@ -18,14 +18,11 @@ # under the License. import airflow - +from airflow.example_dags.subdags.subdag import subdag from airflow.models import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.subdag_operator import SubDagOperator -from airflow.example_dags.subdags.subdag import subdag - - DAG_NAME = 'example_subdag_operator' args = { @@ -71,7 +68,4 @@ dag=dag, ) -start.set_downstream(section_1) -section_1.set_downstream(some_other_task) -some_other_task.set_downstream(section_2) -section_2.set_downstream(end) +start >> section_1 >> some_other_task >> section_2 >> end diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py index f5c7218239..35e7184f76 100644 --- a/airflow/example_dags/example_trigger_controller_dag.py +++ b/airflow/example_dags/example_trigger_controller_dag.py @@ -33,11 +33,11 @@ 2. A Target DAG : c.f. example_trigger_target_dag.py """ -from airflow import DAG -from airflow.operators.dagrun_operator import TriggerDagRunOperator +import pprint from datetime import datetime -import pprint +from airflow import DAG +from airflow.operators.dagrun_operator import TriggerDagRunOperator pp = pprint.PrettyPrinter(indent=4) @@ -53,16 +53,20 @@ def conditionally_trigger(context, dag_run_obj): # Define the DAG -dag = DAG(dag_id='example_trigger_controller_dag', - default_args={"owner": "airflow", - "start_date": datetime.utcnow()}, - schedule_interval='@once') - +dag = DAG( + dag_id='example_trigger_controller_dag', + default_args={ + "owner": "airflow", + "start_date": datetime.utcnow(), + }, + schedule_interval='@once', +) # Define the single task in this controller example DAG -trigger = TriggerDagRunOperator(task_id='test_trigger_dagrun', - trigger_dag_id="example_trigger_target_dag", - python_callable=conditionally_trigger, - params={'condition_param': True, - 'message': 'Hello World'}, - dag=dag) +trigger = TriggerDagRunOperator( + task_id='test_trigger_dagrun', + trigger_dag_id="example_trigger_target_dag", + python_callable=conditionally_trigger, + params={'condition_param': True, 'message': 'Hello World'}, + dag=dag, +) diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py index 7a656f2859..c1403a60e1 100644 --- a/airflow/example_dags/example_trigger_target_dag.py +++ b/airflow/example_dags/example_trigger_target_dag.py @@ -17,12 +17,13 @@ # specific language governing permissions and limitations # under the License. +import pprint +from datetime import datetime + +from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator -from airflow.models import DAG -from datetime import datetime -import pprint pp = pprint.PrettyPrinter(indent=4) # This example illustrates the use of the TriggerDagRunOperator. There are 2 @@ -50,7 +51,8 @@ dag = DAG( dag_id='example_trigger_target_dag', default_args=args, - schedule_interval=None) + schedule_interval=None, +) def run_this_func(ds, **kwargs): @@ -62,12 +64,13 @@ def run_this_func(ds, **kwargs): task_id='run_this', provide_context=True, python_callable=run_this_func, - dag=dag) - + dag=dag, +) # You can also access the DagRun object in templates bash_task = BashOperator( task_id="bash_task", bash_command='echo "Here is the message: ' '{{ dag_run.conf["message"] if dag_run else "" }}" ', - dag=dag) + dag=dag, +) diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py index 66bec9a780..f2b7627aca 100644 --- a/airflow/example_dags/example_xcom.py +++ b/airflow/example_dags/example_xcom.py @@ -17,6 +17,7 @@ # specific language governing permissions and limitations # under the License. from __future__ import print_function + import airflow from airflow import DAG from airflow.operators.python_operator import PythonOperator @@ -24,25 +25,22 @@ args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(2), - 'provide_context': True + 'provide_context': True, } -dag = DAG( - 'example_xcom', - schedule_interval="@once", - default_args=args) +dag = DAG('example_xcom', schedule_interval="@once", default_args=args) value_1 = [1, 2, 3] value_2 = {'a': 'b'} def push(**kwargs): - # pushes an XCom without a specific target + """Pushes an XCom without a specific target""" kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1) def push_by_returning(**kwargs): - # pushes an XCom without a specific target, just by returning it + """Pushes an XCom without a specific target, just by returning it""" return value_2 @@ -63,12 +61,21 @@ def puller(**kwargs): push1 = PythonOperator( - task_id='push', dag=dag, python_callable=push) + task_id='push', + dag=dag, + python_callable=push, +) push2 = PythonOperator( - task_id='push_by_returning', dag=dag, python_callable=push_by_returning) + task_id='push_by_returning', + dag=dag, + python_callable=push_by_returning, +) pull = PythonOperator( - task_id='puller', dag=dag, python_callable=puller) + task_id='puller', + dag=dag, + python_callable=puller, +) -pull.set_upstream([push1, push2]) +pull << [push1, push2] diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py index fb8792a1bf..3fc8af1dad 100644 --- a/airflow/example_dags/test_utils.py +++ b/airflow/example_dags/test_utils.py @@ -18,17 +18,15 @@ # under the License. """Used for unit tests""" import airflow -from airflow.operators.bash_operator import BashOperator from airflow.models import DAG +from airflow.operators.bash_operator import BashOperator -dag = DAG( - dag_id='test_utils', - schedule_interval=None, -) +dag = DAG(dag_id='test_utils', schedule_interval=None) task = BashOperator( task_id='sleeps_forever', dag=dag, bash_command="sleep 10000000000", start_date=airflow.utils.dates.days_ago(2), - owner='airflow') + owner='airflow', +) diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py index ad817338ef..ccf2e6e2ee 100644 --- a/airflow/example_dags/tutorial.py +++ b/airflow/example_dags/tutorial.py @@ -22,14 +22,14 @@ Documentation that goes along with the Airflow tutorial located [here](https://airflow.incubator.apache.org/tutorial.html) """ +from datetime import timedelta + import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator -from datetime import timedelta - -# these args will get passed on to each operator -# you can override them on a per-task basis during operator initialization +# These args will get passed on to each operator +# You can override them on a per-task basis during operator initialization default_args = { 'owner': 'airflow', 'depends_on_past': False, @@ -58,13 +58,15 @@ 'tutorial', default_args=default_args, description='A simple tutorial DAG', - schedule_interval=timedelta(days=1)) + schedule_interval=timedelta(days=1), +) # t1, t2 and t3 are examples of tasks created by instantiating operators t1 = BashOperator( task_id='print_date', bash_command='date', - dag=dag) + dag=dag, +) t1.doc_md = """\ #### Task Documentation @@ -80,7 +82,8 @@ task_id='sleep', depends_on_past=False, bash_command='sleep 5', - dag=dag) + dag=dag, +) templated_command = """ {% for i in range(5) %} @@ -95,7 +98,7 @@ depends_on_past=False, bash_command=templated_command, params={'my_param': 'Parameter I passed in'}, - dag=dag) + dag=dag, +) -t2.set_upstream(t1) -t3.set_upstream(t1) +t1 >> [t2, t3] diff --git a/docs/tutorial.rst b/docs/tutorial.rst index 0ea58d2784..570cd75c80 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -247,23 +247,36 @@ in templates, make sure to read through the :ref:`macros` section Setting up Dependencies ----------------------- -We have two simple tasks that do not depend on each other. Here's a few ways +We have tasks `t1`, `t2` and `t3` that do not depend on each other. Here's a few ways you can define dependencies between them: .. code:: python - t2.set_upstream(t1) + t1.set_downstream(t2) # This means that t2 will depend on t1 - # running successfully to run - # It is equivalent to - # t1.set_downstream(t2) + # running successfully to run. + # It is equivalent to: + t2.set_upstream(t1) - t3.set_upstream(t1) + # The bit shift operator can also be + # used to chain operations: + t1 >> t2 + + # And the upstream dependency with the + # bit shift operator: + t2 << t1 + + # Chaining multiple dependencies becomes + # concise with the bit shift operator: + t1 >> t2 >> t3 - # all of this is equivalent to - # dag.set_dependency('print_date', 'sleep') - # dag.set_dependency('print_date', 'templated') + # A list of tasks can also be set as + # dependencies. These operations + # all have the same effect: + t1.set_downstream([t2, t3]) + t1 >> [t2, t3] + [t2, t3] << t1 Note that when executing your script, Airflow will raise exceptions when it finds cycles in your DAG or when a dependency is referenced more ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services