This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 73a87f8885 Add concept doc for Dynamic Task Mapping (#22867)
73a87f8885 is described below

commit 73a87f8885d1a72723e403f4b3cd868199e308fd
Author: Ash Berlin-Taylor <ash_git...@firemirror.com>
AuthorDate: Mon Apr 11 15:28:40 2022 +0100

    Add concept doc for Dynamic Task Mapping (#22867)
    
    Co-authored-by: Daniel Standish <15932138+dstand...@users.noreply.github.com>
    Co-authored-by: Jed Cunningham <jedcunning...@apache.org>
    Co-authored-by: Tzu-ping Chung <uranu...@gmail.com>
    Co-authored-by: eladkal <45845474+elad...@users.noreply.github.com>
---
 .../concepts/dynamic-task-mapping.rst              | 261 +++++++++++++++++++++
 docs/apache-airflow/concepts/index.rst             |   1 +
 .../howto/dynamic-dag-generation.rst               |   2 +
 docs/apache-airflow/img/mapping-simple-graph.png   | Bin 0 -> 7676 bytes
 docs/apache-airflow/img/mapping-simple-grid.png    | Bin 0 -> 179670 bytes
 5 files changed, 264 insertions(+)

diff --git a/docs/apache-airflow/concepts/dynamic-task-mapping.rst b/docs/apache-airflow/concepts/dynamic-task-mapping.rst
new file mode 100644
index 0000000000..b326a31672
--- /dev/null
+++ b/docs/apache-airflow/concepts/dynamic-task-mapping.rst
@@ -0,0 +1,261 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+====================
+Dynamic Task Mapping
+====================
+
+Dynamic Task Mapping allows a workflow to create a number of tasks at runtime based upon current data, rather than the DAG author having to know in advance how many tasks would be needed.
+
+This is similar to defining your tasks in a for loop, but instead of having the DAG file fetch the data and do that itself, the scheduler can do this based on the output of a previous task. Right before a mapped task is executed, the scheduler will create *n* copies of the task, one for each input.
+
+It is also possible to have a task operate on the collected output of a mapped task, commonly known as map and reduce.
+
+Simple mapping
+==============
+
+In its simplest form you can map over a list defined directly in your DAG file using the ``expand()`` function instead of calling your task directly.
+
+.. code-block:: python
+
+    from datetime import datetime
+
+    from airflow import DAG
+    from airflow.decorators import task
+
+
+    with DAG(dag_id="simple_mapping", start_date=datetime(2022, 3, 4)) as dag:
+
+        @task
+        def add_one(x: int):
+            return x + 1
+
+        @task
+        def sum_it(values):
+            total = sum(values)
+            print(f"Total was {total}")
+
+        added_values = add_one.expand(x=[1, 2, 3])
+        sum_it(added_values)
+
+This will show ``Total was 9`` in the task logs when executed.
+
+This is the resulting DAG structure:
+
+.. image:: /img/mapping-simple-graph.png
+
+The grid view also provides visibility into your mapped tasks in the details panel:
+
+.. image:: /img/mapping-simple-grid.png
+
+.. note:: A reduce task is not required.
+
+    Although we show a "reduce" task here (``sum_it``), you don't have to have one; the mapped tasks will still be executed even if they have no downstream tasks.
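+
+A minimal sketch of mapping without a reduce step (the ``dag_id`` here is illustrative):
+
+.. code-block:: python
+
+    with DAG(dag_id="mapping_no_reduce", start_date=datetime(2022, 3, 4)) as dag:
+
+        @task
+        def add_one(x: int):
+            return x + 1
+
+        # No downstream consumer; the three mapped task instances still run.
+        add_one.expand(x=[1, 2, 3])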
+
+Repeated Mapping
+================
+
+The result of one mapped task can also be used as input to the next mapped task.
+
+.. code-block:: python
+
+    with DAG(dag_id="repeated_mapping", start_date=datetime(2022, 3, 4)) as dag:
+
+        @task
+        def add_one(x: int):
+            return x + 1
+
+        first = add_one.expand(x=[1, 2, 3])
+        second = add_one.expand(x=first)
+
+This would have a result of ``[3, 4, 5]``.
+
+Constant parameters
+===================
+
+As well as passing arguments that get expanded at run-time, it is possible to pass arguments that don't change. In order to clearly differentiate between the two kinds we use different functions: ``expand()`` for mapped arguments, and ``partial()`` for unmapped ones.
+
+.. code-block:: python
+
+    @task
+    def add(x: int, y: int):
+        return x + y
+
+
+    added_values = add.partial(y=10).expand(x=[1, 2, 3])
+    # This results in add function being expanded to
+    # add(x=1, y=10)
+    # add(x=2, y=10)
+    # add(x=3, y=10)
+
+This would result in values of 11, 12, and 13.
+
+This is also useful for passing things such as connection IDs, database table names, or bucket names to tasks.
+
+Mapping over multiple parameters
+================================
+
+As well as a single parameter it is possible to pass multiple parameters to ``expand()``. This will have the effect of creating a "cross product", calling the mapped task with each combination of parameters.
+
+.. code-block:: python
+
+    @task
+    def add(x: int, y: int):
+        return x + y
+
+
+    added_values = add.expand(x=[2, 4, 8], y=[5, 10])
+    # This results in the add function being called with
+    # add(x=2, y=5)
+    # add(x=2, y=10)
+    # add(x=4, y=5)
+    # add(x=4, y=10)
+    # add(x=8, y=5)
+    # add(x=8, y=10)
+
+This would result in the add task being called 6 times. Please note, however, that the order of expansion is not guaranteed.
+
+It is not possible to achieve an effect similar to Python's ``zip`` function with mapped arguments.
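+
+A hypothetical workaround sketch: pair the values yourself in an upstream task, then map over the single combined argument (``make_pairs`` and ``add_pair`` are illustrative names, not Airflow API):
+
+.. code-block:: python
+
+    @task
+    def make_pairs():
+        # Build the pairs ourselves so each mapped task receives one [x, y] pair.
+        return [[2, 5], [4, 10], [8, 20]]
+
+
+    @task
+    def add_pair(pair):
+        x, y = pair
+        return x + y
+
+
+    added = add_pair.expand(pair=make_pairs())
+    # add_pair([2, 5]), add_pair([4, 10]), add_pair([8, 20]) -> 7, 14, 28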
+
+Task-generated Mapping
+======================
+
+Up until now the examples we've shown could all be achieved with a ``for`` loop in the DAG file, but the real power of dynamic task mapping comes from being able to have a task generate the list to iterate over.
+
+.. code-block:: python
+
+    @task
+    def make_list():
+        # This can also be from an API call, checking a database -- almost
+        # anything you like, as long as the resulting list/dictionary can be
+        # stored in the current XCom backend.
+        return [1, 2, {"a": "b"}, "str"]
+
+
+    @task
+    def consumer(arg):
+        print(repr(arg))
+
+
+    with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
+        consumer.expand(arg=make_list())
+
+The ``make_list`` task runs as a normal task and must return a list or dict (see `What data types can be expanded?`_), and then the ``consumer`` task will be called four times, once with each value in the return of ``make_list``.
+
+Mapping with non-TaskFlow operators
+===================================
+
+It is possible to use ``partial`` and ``expand`` with classic style operators as well. Some arguments are not mappable and must be passed to ``partial()``, such as ``task_id``, ``queue``, ``pool``, and most other arguments to ``BaseOperator``.
+
+
+.. code-block:: python
+
+    BashOperator.partial(task_id="bash", do_xcom_push=False).expand(
+        bash_command=["echo 1", "echo 2"]
+    )
+
+Mapping over result of classic operators
+----------------------------------------
+
+If you want to map over the result of a classic operator you will need to create an ``XComArg`` object manually.
+
+.. code-block:: python
+
+    from airflow import XComArg
+
+    task = MyOperator(task_id="source")
+
+    downstream = MyOperator2.partial(task_id="consumer").expand(input=XComArg(task))
+
+Putting it all together
+=======================
+
+In this example you have a regular data delivery to an S3 bucket and want to apply the same processing to every file that arrives, no matter how many arrive each time.
+
+.. code-block:: python
+
+    from datetime import datetime
+
+    from airflow import DAG, XComArg
+    from airflow.decorators import task
+    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+    from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
+
+
+    with DAG(dag_id="mapped_s3", start_date=datetime(2020, 4, 7)) as dag:
+        files = S3ListOperator(
+            task_id="get_input",
+            bucket="example-bucket",
+            prefix='incoming/provider_a/{{ data_interval_start.strftime("%Y-%m-%d") }}',
+        )
+
+        @task
+        def count_lines(aws_conn_id, bucket, file):
+            hook = S3Hook(aws_conn_id=aws_conn_id)
+
+            return len(hook.read_key(file, bucket).splitlines())
+
+        @task
+        def total(lines):
+            return sum(lines)
+
+        counts = count_lines.partial(aws_conn_id="aws_default", bucket=files.bucket).expand(
+            file=XComArg(files)
+        )
+        total(lines=counts)
+
+What data types can be expanded?
+================================
+
+Currently it is only possible to map against a dict, a list, or one of those types stored in XCom as the result of a task.
+
+If an upstream task returns an unmappable type, the mapped task will fail at run-time with an ``UnmappableXComTypePushed`` exception. For instance, you can't have the upstream task return a plain string; it must be a list or a dict.
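+
+A minimal sketch of the failing case (task names are illustrative):
+
+.. code-block:: python
+
+    @task
+    def make_string():
+        # A plain str is not mappable; expansion fails with UnmappableXComTypePushed.
+        return "not-a-list"
+
+
+    @task
+    def consumer(arg):
+        print(repr(arg))
+
+
+    consumer.expand(arg=make_string())  # fails at run-time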
+
+Placing limits on mapped tasks
+==============================
+
+There are two limits that you can place on a task:
+
+  #. The number of mapped task instances that can be created as the result of expansion.
+  #. The number of mapped task instances that can run at once.
+
+- **Limiting the number of mapped task instances**
+
+  The [core] ``max_map_length`` config option is the maximum number of tasks that ``expand`` can create; the default value is 1024.
+
+  If a source task (``make_list`` in our earlier example) returns a list longer than this, it will result in *that* task failing.
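+
+  For example, to raise the limit you could set it in ``airflow.cfg`` (the value 2048 here is only a hypothetical example):
+
+  .. code-block:: ini
+
+      [core]
+      # Hypothetical example value; the default is 1024.
+      max_map_length = 2048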
+
+- **Limiting parallel copies of a mapped task**
+
+  If you do not want a large mapped task to consume all available runner slots, you can use the ``max_active_tis_per_dag`` setting on the task to restrict how many can be running at the same time.
+
+  Note, however, that this applies to all copies of that task against all active DagRuns, not just to this one specific DagRun.
+
+  .. code-block:: python
+
+      @task(max_active_tis_per_dag=16)
+      def add_one(x: int):
+          return x + 1
+
+
+      BashOperator.partial(task_id="my_task", max_active_tis_per_dag=16).expand(
+          bash_command=commands
+      )
+
+Automatically skipping zero-length maps
+=======================================
+
+If the input is empty (zero length), no new tasks will be created and the mapped task will be marked as ``SKIPPED``.
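+
+A minimal sketch (reusing the illustrative ``consumer`` task from above):
+
+.. code-block:: python
+
+    @task
+    def make_empty_list():
+        return []  # zero-length input
+
+
+    # No task instances are created; the mapped task is marked as SKIPPED.
+    consumer.expand(arg=make_empty_list())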
diff --git a/docs/apache-airflow/concepts/index.rst b/docs/apache-airflow/concepts/index.rst
index 4d99555be1..f4f0cb3b65 100644
--- a/docs/apache-airflow/concepts/index.rst
+++ b/docs/apache-airflow/concepts/index.rst
@@ -36,6 +36,7 @@ Here you can find detailed documentation about each one of Airflow's core concepts
     dags
     tasks
     operators
+    dynamic-task-mapping
     sensors
     deferring
     smart-sensors
diff --git a/docs/apache-airflow/howto/dynamic-dag-generation.rst b/docs/apache-airflow/howto/dynamic-dag-generation.rst
index 185c577d18..fb86abbe54 100644
--- a/docs/apache-airflow/howto/dynamic-dag-generation.rst
+++ b/docs/apache-airflow/howto/dynamic-dag-generation.rst
@@ -20,6 +20,8 @@
 Dynamic DAG Generation
 ======================
 
+To have a task repeated based on the output/result of a previous task, see :doc:`/concepts/dynamic-task-mapping`.
+
 Dynamic DAGs with environment variables
 .......................................
 
diff --git a/docs/apache-airflow/img/mapping-simple-graph.png b/docs/apache-airflow/img/mapping-simple-graph.png
new file mode 100644
index 0000000000..9e01d027ec
Binary files /dev/null and b/docs/apache-airflow/img/mapping-simple-graph.png differ
diff --git a/docs/apache-airflow/img/mapping-simple-grid.png b/docs/apache-airflow/img/mapping-simple-grid.png
new file mode 100644
index 0000000000..59f5e6f71c
Binary files /dev/null and b/docs/apache-airflow/img/mapping-simple-grid.png differ
