This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit cceccf27b281f9da9d8701ddad78ac8aeba6ab55
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Mon May 9 13:44:35 2022 +0100

    Fix scheduler crash when expanding with mapped task that returned none 
(#23486)
    
    When task is expanded from a mapped task that returned no value, it
    crashes the scheduler. This PR fixes it by first checking if there's
    a return value from the mapped task, if no returned value, then error
    in the task itself instead of crashing the scheduler
    
    (cherry picked from commit 7813f996ab79b9e6ef07090194f1e621e4f90e17)
---
 airflow/models/taskinstance.py    |  4 +++-
 tests/models/test_taskinstance.py | 18 ++++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index de85b4cf8c..e1e87ec8e8 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2329,10 +2329,12 @@ class TaskInstance(Base, LoggingMixin):
         # currently possible for a downstream to depend on one individual 
mapped
         # task instance, only a task as a whole. This will change in AIP-42
         # Phase 2, and we'll need to further analyze the mapped task case.
-        if task.is_mapped or next(task.iter_mapped_dependants(), None) is None:
+        if next(task.iter_mapped_dependants(), None) is None:
             return
         if value is None:
             raise XComForMappingNotPushed()
+        if task.is_mapped:
+            return
         if not isinstance(value, collections.abc.Collection) or 
isinstance(value, (bytes, str)):
             raise UnmappableXComTypePushed(value)
         task_map = TaskMap.from_task_instance_xcom(self, value)
diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py
index dac987e431..57355a3ad5 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2831,3 +2831,21 @@ def test_ti_mapped_depends_on_mapped_xcom_arg(dag_maker, 
session):
 
     query = XCom.get_many(run_id=dagrun.run_id, task_ids=["add_one__1"], 
session=session)
     assert [x.value for x in query.order_by(None).order_by(XCom.map_index)] == 
[3, 4, 5]
+
+
+def test_ti_mapped_depends_on_mapped_xcom_arg_XXX(dag_maker, session):
+    with dag_maker(session=session) as dag:
+
+        @dag.task
+        def add_one(x):
+            x + 1
+
+        two_three_four = add_one.expand(x=[1, 2, 3])
+        add_one.expand(x=two_three_four)
+
+    dagrun = dag_maker.create_dagrun()
+    for map_index in range(3):
+        ti = dagrun.get_task_instance("add_one", map_index=map_index)
+        ti.refresh_from_task(dag.get_task("add_one"))
+        with pytest.raises(XComForMappingNotPushed):
+            ti.run()

Reply via email to