This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new d70fecfaf6  Add initial docs for setup / teardown (#32169)
d70fecfaf6 is described below

commit d70fecfaf68dbc34c818290ad35f48bf01044dd9
Author: Daniel Standish <15932138+dstand...@users.noreply.github.com>
AuthorDate: Thu Jul 20 13:12:18 2023 -0700

    Add initial docs for setup / teardown (#32169)

    Co-authored-by: Akash Sharma <35839624+adave...@users.noreply.github.com>
    Co-authored-by: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com>
    Co-authored-by: Tzu-ping Chung <uranu...@gmail.com>
---
 .../example_setup_teardown_taskflow.py           |  85 +++++----
 docs/apache-airflow/core-concepts/dags.rst       |  18 +-
 docs/apache-airflow/howto/index.rst              |   1 +
 docs/apache-airflow/howto/setup-and-teardown.rst | 184 +++++++++++++++++++++
 docs/spelling_wordlist.txt                       |   3 +
 5 files changed, 247 insertions(+), 44 deletions(-)

diff --git a/airflow/example_dags/example_setup_teardown_taskflow.py b/airflow/example_dags/example_setup_teardown_taskflow.py
index 128534f1d2..4dcdbf253f 100644
--- a/airflow/example_dags/example_setup_teardown_taskflow.py
+++ b/airflow/example_dags/example_setup_teardown_taskflow.py
@@ -31,68 +31,75 @@ with DAG(
 ) as dag:

     @task
-    def task_1():
+    def my_first_task():
         print("Hello 1")

     @task
-    def task_2():
+    def my_second_task():
         print("Hello 2")

     @task
-    def task_3():
+    def my_third_task():
         print("Hello 3")

     # you can set setup / teardown relationships with the `as_teardown` method.
-    t1 = task_1()
-    t2 = task_2()
-    t3 = task_3()
-    t1 >> t2 >> t3.as_teardown(setups=t1)
+    task_1 = my_first_task()
+    task_2 = my_second_task()
+    task_3 = my_third_task()
+    task_1 >> task_2 >> task_3.as_teardown(setups=task_1)

-    # the method `as_teadrown` will mark t3 as teardown, t1 as setup, and arrow t1 >> t3
-    # now if you clear t2 (downstream), then t1 will be cleared in addition to t3
+    # The method `as_teardown` will mark task_3 as teardown, task_1 as setup, and
+    # arrow task_1 >> task_3.
+    # Now if you clear task_2, then its setup task, task_1, will be cleared in
+    # addition to its teardown task, task_3.

     # it's also possible to use a decorator to mark a task as setup or
     # teardown when you define it. see below.
     @setup
-    def dag_setup():
-        print("I am dag_setup")
+    def outer_setup():
+        print("I am outer_setup")
+        return "some cluster id"

     @teardown
-    def dag_teardown():
-        print("I am dag_teardown")
+    def outer_teardown(cluster_id):
+        print("I am outer_teardown")
+        print(f"Tearing down cluster: {cluster_id}")

     @task
-    def dag_normal_task():
+    def outer_work():
         print("I am just a normal task")

-    s = dag_setup()
-    t = dag_teardown()
-
-    # by using the decorators, dag_setup and dag_teardown are already marked as setup / teardown
-    # now we just need to make sure they are linked directly
-    # what we need to do is this::
-    #   s >> t
-    #   s >> dag_normal_task() >> t
-    # but we can use a context manager to make it cleaner
-    with s >> t:
-        dag_normal_task()
-
     @task_group
     def section_1():
-        @task
-        def my_setup():
+        @setup
+        def inner_setup():
             print("I set up")
+            return "some_cluster_id"

         @task
-        def my_teardown():
-            print("I tear down")
-
-        @task
-        def hello():
-            print("I say hello")
-
-        (s := my_setup()) >> hello() >> my_teardown().as_teardown(setups=s)
-
-    # and let's put section 1 inside the "dag setup" and "dag teardown"
-    s >> section_1() >> t
+        def inner_work(cluster_id):
+            print(f"doing some work with {cluster_id=}")
+
+        @teardown
+        def inner_teardown(cluster_id):
+            print(f"tearing down {cluster_id=}")
+
+        # this passes the return value of `inner_setup` to both `inner_work` and `inner_teardown`
+        inner_setup_task = inner_setup()
+        inner_work(inner_setup_task) >> inner_teardown(inner_setup_task)
+
+    # by using the decorators, outer_setup and outer_teardown are already marked as setup / teardown
+    # now we just need to make sure they are linked directly. At a low level, what we need
+    # to do so is the following::
+    #   s = outer_setup()
+    #   t = outer_teardown()
+    #   s >> t
+    #   s >> outer_work() >> t
+    # Thus, s and t are linked directly, and outer_work runs in between. We can take advantage of
+    # the fact that we are in taskflow, along with the context manager on teardowns, as follows:
+    with outer_teardown(outer_setup()):
+        outer_work()
+
+        # and let's put section 1 inside the outer setup and teardown tasks
+        section_1()
diff --git a/docs/apache-airflow/core-concepts/dags.rst b/docs/apache-airflow/core-concepts/dags.rst
index ab3e3695e8..aa95699ab8 100644
--- a/docs/apache-airflow/core-concepts/dags.rst
+++ b/docs/apache-airflow/core-concepts/dags.rst
@@ -299,11 +299,11 @@ Control Flow

 By default, a DAG will only run a Task when all the Tasks it depends on are successful. There are several ways of modifying this, however:

-* :ref:`concepts:branching`, where you can select which Task to move onto based on a condition
-* :ref:`concepts:latest-only`, a special form of branching that only runs on DAGs running against the present
-* :ref:`concepts:depends-on-past`, where tasks can depend on themselves *from a previous run*
-* :ref:`concepts:trigger-rules`, which let you set the conditions under which a DAG will run a task.
-
+* :ref:`concepts:branching` - select which Task to move onto based on a condition
+* :ref:`concepts:trigger-rules` - set the conditions under which a DAG will run a task
+* :doc:`/howto/setup-and-teardown` - define setup and teardown relationships
+* :ref:`concepts:latest-only` - a special form of branching that only runs on DAGs running against the present
+* :ref:`concepts:depends-on-past` - tasks can depend on themselves *from a previous run*

 .. _concepts:branching:

@@ -484,6 +484,14 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality

 .. image:: /img/branch_with_trigger.png

+Setup and teardown
+------------------
+
+In data workflows it's common to create a resource (such as a compute resource), use it to do some work, and then tear it down. Airflow provides setup and teardown tasks to support this need.
+
+Please see the main article :doc:`/howto/setup-and-teardown` for details on how to use this feature.
+
+
 Dynamic DAGs
 ------------

diff --git a/docs/apache-airflow/howto/index.rst b/docs/apache-airflow/howto/index.rst
index 15a5249147..03c34820c4 100644
--- a/docs/apache-airflow/howto/index.rst
+++ b/docs/apache-airflow/howto/index.rst
@@ -45,6 +45,7 @@ configuring an Airflow environment.
     export-more-env-vars
     connection
     variable
+    setup-and-teardown
     run-behind-proxy
     run-with-systemd
     define-extra-link
diff --git a/docs/apache-airflow/howto/setup-and-teardown.rst b/docs/apache-airflow/howto/setup-and-teardown.rst
new file mode 100644
index 0000000000..c897031a6c
--- /dev/null
+++ b/docs/apache-airflow/howto/setup-and-teardown.rst
@@ -0,0 +1,184 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Setup and Teardown
+~~~~~~~~~~~~~~~~~~
+
+In data workflows it's common to create a resource (such as a compute resource), use it to do some work, and then tear it down. Airflow provides setup and teardown tasks to support this need.
+
+Key features of setup and teardown tasks:
+
+  * If you clear a task, its setups and teardowns will be cleared.
+  * By default, teardown tasks are ignored for the purpose of evaluating dag run state.
+  * A teardown task will run if its setup was successful, even if its work tasks failed.
+  * Teardown tasks are ignored when setting dependencies against task groups.
+
+How setup and teardown works
+""""""""""""""""""""""""""""
+
+Basic usage
+"""""""""""
+
+Suppose you have a dag that creates a cluster, runs a query, and deletes the cluster. Without using setup and teardown tasks you might set these relationships:
+
+.. code-block:: python
+
+    create_cluster >> run_query >> delete_cluster
+
+To enable create_cluster and delete_cluster as setup and teardown tasks, we mark them as such with the methods ``as_setup`` and ``as_teardown``, and add an upstream / downstream relationship between them:
+
+.. code-block:: python
+
+    create_cluster.as_setup() >> run_query >> delete_cluster.as_teardown()
+    create_cluster >> delete_cluster
+
+For convenience we can do this in one line by passing ``create_cluster`` to the ``as_teardown`` method:
+
+.. code-block:: python
+
+    create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster)
+
+Observations:
+
+  * If you clear ``run_query`` to run it again, then both ``create_cluster`` and ``delete_cluster`` will be cleared.
+  * If ``run_query`` fails, then ``delete_cluster`` will still run.
+  * The success of the dag run will depend *only* on the success of ``run_query``.
+
+Additionally, if we have multiple tasks to wrap, we can use the teardown as a context manager:
+
+.. code-block:: python
+
+    with delete_cluster.as_teardown(setups=create_cluster):
+        [RunQueryOne(), RunQueryTwo()] >> DoSomeOtherStuff()
+        WorkOne() >> [do_this_stuff(), do_other_stuff()]
+
+This will set create_cluster to run before the tasks in the context, and delete_cluster to run after them.
+
+Note that if you are attempting to add an already-instantiated task to a setup context, you need to do it explicitly:
+
+.. code-block:: python
+
+    with my_teardown_task as scope:
+        scope.add_task(work_task)  # work_task was already instantiated elsewhere
+
+Setup "scope"
+"""""""""""""
+
+Tasks between a setup and its teardown are in the "scope" of the setup / teardown pair.
+
+Let's look at an example:
+
+.. code-block:: python
+
+    s1 >> w1 >> w2 >> t1.as_teardown(setups=s1) >> w3
+    w2 >> w4
+
+In the above example, ``w1`` and ``w2`` are "between" ``s1`` and ``t1`` and therefore are assumed to require ``s1``. Thus if ``w1`` or ``w2`` is cleared, so too will be ``s1`` and ``t1``. But if ``w3`` or ``w4`` is cleared, neither ``s1`` nor ``t1`` will be cleared.
+
+You can have multiple setup tasks wired to a single teardown. The teardown will run if at least one of the setups completed successfully.
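
For illustration, a minimal sketch of wiring two setups to a single teardown might look like the following (``create_cluster``, ``create_bucket``, ``run_query`` and ``delete_all`` are placeholder task names, and passing a list to ``setups`` is an assumption of this sketch rather than something shown in the commit):

.. code-block:: python

    # two setup tasks feeding one work task
    [create_cluster, create_bucket] >> run_query
    # a single teardown wired to both setups
    run_query >> delete_all.as_teardown(setups=[create_cluster, create_bucket])

With wiring like this, ``delete_all`` would run as long as at least one of its setups succeeded.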
+
+You can have a setup without a teardown:
+
+.. code-block:: python
+
+    create_cluster >> run_query >> other_task
+
+In this case, everything downstream of create_cluster is assumed to require it. So if you clear other_task, it will also clear create_cluster. Suppose that we add a teardown for create_cluster after run_query:
+
+.. code-block:: python
+
+    create_cluster >> run_query >> other_task
+    run_query >> delete_cluster.as_teardown(setups=create_cluster)
+
+Now, Airflow can infer that other_task does not require create_cluster, so if we clear other_task, create_cluster will not also be cleared.
+
+In that example, we (in our pretend docs land) actually wanted to delete the cluster. But supposing we didn't, and we just wanted to say "other_task does not require create_cluster", then we could use an EmptyOperator to limit the setup's scope:
+
+.. code-block:: python
+
+    create_cluster >> run_query >> other_task
+    run_query >> EmptyOperator(task_id="cluster_teardown").as_teardown(setups=create_cluster)
+
+
+Controlling dag run state
+"""""""""""""""""""""""""
+
+Another feature of setup / teardown tasks is that you can choose whether or not the teardown task should have an impact on dag run state. Perhaps you don't care if the "cleanup" work performed by your teardown task fails, and you only consider the dag run a failure if the "work" tasks fail. By default, teardown tasks are not considered for dag run state.
+
+Continuing with the example above, if you want the run's success to depend on ``delete_cluster``, then set ``on_failure_fail_dagrun=True`` when setting ``delete_cluster`` as teardown. For example:
+
+.. code-block:: python
+
+    create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster, on_failure_fail_dagrun=True)
+
+Authoring with task groups
+""""""""""""""""""""""""""
+
+When arrowing from task group to task group, or from task group to task, we ignore teardowns. This allows teardowns to run in parallel, and allows dag execution to proceed even if teardown tasks fail.
+
+Consider this example:
+
+.. code-block:: python
+
+    with TaskGroup("my_group") as tg:
+        s1 = my_setup()
+        w1 = my_work()
+        t1 = my_teardown()
+        s1 >> w1 >> t1.as_teardown(setups=s1)
+    w2 = other_work()
+    tg >> w2
+
+If ``t1`` were not a teardown task, then this dag would effectively be ``s1 >> w1 >> t1 >> w2``. But since we have marked ``t1`` as a teardown, it's ignored in ``tg >> w2``. So the dag is equivalent to the following:
+
+.. code-block:: python
+
+    s1 >> w1 >> [t1.as_teardown(setups=s1), w2]
+
+Now let's consider an example with nesting:
+
+.. code-block:: python
+
+    with TaskGroup("my_group") as tg:
+        s1 = my_setup()
+        w1 = my_work()
+        t1 = my_teardown()
+        s1 >> w1 >> t1.as_teardown(setups=s1)
+    w2 = other_work()
+    tg >> w2
+    dag_s1 = dag_setup1()
+    dag_t1 = dag_teardown1()
+    dag_s1 >> [tg, w2] >> dag_t1.as_teardown(setups=dag_s1)
+
+In this example ``s1`` is downstream of ``dag_s1``, so it must wait for ``dag_s1`` to complete successfully. But ``t1`` and ``dag_t1`` can run concurrently, because ``t1`` is ignored in the expression ``tg >> dag_t1``. If you clear ``w2``, it will clear ``dag_s1`` and ``dag_t1``, but not anything in the task group.
+
+Running setups and teardowns in parallel
+""""""""""""""""""""""""""""""""""""""""
+
+You can run setup tasks in parallel:
+
+.. code-block:: python
+
+    (
+        [create_cluster, create_bucket]
+        >> run_query
+        >> [delete_cluster.as_teardown(setups=create_cluster), delete_bucket.as_teardown(setups=create_bucket)]
+    )
+
+Trigger rule behavior for teardowns
+"""""""""""""""""""""""""""""""""""
+
+Teardowns use a (non-configurable) trigger rule called ALL_DONE_SETUP_SUCCESS. With this rule, as long as all upstreams are done and at least one directly connected setup is successful, the teardown will run. If all of a teardown's setups were skipped or failed, those states will propagate to the teardown.
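
As a rough sketch of how that rule plays out (again with placeholder tasks ``s1``, ``s2``, ``work`` and ``cleanup``, and assuming a list of setups can be passed as above):

.. code-block:: python

    [s1, s2] >> work
    work >> cleanup.as_teardown(setups=[s1, s2])

    # If s1 fails but s2 succeeds: once all of cleanup's upstreams are done,
    # cleanup still runs, because at least one directly connected setup succeeded.
    # If s1 and s2 both fail, or are both skipped: that state propagates to cleanup.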
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index ad3b7cd8f8..8effe2cc92 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -481,6 +481,7 @@ Dont
 DOS'ing
 DownloadReportV
 downscaling
+downstreams
 Drillbit
 Drivy
 dropdown
@@ -1493,6 +1494,7 @@ tblproperties
 TCP
 tcp
 teardown
+teardowns
 templatable
 templateable
 Templated
@@ -1597,6 +1599,7 @@ Upsert
 upsert
 upserts
 Upsight
+upstreams
 Uptime
 uri
 uris