This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 73a87f8885 Add concept doc for Dynamic Task Mapping (#22867) 73a87f8885 is described below commit 73a87f8885d1a72723e403f4b3cd868199e308fd Author: Ash Berlin-Taylor <ash_git...@firemirror.com> AuthorDate: Mon Apr 11 15:28:40 2022 +0100 Add concept doc for Dynamic Task Mapping (#22867) Co-authored-by: Daniel Standish <15932138+dstand...@users.noreply.github.com> Co-authored-by: Jed Cunningham <jedcunning...@apache.org> Co-authored-by: Tzu-ping Chung <uranu...@gmail.com> Co-authored-by: eladkal <45845474+elad...@users.noreply.github.com> --- .../concepts/dynamic-task-mapping.rst | 261 +++++++++++++++++++++ docs/apache-airflow/concepts/index.rst | 1 + .../howto/dynamic-dag-generation.rst | 2 + docs/apache-airflow/img/mapping-simple-graph.png | Bin 0 -> 7676 bytes docs/apache-airflow/img/mapping-simple-grid.png | Bin 0 -> 179670 bytes 5 files changed, 264 insertions(+) diff --git a/docs/apache-airflow/concepts/dynamic-task-mapping.rst b/docs/apache-airflow/concepts/dynamic-task-mapping.rst new file mode 100644 index 0000000000..b326a31672 --- /dev/null +++ b/docs/apache-airflow/concepts/dynamic-task-mapping.rst @@ -0,0 +1,261 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+====================
+Dynamic Task Mapping
+====================
+
+Dynamic Task Mapping provides a way for a workflow to create a number of tasks at runtime based upon current data, rather than the DAG author having to know in advance how many tasks would be needed.
+
+This is similar to defining your tasks in a ``for`` loop, but instead of having the DAG file fetch the data and do that itself, the scheduler can do this based on the output of a previous task. Right before a mapped task is executed the scheduler will create *n* copies of the task, one for each input.
+
+It is also possible to have a task operate on the collected output of a mapped task, commonly known as map and reduce.
+
+Simple mapping
+==============
+
+In its simplest form you can map over a list defined directly in your DAG file using the ``expand()`` function instead of calling your task directly.
+
+.. code-block:: python
+
+    from datetime import datetime
+
+    from airflow import DAG
+    from airflow.decorators import task
+
+
+    with DAG(dag_id="simple_mapping", start_date=datetime(2022, 3, 4)) as dag:
+
+        @task
+        def add_one(x: int):
+            return x + 1
+
+        @task
+        def sum_it(values):
+            total = sum(values)
+            print(f"Total was {total}")
+
+        added_values = add_one.expand(x=[1, 2, 3])
+        sum_it(added_values)
+
+This will show ``Total was 9`` in the task logs when executed.
+
+This is the resulting DAG structure:
+
+.. image:: /img/mapping-simple-graph.png
+
+The grid view also provides visibility into your mapped tasks in the details panel:
+
+.. image:: /img/mapping-simple-grid.png
+
+.. note:: A reduce task is not required.
+
+    Although we show a "reduce" task here (``sum_it``) you don't have to have one; the mapped tasks will still be executed even if they have no downstream tasks.
+
+Repeated Mapping
+================
+
+The result of one mapped task can also be used as input to the next mapped task.
+ +.. code-block:: python + + with DAG(dag_id="repeated_mapping", start_date=datetime(2022, 3, 4)) as dag: + + @task + def add_one(x: int): + return x + 1 + + first = add_one.expand(x=[1, 2, 3]) + second = add_one.expand(x=first) + +This would have a result of ``[3, 4, 5]``. + +Constant parameters +=================== + +As well as passing arguments that get expanded at run-time, it is possible to pass arguments that don't change – in order to clearly differentiate between the two kinds we use different functions, ``expand()`` for mapped arguments, and ``partial()`` for unmapped ones. + +.. code-block:: python + + @task + def add(x: int, y: int): + return x + y + + + added_values = add.partial(y=10).expand(x=[1, 2, 3]) + # This results in add function being expanded to + # add(x=1, y=10) + # add(x=2, y=10) + # add(x=3, y=10) + +This would result in values of 11, 12, and 13. + +This is also useful for passing things such as connection IDs, database table names, or bucket names to tasks. + +Mapping over multiple parameters +================================ + +As well as a single parameter it is possible to pass multiple parameters to expand. This will have the effect of creating a "cross product", calling the mapped task with each combination of parameters. + +.. code-block:: python + + @task + def add(x: int, y: int): + return x + y + + + added_values = add.expand(x=[2, 4, 8], y=[5, 10]) + # This results in the add function being called with + # add(x=2, y=5) + # add(x=2, y=10) + # add(x=4, y=5) + # add(x=4, y=10) + # add(x=8, y=5) + # add(x=8, y=10) + +This would result in the add task being called 6 times. Please note however that the order of expansion is not guaranteed. + +It is not possible to achieve an effect similar to Python's ``zip`` function with mapped arguments. 
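Because ``expand()`` always produces the full cross product, element-wise (``zip``-style) pairing cannot be expressed directly. The following plain-Python sketch (illustrative only, not Airflow API) contrasts the two behaviours using the ``x``/``y`` values from the example above; one common workaround, under the doc's own rules for mappable types, is to have an upstream task return the already-paired list and map a single argument over it.

```python
# Plain-Python sketch (not Airflow code) of why expand(x=..., y=...) behaves
# like a cross product rather than a zip-style pairing.
from itertools import product


def add(x: int, y: int) -> int:
    return x + y


xs = [2, 4, 8]
ys = [5, 10]

# What expand(x=xs, y=ys) does: every combination, 6 calls in total
# (order of expansion is not guaranteed).
cross = [add(x, y) for x, y in product(xs, ys)]

# A zip-style pairing would make only 2 calls. To approximate this in a DAG,
# an upstream task could return the paired list (e.g. list(zip(xs, ys))) and
# the mapped task could accept each pair as a single argument.
paired = [add(x, y) for x, y in zip(xs, ys)]
```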
+
+Task-generated Mapping
+======================
+
+Up until now the examples we've shown could all be achieved with a ``for`` loop in the DAG file, but the real power of dynamic task mapping comes from being able to have a task generate the list to iterate over.
+
+.. code-block:: python
+
+    @task
+    def make_list():
+        # This can also be from an API call, checking a database -- almost anything you like, as long as the
+        # resulting list/dictionary can be stored in the current XCom backend.
+        return [1, 2, {"a": "b"}, "str"]
+
+
+    @task
+    def consumer(arg):
+        print(repr(arg))
+
+
+    with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
+        consumer.expand(arg=make_list())
+
+The ``make_list`` task runs as a normal task and must return a list or dict (see `What data types can be expanded?`_); the ``consumer`` task will then be called four times, once with each value in the return of ``make_list``.
+
+Mapping with non-TaskFlow operators
+===================================
+
+It is possible to use ``partial`` and ``expand`` with classic-style operators as well. Some arguments are not mappable and must be passed to ``partial()``, such as ``task_id``, ``queue``, ``pool``, and most other arguments to ``BaseOperator``.
+
+.. code-block:: python
+
+    BashOperator.partial(task_id="bash", do_xcom_push=False).expand(
+        bash_command=["echo 1", "echo 2"]
+    )
+
+Mapping over result of classic operators
+----------------------------------------
+
+If you want to map over the result of a classic operator you will need to create an ``XComArg`` object manually.
+
+.. code-block:: python
+
+    from airflow import XComArg
+
+    task = MyOperator(task_id="source")
+
+    downstream = MyOperator2.partial(task_id="consumer").expand(input=XComArg(task))
+
+Putting it all together
+=======================
+
+In this example you have a regular data delivery to an S3 bucket and want to apply the same processing to every file that arrives, no matter how many arrive each time.
+
+.. code-block:: python
+
+    from datetime import datetime
+
+    from airflow import DAG, XComArg
+    from airflow.decorators import task
+    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+    from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
+
+
+    with DAG(dag_id="mapped_s3", start_date=datetime(2020, 4, 7)) as dag:
+        files = S3ListOperator(
+            task_id="get_input",
+            bucket="example-bucket",
+            prefix='incoming/provider_a/{{ data_interval_start.strftime("%Y-%m-%d") }}',
+        )
+
+        @task
+        def count_lines(aws_conn_id, bucket, file):
+            hook = S3Hook(aws_conn_id=aws_conn_id)
+
+            return len(hook.read_key(file, bucket).splitlines())
+
+        @task
+        def total(lines):
+            return sum(lines)
+
+        counts = count_lines.partial(aws_conn_id="aws_default", bucket=files.bucket).expand(
+            file=XComArg(files)
+        )
+        total(lines=counts)
+
+What data types can be expanded?
+================================
+
+Currently it is only possible to map against a dict, a list, or one of those types stored in XCom as the result of a task.
+
+If an upstream task returns an unmappable type, the mapped task will fail at run-time with an ``UnmappableXComTypePushed`` exception. For instance, you can't have the upstream task return a plain string – it must be a list or a dict.
+
+Placing limits on mapped tasks
+==============================
+
+There are two limits that you can place on a task:
+
+  #. the number of mapped task instances that can be created as the result of expansion.
+  #. the number of mapped task instances that can run at once.
+
+- **Limiting the number of mapped tasks**
+
+  The [core] ``max_map_length`` config option is the maximum number of tasks that ``expand`` can create – the default value is 1024.
+
+  If a source task (``make_list`` in our earlier example) returns a list longer than this it will result in *that* task failing.
+
+- **Limiting parallel copies of a mapped task**
+
+  If you do not want a large mapped task to consume all available runner slots you can use the ``max_active_tis_per_dag`` setting on the task to restrict how many copies can be running at the same time.
+
+  Note however that this applies to all copies of that task against all active DagRuns, not just to this one specific DagRun.
+
+  .. code-block:: python
+
+      @task(max_active_tis_per_dag=16)
+      def add_one(x: int):
+          return x + 1
+
+
+      # ``commands`` is assumed to be a list of shell command strings defined earlier.
+      BashOperator.partial(task_id="my_task", max_active_tis_per_dag=16).expand(
+          bash_command=commands
+      )
+
+Automatically skipping zero-length maps
+=======================================
+
+If the input is empty (zero length), no new tasks will be created and the mapped task will be marked as ``SKIPPED``.
diff --git a/docs/apache-airflow/concepts/index.rst b/docs/apache-airflow/concepts/index.rst
index 4d99555be1..f4f0cb3b65 100644
--- a/docs/apache-airflow/concepts/index.rst
+++ b/docs/apache-airflow/concepts/index.rst
@@ -36,6 +36,7 @@ Here you can find detailed documentation about each one of Airflow's core concep
    dags
    tasks
    operators
+   dynamic-task-mapping
    sensors
    deferring
    smart-sensors
diff --git a/docs/apache-airflow/howto/dynamic-dag-generation.rst b/docs/apache-airflow/howto/dynamic-dag-generation.rst
index 185c577d18..fb86abbe54 100644
--- a/docs/apache-airflow/howto/dynamic-dag-generation.rst
+++ b/docs/apache-airflow/howto/dynamic-dag-generation.rst
@@ -20,6 +20,8 @@
 Dynamic DAG Generation
 ======================
 
+To have a task repeated based on the output/result of a previous task, see :doc:`/concepts/dynamic-task-mapping`.
+
 Dynamic DAGs with environment variables
 .......................................
diff --git a/docs/apache-airflow/img/mapping-simple-graph.png b/docs/apache-airflow/img/mapping-simple-graph.png new file mode 100644 index 0000000000..9e01d027ec Binary files /dev/null and b/docs/apache-airflow/img/mapping-simple-graph.png differ diff --git a/docs/apache-airflow/img/mapping-simple-grid.png b/docs/apache-airflow/img/mapping-simple-grid.png new file mode 100644 index 0000000000..59f5e6f71c Binary files /dev/null and b/docs/apache-airflow/img/mapping-simple-grid.png differ