This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d9a26ceef Add branching based on mapped task group example to dynamic-task-mapping.rst (#36480)
0d9a26ceef is described below

commit 0d9a26ceefb2a5661f19cce292e05939d3f2a0c1
Author: Ryan Hatter <25823361+rnh...@users.noreply.github.com>
AuthorDate: Fri Dec 29 18:04:14 2023 -0500

    Add branching based on mapped task group example to dynamic-task-mapping.rst (#36480)
    
    * Add branching based on mapped task group example to dynamic-task-mapping.rst
    
    Based on trying to solve [this stack overflow question](https://stackoverflow.com/questions/77730116/branching-not-working-in-airflow-as-expected/77730300#77730300), it seems impossible to reliably branch mapped tasks based on the result of an upstream task. However, it's possible to do this in a mapped task group, which this example demonstrates.
    
    * trying to force blacken-docs
---
 .../dynamic-task-mapping.rst                       | 44 ++++++++++++++++++++--
 1 file changed, 41 insertions(+), 3 deletions(-)

diff --git a/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst b/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
index ecfe8e2413..81102dd54e 100644
--- a/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
@@ -313,7 +313,7 @@ For example, this code will *not* work:
 
 
         @task_group
-        def my_group(value):
+        def my_task_group(value):
             if not value:  # DOES NOT work as you'd expect!
                 task_a = EmptyOperator(...)
             else:
@@ -321,9 +321,9 @@ For example, this code will *not* work:
             task_a << my_task(value)
 
 
-        my_group.expand(value=[0, 1, 2])
+        my_task_group.expand(value=[0, 1, 2])
 
-When code in ``my_group`` is executed, ``value`` would still only be a reference, not the real value, so the ``if not value`` branch will not work as you likely want. However, if you pass that reference into a task, it will become resolved when the task is executed, and the three ``my_task`` instances will therefore receive 1, 2, and 3, respectively.
+When code in ``my_task_group`` is executed, ``value`` would still only be a reference, not the real value, so the ``if not value`` branch will not work as you likely want. However, if you pass that reference into a task, it will become resolved when the task is executed, and the three ``my_task`` instances will therefore receive 1, 2, and 3, respectively.
 
 It is, therefore, important to remember that, if you intend to perform any logic to a value passed into a task group function, you must always use a task to run the logic, such as  ``@task.branch`` (or ``BranchPythonOperator``) for conditions, and task mapping methods for loops.
 
@@ -375,6 +375,44 @@ Similar to a mapped task group, depending on a mapped task group's output would
 
 It is also possible to perform any operations as results from a normal mapped task.
 
+Branching on a mapped task group's output
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+While it's not possible to implement branching logic (for example using ``@task.branch``) on the results of a mapped task, it is possible to branch based on the *input* of a task group. The following example demonstrates executing one of three tasks based on the input to a mapped task group.
+
+.. code-block:: python
+
+    inputs = ["a", "b", "c"]
+
+
+    @task_group(group_id="my_task_group")
+    def my_task_group(input):
+        @task.branch
+        def branch(element):
+            if "a" in element:
+                return "my_task_group.a"
+            elif "b" in element:
+                return "my_task_group.b"
+            else:
+                return "my_task_group.c"
+
+        @task
+        def a():
+            print("a")
+
+        @task
+        def b():
+            print("b")
+
+        @task
+        def c():
+            print("c")
+
+        branch(input) >> [a(), b(), c()]
+
+
+    my_task_group.expand(input=inputs)
+
 Filtering items from a mapped task
 ==================================
 

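Note on the added example: the ``branch`` task in the diff returns a task id prefixed with the group id, once per expanded copy of the task group. The snippet below is a minimal plain-Python sketch of that decision only. It does not import Airflow and does not reproduce scheduler behaviour; ``choose_branch``, ``route_all`` and ``GROUP_ID`` are hypothetical names introduced for illustration and are not part of the commit.

```python
# Plain-Python sketch of the branch decision from the documentation example.
# Assumption: only the string logic is mirrored here; nothing is scheduled.

GROUP_ID = "my_task_group"  # same value as group_id in the diff


def choose_branch(element: str) -> str:
    """Return the task id the branch task would follow for one mapped input."""
    if "a" in element:
        return f"{GROUP_ID}.a"
    elif "b" in element:
        return f"{GROUP_ID}.b"
    return f"{GROUP_ID}.c"


def route_all(inputs: list[str]) -> dict[int, str]:
    """Pair each position in the input list with the task id chosen for it."""
    return {index: choose_branch(element) for index, element in enumerate(inputs)}


routes = route_all(["a", "b", "c"])
for map_index, task_id in routes.items():
    print(map_index, task_id)
```

Because the checks run in order, an input containing both letters (for example ``"ab"``) is routed to ``a``, and any input containing neither falls through to ``c``.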