dstandish commented on code in PR #24489:
URL: https://github.com/apache/airflow/pull/24489#discussion_r902880469


##########
docs/apache-airflow/concepts/dynamic-task-mapping.rst:
##########
@@ -208,27 +210,156 @@ In this example you have a regular data delivery to an S3 bucket and want to app
 
 
     with DAG(dag_id="mapped_s3", start_date=datetime(2020, 4, 7)) as dag:
-        files = S3ListOperator(
+        list_filenames = S3ListOperator(
             task_id="get_input",
             bucket="example-bucket",
             prefix='incoming/provider_a/{{ data_interval_start.strftime("%Y-%m-%d") }}',
         )
 
         @task
-        def count_lines(aws_conn_id, bucket, file):
+        def count_lines(aws_conn_id, bucket, filename):
             hook = S3Hook(aws_conn_id=aws_conn_id)
 
-            return len(hook.read_key(file, bucket).splitlines())
+            return len(hook.read_key(filename, bucket).splitlines())
 
         @task
         def total(lines):
             return sum(lines)
 
-        counts = count_lines.partial(aws_conn_id="aws_default", bucket=files.bucket).expand(
-            file=XComArg(files)
-        )
+        counts = count_lines.partial(
+            aws_conn_id="aws_default", bucket=list_filenames.bucket
+        ).expand(filename=XComArg(list_filenames))
+
         total(lines=counts)
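As a sanity check, the fan-out in this example can be sketched in plain Python outside Airflow; ``FILE_CONTENTS`` below is a made-up stand-in for the S3 objects and is not part of the example above.

```python
# Plain-Python sketch of the mapping above: each mapped count_lines
# instance receives one filename, and total() sums the per-file counts.
# FILE_CONTENTS is a hypothetical stand-in for objects read via S3Hook.
FILE_CONTENTS = {
    "incoming/a.csv": "h1,h2\n1,2\n3,4",
    "incoming/b.csv": "h1,h2\n5,6",
}


def count_lines(filename):
    # Mirrors hook.read_key(filename, bucket).splitlines() without S3.
    return len(FILE_CONTENTS[filename].splitlines())


def total(lines):
    return sum(lines)


# One count_lines "task instance" per filename, then a single total().
counts = [count_lines(f) for f in sorted(FILE_CONTENTS)]
print(total(counts))  # 3 + 2 = 5
```

Each entry in ``counts`` corresponds to one mapped ``count_lines`` task instance; ``total`` then runs once over all of them.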
 
+Assigning multiple parameters to a non-TaskFlow operator
+========================================================
+
+Sometimes an upstream task needs to specify multiple arguments to a downstream operator. To do this, you can use the ``expand_kwargs`` function, which takes a sequence of mappings to map against.
+
+.. code-block:: python
+
+    BashOperator.partial(task_id="bash").expand_kwargs(
+        [
+            {"bash_command": "echo $ENV1", "env": {"ENV1": "1"}},
+            {"bash_command": "printf $ENV2", "env": {"ENV2": "2"}},
+        ],
+    )
+
+This produces two task instances at run time, printing ``1`` and ``2`` respectively.
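For intuition, here is a plain-Python sketch of how ``expand_kwargs`` fans a sequence of dicts into per-instance keyword arguments; ``run_bash`` is a hypothetical stand-in for ``BashOperator`` execution, not real Airflow behaviour.

```python
import shlex


def run_bash(bash_command, env):
    # Hypothetical stand-in for BashOperator: substitute $VARS from env,
    # mimicking what the shell command would print.
    words = []
    for word in shlex.split(bash_command):
        if word.startswith("$"):
            word = env.get(word[1:], "")
        words.append(word)
    return " ".join(words[1:])  # drop the echo/printf command itself


expand_input = [
    {"bash_command": "echo $ENV1", "env": {"ENV1": "1"}},
    {"bash_command": "printf $ENV2", "env": {"ENV2": "2"}},
]

# One task instance per mapping; each dict becomes that instance's kwargs.
outputs = [run_bash(**kwargs) for kwargs in expand_input]
print(outputs)  # ['1', '2']
```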
+
+Similar to ``expand``, you can also map against an XCom that returns a list of dicts, or a list of XComs each returning a dict. Re-using the S3 example above, you can use a mapped task to perform “branching” and copy files to different buckets:
+
+.. code-block:: python
+
+    list_filenames = S3ListOperator(...)  # Same as the above example.
+
+
+    @task
+    def create_copy_kwargs(filename):
+        if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
+            dest_bucket_name = "my_text_bucket"
+        else:
+            dest_bucket_name = "my_other_bucket"
+        return {
+            "source_bucket_key": filename,
+            "dest_bucket_key": filename,
+            "dest_bucket_name": dest_bucket_name,
+        }
+
+
+    copy_kwargs = create_copy_kwargs.expand(filename=XComArg(list_filenames))
+
+    # Copy files to another bucket, based on the file's extension.
+    copy_filenames = S3CopyObjectOperator.partial(
+        task_id="copy_files", source_bucket_name=list_filenames.bucket
+    ).expand_kwargs(copy_kwargs)
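Because ``create_copy_kwargs`` is an ordinary ``@task``-decorated function, its branching logic can be exercised as a plain function; the filenames below are invented for the demonstration.

```python
# Same branching logic as the @task above, tested as a plain function.
def create_copy_kwargs(filename):
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        dest_bucket_name = "my_text_bucket"
    else:
        dest_bucket_name = "my_other_bucket"
    return {
        "source_bucket_key": filename,
        "dest_bucket_key": filename,
        "dest_bucket_name": dest_bucket_name,
    }


print(create_copy_kwargs("notes.txt")["dest_bucket_name"])  # my_text_bucket
print(create_copy_kwargs("conf.yml")["dest_bucket_name"])   # my_other_bucket
```

Each dict returned here becomes the keyword arguments of one mapped ``S3CopyObjectOperator`` instance via ``expand_kwargs``.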
+
+Filtering items from an expanded task
+=====================================
+
+A mapped task can remove elements from its downstream tasks' expansion by returning ``None``. For example, if we want to *only* copy files with certain extensions from one S3 bucket to another, we could implement ``create_copy_kwargs`` like this instead:
+
+.. code-block:: python
+
+    @task
+    def create_copy_kwargs(filename):
+        # Skip files not ending with these suffixes.
+        if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
+            return None
+        return {
+            "source_bucket_key": filename,
+            "dest_bucket_key": filename,
+            "dest_bucket_name": "my_other_bucket",
+        }
+
+
+    # copy_kwargs and copy_files are implemented the same.
+
+This makes ``copy_files`` only expand against ``.json`` and ``.yml`` files, while ignoring the rest.
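A plain-Python sketch of the filtering semantics, assuming a hypothetical list of filenames: items for which the mapped task returns ``None`` simply never appear in the downstream expansion.

```python
# Returning None removes the item from the downstream expansion.
def create_copy_kwargs(filename):
    # Skip files not ending with these suffixes.
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        return None
    return {"source_bucket_key": filename, "dest_bucket_key": filename}


filenames = ["a.json", "b.txt", "c.yml"]  # hypothetical upstream output
mapped = [create_copy_kwargs(f) for f in filenames]
# Downstream expansion only sees the non-None results.
downstream_input = [kw for kw in mapped if kw is not None]
print([kw["source_bucket_key"] for kw in downstream_input])  # ['a.json', 'c.yml']
```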
+
+Transforming mapped data
+========================
+
+It is common to want to transform the output data format for task mapping, especially from a non-TaskFlow operator, where the output format is pre-determined and cannot be easily converted (this is what ``create_copy_kwargs`` does in the above example). A special ``map()`` function can be used to perform this kind of transformation easily. The above example can therefore be modified like this:
+
+.. code-block:: python
+
+    from airflow.exceptions import AirflowSkipException
+
+    list_filenames = S3ListOperator(...)  # Unchanged.
+
+
+    def create_copy_kwargs(filename):
+        if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
+            raise AirflowSkipException(f"skipping {filename!r}; unexpected suffix")
+        return {
+            "source_bucket_key": filename,
+            "dest_bucket_key": filename,
+            "dest_bucket_name": "my_other_bucket",
+        }
+
+
+    copy_kwargs = XComArg(list_filenames).map(create_copy_kwargs)
+
+    # Unchanged.
+    copy_filenames = S3CopyObjectOperator.partial(...).expand_kwargs(copy_kwargs)
+
+There are a couple of things to note:
+
+#. The callable argument of ``map()`` (``create_copy_kwargs`` in the example) **must not** be a task, but a plain Python function. The transformation is part of the “pre-processing” of the downstream task (i.e. ``copy_files``), not a standalone task in the DAG.
+#. The callable always takes exactly one positional argument. It is called for each item in the iterable used for task-mapping, similar to how Python’s built-in ``map()`` works.
+#. Since the callable is executed as part of the downstream task, you can use any existing techniques to write the task function. To mark a component as skipped, for example, you should raise ``AirflowSkipException``. Note that returning ``None`` **does not** work here.
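The skip semantics in the last point can be sketched in plain Python; ``SkipItem`` below is a stand-in for ``AirflowSkipException``, and the per-item ``try``/``except`` mimics how a skip affects only that one mapped instance.

```python
class SkipItem(Exception):
    """Stand-in for AirflowSkipException in this sketch."""


def create_copy_kwargs(filename):
    # A plain function, not a task -- exactly one positional argument.
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        raise SkipItem(f"skipping {filename!r}; unexpected suffix")
    return {"source_bucket_key": filename}


filenames = ["a.json", "b.txt", "c.yml"]  # hypothetical upstream output
results = []
for filename in filenames:
    try:
        results.append(create_copy_kwargs(filename))
    except SkipItem:
        pass  # only this mapped instance is skipped
print([kw["source_bucket_key"] for kw in results])  # ['a.json', 'c.yml']
```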

Review Comment:
   what happens when you return None?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to