This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new a59a4800193 [v3-1-test] fix Inconsistent XCom Return Type in Mapped Task Groups with Dynamic … (#59104) (#63293)
a59a4800193 is described below
commit a59a4800193343dea9ff18afa1d082759cbc4d1e
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Mar 10 22:18:47 2026 +0100
[v3-1-test] fix Inconsistent XCom Return Type in Mapped Task Groups with Dynamic … (#59104) (#63293)
* fix Inconsistent XCom Return Type in Mapped Task Groups with Dynamic Task Mapping
* add unit test for xcom_pull
* remove asc func
(cherry picked from commit df2e2b831b4a9f2c13c109fa609579d20fffac39)
Co-authored-by: Henry Chen <[email protected]>
---
airflow-core/src/airflow/models/taskinstance.py | 10 +++-
.../tests/unit/models/test_taskinstance.py | 65 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 7d58046d03a..95a4430f9ac 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1847,6 +1847,7 @@ class TaskInstance(Base, LoggingMixin):
) -> Any:
""":meta private:""" # noqa: D400
# This is only kept for compatibility in tests for now while AIP-72 is in progress.
+
if dag_id is None:
dag_id = self.dag_id
if run_id is None:
@@ -1878,12 +1879,15 @@ class TaskInstance(Base, LoggingMixin):
).first()
if first is None: # No matching XCom at all.
return default
+
if map_indexes is not None or first.map_index < 0:
return XComModel.deserialize_value(first)
- # raise RuntimeError("Nothing should hit this anymore")
-
- # TODO: TaskSDK: We should remove this, but many tests still currently call `ti.run()`. See #45549
+ return LazyXComSelectSequence.from_select(
+ query.with_only_columns(XComModel.value).order_by(None),
+ order_by=[XComModel.map_index.expression],
+ session=session,
+ )
# At this point either task_ids or map_indexes is explicitly multi-value.
# Order return values to match task_ids and map_indexes ordering.
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py
index f3ffcedf218..615c40ac918 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -3029,6 +3029,71 @@ class TestMappedTaskInstanceReceiveValue:
out_lines = [line.strip() for line in f]
assert out_lines == ["hello FOO", "goodbye FOO", "hello BAR", "goodbye BAR"]
+ def test_xcom_pull_unmapped_task(self, dag_maker, session):
+ """
+ Test that xcom_pull from unmapped task returns single deserialized value.
+
+ For unmapped tasks with map_index < 0, xcom_pull should return the single value,
+ not a LazyXComSelectSequence.
+ """
+
+ with dag_maker(dag_id="test_xcom_unmapped", session=session):
+ upstream = PythonOperator(
+ task_id="unmapped_task",
+ python_callable=lambda: {"key": "value"},
+ )
+ downstream = PythonOperator(
+ task_id="downstream",
+ python_callable=lambda: None,
+ )
+ upstream >> downstream
+
+ dag_run = dag_maker.create_dagrun(logical_date=timezone.utcnow())
+
+ # Run upstream task to push xcom
+ dag_maker.run_ti("unmapped_task", dag_run=dag_run, session=session)
+
+ # Get downstream task instance
+ ti_downstream = dag_run.get_task_instance("downstream", session=session)
+ ti_downstream.task = dag_maker.dag.task_dict["downstream"]
+
+ # Pull xcom - should return single dict value, not LazyXComSelectSequence
+ result = ti_downstream.xcom_pull(task_ids="unmapped_task", session=session)
+ assert isinstance(result, dict), f"Expected dict for unmapped task, got {type(result)}"
+ assert result == {"key": "value"}
+
+ def test_xcom_pull_returns_lazy_sequence_for_mapped_xcom(self, dag_maker, session):
+ """
+ Test that xcom_pull returns LazyXComSelectSequence when XComs are mapped (map_index >= 0)
+ and map_indexes is not specified.
+ """
+ from airflow.models.xcom import LazyXComSelectSequence
+
+ with dag_maker(dag_id="test_xcom_mapped_values", session=session):
+
+ @task
+ def push_values(val):
+ return val
+
+ upstream = push_values.expand(val=[2, 4])
+ downstream = PythonOperator(
+ task_id="downstream",
+ python_callable=lambda: None,
+ )
+ upstream >> downstream
+
+ dag_run = dag_maker.create_dagrun(logical_date=timezone.utcnow())
+ dag_maker.run_ti(upstream.operator.task_id, map_index=0, dag_run=dag_run, session=session)
+ dag_maker.run_ti(upstream.operator.task_id, map_index=1, dag_run=dag_run, session=session)
+
+ ti_downstream = dag_run.get_task_instance("downstream", session=session)
+ ti_downstream.task = dag_maker.dag.task_dict["downstream"]
+
+ result = ti_downstream.xcom_pull(task_ids=upstream.operator.task_id, session=session)
+ assert isinstance(result, LazyXComSelectSequence), (
+ f"Expected LazyXComSelectSequence for mapped XComs, got {type(result)}"
+ )
+
def _get_lazy_xcom_access_expected_sql_lines() -> list[str]:
backend = os.environ.get("BACKEND")