This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit cceccf27b281f9da9d8701ddad78ac8aeba6ab55 Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Mon May 9 13:44:35 2022 +0100 Fix scheduler crash when expanding with mapped task that returned none (#23486) When task is expanded from a mapped task that returned no value, it crashes the scheduler. This PR fixes it by first checking if there's a return value from the mapped task, if no returned value, then error in the task itself instead of crashing the scheduler (cherry picked from commit 7813f996ab79b9e6ef07090194f1e621e4f90e17) --- airflow/models/taskinstance.py | 4 +++- tests/models/test_taskinstance.py | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index de85b4cf8c..e1e87ec8e8 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -2329,10 +2329,12 @@ class TaskInstance(Base, LoggingMixin): # currently possible for a downstream to depend on one individual mapped # task instance, only a task as a whole. This will change in AIP-42 # Phase 2, and we'll need to further analyze the mapped task case. - if task.is_mapped or next(task.iter_mapped_dependants(), None) is None: + if next(task.iter_mapped_dependants(), None) is None: return if value is None: raise XComForMappingNotPushed() + if task.is_mapped: + return if not isinstance(value, collections.abc.Collection) or isinstance(value, (bytes, str)): raise UnmappableXComTypePushed(value) task_map = TaskMap.from_task_instance_xcom(self, value) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index dac987e431..57355a3ad5 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2831,3 +2831,21 @@ def test_ti_mapped_depends_on_mapped_xcom_arg(dag_maker, session): query = XCom.get_many(run_id=dagrun.run_id, task_ids=["add_one__1"], session=session) assert [x.value for x in query.order_by(None).order_by(XCom.map_index)] == [3, 4, 5] + + +def test_ti_mapped_depends_on_mapped_xcom_arg_XXX(dag_maker, session): + with dag_maker(session=session) as dag: + + @dag.task + def add_one(x): + x + 1 + + two_three_four = add_one.expand(x=[1, 2, 3]) + add_one.expand(x=two_three_four) + + dagrun = dag_maker.create_dagrun() + for map_index in range(3): + ti = dagrun.get_task_instance("add_one", map_index=map_index) + ti.refresh_from_task(dag.get_task("add_one")) + with pytest.raises(XComForMappingNotPushed): + ti.run()