This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new df2e2b831b4 fix Inconsistent XCom Return Type in Mapped Task Groups 
with Dynamic … (#59104)
df2e2b831b4 is described below

commit df2e2b831b4a9f2c13c109fa609579d20fffac39
Author: Henry Chen <[email protected]>
AuthorDate: Wed Mar 11 03:38:31 2026 +0800

    fix Inconsistent XCom Return Type in Mapped Task Groups with Dynamic … 
(#59104)
    
    * fix Inconsistent XCom Return Type in Mapped Task Groups with Dynamic Task 
Mapping
    
    * add unit test for xcom_pull
    
    * remove asc func
---
 airflow-core/src/airflow/models/taskinstance.py    | 10 +++-
 .../tests/unit/models/test_taskinstance.py         | 65 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 8112d955582..32daf41aec3 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1683,6 +1683,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
     ) -> Any:
         """:meta private:"""  # noqa: D400
         # This is only kept for compatibility in tests for now while AIP-72 is 
in progress.
+
         if dag_id is None:
             dag_id = self.dag_id
         if run_id is None:
@@ -1714,12 +1715,15 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
             ).first()
             if first is None:  # No matching XCom at all.
                 return default
+
             if map_indexes is not None or first.map_index < 0:
                 return XComModel.deserialize_value(first)
 
-            # raise RuntimeError("Nothing should hit this anymore")
-
-        # TODO: TaskSDK: We should remove this, but many tests still currently 
call `ti.run()`. See #45549
+            return LazyXComSelectSequence.from_select(
+                query.with_only_columns(XComModel.value).order_by(None),
+                order_by=[XComModel.map_index.expression],
+                session=session,
+            )
 
         # At this point either task_ids or map_indexes is explicitly 
multi-value.
         # Order return values to match task_ids and map_indexes ordering.
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py 
b/airflow-core/tests/unit/models/test_taskinstance.py
index a013b09cdb0..82b9adc3162 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -2885,6 +2885,71 @@ class TestMappedTaskInstanceReceiveValue:
             out_lines = [line.strip() for line in f]
         assert out_lines == ["hello FOO", "goodbye FOO", "hello BAR", "goodbye 
BAR"]
 
+    def test_xcom_pull_unmapped_task(self, dag_maker, session):
+        """
+        Test that xcom_pull from unmapped task returns single deserialized 
value.
+
+        For unmapped tasks with map_index < 0, xcom_pull should return the 
single value,
+        not a LazyXComSelectSequence.
+        """
+
+        with dag_maker(dag_id="test_xcom_unmapped", session=session):
+            upstream = PythonOperator(
+                task_id="unmapped_task",
+                python_callable=lambda: {"key": "value"},
+            )
+            downstream = PythonOperator(
+                task_id="downstream",
+                python_callable=lambda: None,
+            )
+            upstream >> downstream
+
+        dag_run = dag_maker.create_dagrun(logical_date=timezone.utcnow())
+
+        # Run upstream task to push xcom
+        dag_maker.run_ti("unmapped_task", dag_run=dag_run, session=session)
+
+        # Get downstream task instance
+        ti_downstream = dag_run.get_task_instance("downstream", 
session=session)
+        ti_downstream.task = dag_maker.dag.task_dict["downstream"]
+
+        # Pull xcom - should return single dict value, not 
LazyXComSelectSequence
+        result = ti_downstream.xcom_pull(task_ids="unmapped_task", 
session=session)
+        assert isinstance(result, dict), f"Expected dict for unmapped task, 
got {type(result)}"
+        assert result == {"key": "value"}
+
+    def test_xcom_pull_returns_lazy_sequence_for_mapped_xcom(self, dag_maker, 
session):
+        """
+        Test that xcom_pull returns LazyXComSelectSequence when XComs are 
mapped (map_index >= 0)
+        and map_indexes is not specified.
+        """
+        from airflow.models.xcom import LazyXComSelectSequence
+
+        with dag_maker(dag_id="test_xcom_mapped_values", session=session):
+
+            @task
+            def push_values(val):
+                return val
+
+            upstream = push_values.expand(val=[2, 4])
+            downstream = PythonOperator(
+                task_id="downstream",
+                python_callable=lambda: None,
+            )
+            upstream >> downstream
+
+        dag_run = dag_maker.create_dagrun(logical_date=timezone.utcnow())
+        dag_maker.run_ti(upstream.operator.task_id, map_index=0, 
dag_run=dag_run, session=session)
+        dag_maker.run_ti(upstream.operator.task_id, map_index=1, 
dag_run=dag_run, session=session)
+
+        ti_downstream = dag_run.get_task_instance("downstream", 
session=session)
+        ti_downstream.task = dag_maker.dag.task_dict["downstream"]
+
+        result = ti_downstream.xcom_pull(task_ids=upstream.operator.task_id, 
session=session)
+        assert isinstance(result, LazyXComSelectSequence), (
+            f"Expected LazyXComSelectSequence for mapped XComs, got 
{type(result)}"
+        )
+
 
 def _get_lazy_xcom_access_expected_sql_lines() -> list[str]:
     backend = os.environ.get("BACKEND")

Reply via email to