dstandish commented on code in PR #24489:
URL: https://github.com/apache/airflow/pull/24489#discussion_r902880469
##########
docs/apache-airflow/concepts/dynamic-task-mapping.rst:
##########
@@ -208,27 +210,156 @@ In this example you have a regular data delivery to an S3 bucket and want to app
with DAG(dag_id="mapped_s3", start_date=datetime(2020, 4, 7)) as dag:
- files = S3ListOperator(
+ list_filenames = S3ListOperator(
task_id="get_input",
bucket="example-bucket",
prefix='incoming/provider_a/{{ data_interval_start.strftime("%Y-%m-%d") }}',
)
@task
- def count_lines(aws_conn_id, bucket, file):
+ def count_lines(aws_conn_id, bucket, filename):
hook = S3Hook(aws_conn_id=aws_conn_id)
- return len(hook.read_key(file, bucket).splitlines())
+ return len(hook.read_key(filename, bucket).splitlines())
@task
def total(lines):
return sum(lines)
- counts = count_lines.partial(aws_conn_id="aws_default", bucket=files.bucket).expand(
- file=XComArg(files)
- )
+ counts = count_lines.partial(
+ aws_conn_id="aws_default", bucket=list_filenames.bucket
+ ).expand(filename=XComArg(list_filenames))
+
total(lines=counts)
+Assigning multiple parameters to a non-TaskFlow operator
+========================================================
+
+Sometimes an upstream task needs to provide multiple arguments to a downstream operator. To do this, you can use the ``expand_kwargs`` function, which takes a sequence of mappings to map against.
+
+.. code-block:: python
+
+ BashOperator.partial(task_id="bash").expand_kwargs(
+ [
+ {"bash_command": "echo $ENV1", "env": {"ENV1": "1"}},
+ {"bash_command": "printf $ENV2", "env": {"ENV2": "2"}},
+ ],
+ )
+
+This produces two task instances at run-time printing ``1`` and ``2`` respectively.
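As a plain-Python sketch (hypothetical names, not Airflow internals), ``expand_kwargs`` conceptually creates one task instance per mapping in the sequence, passing each dict as that instance's keyword arguments:

```python
# Hypothetical sketch: one "task instance" per mapping, each dict unpacked
# as keyword arguments (not Airflow's actual scheduling machinery).
def run_bash(bash_command, env):
    # Stand-in for the work one expanded BashOperator instance would do.
    return f"{bash_command} with env {env}"


kwargs_list = [
    {"bash_command": "echo $ENV1", "env": {"ENV1": "1"}},
    {"bash_command": "printf $ENV2", "env": {"ENV2": "2"}},
]

# One call per element of the sequence, mirroring the two task instances.
results = [run_bash(**kw) for kw in kwargs_list]
```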
+
+Similar to ``expand``, you can also map against an XCom that returns a list of dicts, or a list of XComs each returning a dict. Re-using the S3 example above, you can use a mapped task to perform “branching” and copy files to different buckets:
+
+.. code-block:: python
+
+ list_filenames = S3ListOperator(...) # Same as the above example.
+
+
+ @task
+ def create_copy_kwargs(filename):
+ if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
+ dest_bucket_name = "my_text_bucket"
+ else:
+ dest_bucket_name = "my_other_bucket"
+ return {
+ "source_bucket_key": filename,
+ "dest_bucket_key": filename,
+ "dest_bucket_name": dest_bucket_name,
+ }
+
+
+ copy_kwargs = create_copy_kwargs.expand(filename=XComArg(list_filenames))
+
+ # Copy files to another bucket, based on the file's extension.
+ copy_filenames = S3CopyObjectOperator.partial(
+ task_id="copy_files", source_bucket_name=list_filenames.bucket
+ ).expand_kwargs(copy_kwargs)
+
+Filtering items from an expanded task
+=====================================
+
+A mapped task can remove any elements from being passed on to its downstream tasks by returning ``None``. For example, if we want to *only* copy files from an S3 bucket to another with certain extensions, we could implement ``create_copy_kwargs`` like this instead:
+
+.. code-block:: python
+
+ @task
+ def create_copy_kwargs(filename):
+ # Skip files not ending with these suffixes.
+ if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
+ return None
+ return {
+ "source_bucket_key": filename,
+ "dest_bucket_key": filename,
+ "dest_bucket_name": "my_other_bucket",
+ }
+
+
+ # copy_kwargs and copy_files are implemented the same.
+
+This makes ``copy_files`` only expand against ``.json`` and ``.yml`` files, while ignoring the rest.
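The filtering rule can be sketched in plain Python (an illustration of the semantics, not Airflow's implementation): items for which the mapped task returns ``None`` are dropped before the downstream expansion.

```python
# Sketch of the filtering semantics: None results are removed from the
# list that the downstream task expands against.
def create_copy_kwargs(filename):
    # Skip files not ending with these suffixes.
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        return None
    return {
        "source_bucket_key": filename,
        "dest_bucket_key": filename,
        "dest_bucket_name": "my_other_bucket",
    }


files = ["a.json", "b.txt", "c.yml"]
# The downstream task conceptually sees only the non-None results.
expanded = [kw for kw in map(create_copy_kwargs, files) if kw is not None]
```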
+
+Transforming mapped data
+========================
+
+Since it is common to want to transform the output data format for task mapping, especially from a non-TaskFlow operator where the output format is pre-determined and cannot be easily converted (such as ``create_copy_kwargs`` in the above example), a special ``map()`` function can be used to easily perform this kind of transformation. The above example can therefore be modified like this:
+
+.. code-block:: python
+
+ from airflow.exceptions import AirflowSkipException
+
+ list_filenames = S3ListOperator(...) # Unchanged.
+
+
+ def create_copy_kwargs(filename):
+ if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
+ raise AirflowSkipException(f"skipping {filename!r}; unexpected suffix")
+ return {
+ "source_bucket_key": filename,
+ "dest_bucket_key": filename,
+ "dest_bucket_name": "my_other_bucket",
+ }
+
+
+ copy_kwargs = XComArg(list_filenames).map(create_copy_kwargs)
+
+ # Unchanged.
+ copy_filenames = S3CopyObjectOperator.partial(...).expand_kwargs(copy_kwargs)
+
+There are a couple of things to note:
+
+#. The callable argument of ``map()`` (``create_copy_kwargs`` in the example) **must not** be a task, but a plain Python function. The transformation is part of the “pre-processing” of the downstream task (i.e. ``copy_files``), not a standalone task in the DAG.
+#. The callable always takes exactly one positional argument. This function is called for each item in the iterable used for task-mapping, similar to how Python’s built-in ``map()`` works.
+#. Since the callable is executed as part of the downstream task, you can use any existing techniques to write the task function. To mark a component as skipped, for example, you should raise ``AirflowSkipException``. Note that returning ``None`` **does not** work here.
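The notes above can be sketched as a plain-Python analogue (an assumption for illustration, not Airflow's actual implementation): the ``map()`` callable runs once per item during the downstream task's pre-processing, and raising a skip-style exception affects only that one mapped instance.

```python
# Hypothetical analogue of the per-item transform in map(): SkipItem stands
# in for AirflowSkipException, and each item is processed independently.
class SkipItem(Exception):
    """Stand-in for AirflowSkipException in this sketch."""


def create_copy_kwargs(filename):
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        raise SkipItem(f"skipping {filename!r}; unexpected suffix")
    return {
        "source_bucket_key": filename,
        "dest_bucket_key": filename,
        "dest_bucket_name": "my_other_bucket",
    }


# A raise skips only the instance for that item; the others still run.
outcomes = {}
for name in ["a.json", "b.txt"]:
    try:
        outcomes[name] = create_copy_kwargs(name)
    except SkipItem:
        outcomes[name] = "skipped"
```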
Review Comment:
what happens when you return None?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]