This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new a59a4800193 [v3-1-test] fix Inconsistent XCom Return Type in Mapped 
Task Groups with Dynamic … (#59104) (#63293)
a59a4800193 is described below

commit a59a4800193343dea9ff18afa1d082759cbc4d1e
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Mar 10 22:18:47 2026 +0100

    [v3-1-test] fix Inconsistent XCom Return Type in Mapped Task Groups with 
Dynamic … (#59104) (#63293)
    
    * fix Inconsistent XCom Return Type in Mapped Task Groups with Dynamic Task 
Mapping
    
    * add unit test for xcom_pull
    
    * remove asc func
    (cherry picked from commit df2e2b831b4a9f2c13c109fa609579d20fffac39)
    
    Co-authored-by: Henry Chen <[email protected]>
---
 airflow-core/src/airflow/models/taskinstance.py    | 10 +++-
 .../tests/unit/models/test_taskinstance.py         | 65 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 7d58046d03a..95a4430f9ac 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1847,6 +1847,7 @@ class TaskInstance(Base, LoggingMixin):
     ) -> Any:
         """:meta private:"""  # noqa: D400
         # This is only kept for compatibility in tests for now while AIP-72 is 
in progress.
+
         if dag_id is None:
             dag_id = self.dag_id
         if run_id is None:
@@ -1878,12 +1879,15 @@ class TaskInstance(Base, LoggingMixin):
             ).first()
             if first is None:  # No matching XCom at all.
                 return default
+
             if map_indexes is not None or first.map_index < 0:
                 return XComModel.deserialize_value(first)
 
-            # raise RuntimeError("Nothing should hit this anymore")
-
-        # TODO: TaskSDK: We should remove this, but many tests still currently 
call `ti.run()`. See #45549
+            return LazyXComSelectSequence.from_select(
+                query.with_only_columns(XComModel.value).order_by(None),
+                order_by=[XComModel.map_index.expression],
+                session=session,
+            )
 
         # At this point either task_ids or map_indexes is explicitly 
multi-value.
         # Order return values to match task_ids and map_indexes ordering.
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py 
b/airflow-core/tests/unit/models/test_taskinstance.py
index f3ffcedf218..615c40ac918 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -3029,6 +3029,71 @@ class TestMappedTaskInstanceReceiveValue:
             out_lines = [line.strip() for line in f]
         assert out_lines == ["hello FOO", "goodbye FOO", "hello BAR", "goodbye 
BAR"]
 
+    def test_xcom_pull_unmapped_task(self, dag_maker, session):
+        """
+        Test that xcom_pull from unmapped task returns single deserialized 
value.
+
+        For unmapped tasks with map_index < 0, xcom_pull should return the 
single value,
+        not a LazyXComSelectSequence.
+        """
+
+        with dag_maker(dag_id="test_xcom_unmapped", session=session):
+            upstream = PythonOperator(
+                task_id="unmapped_task",
+                python_callable=lambda: {"key": "value"},
+            )
+            downstream = PythonOperator(
+                task_id="downstream",
+                python_callable=lambda: None,
+            )
+            upstream >> downstream
+
+        dag_run = dag_maker.create_dagrun(logical_date=timezone.utcnow())
+
+        # Run upstream task to push xcom
+        dag_maker.run_ti("unmapped_task", dag_run=dag_run, session=session)
+
+        # Get downstream task instance
+        ti_downstream = dag_run.get_task_instance("downstream", 
session=session)
+        ti_downstream.task = dag_maker.dag.task_dict["downstream"]
+
+        # Pull xcom - should return single dict value, not 
LazyXComSelectSequence
+        result = ti_downstream.xcom_pull(task_ids="unmapped_task", 
session=session)
+        assert isinstance(result, dict), f"Expected dict for unmapped task, 
got {type(result)}"
+        assert result == {"key": "value"}
+
+    def test_xcom_pull_returns_lazy_sequence_for_mapped_xcom(self, dag_maker, 
session):
+        """
+        Test that xcom_pull returns LazyXComSelectSequence when XComs are 
mapped (map_index >= 0)
+        and map_indexes is not specified.
+        """
+        from airflow.models.xcom import LazyXComSelectSequence
+
+        with dag_maker(dag_id="test_xcom_mapped_values", session=session):
+
+            @task
+            def push_values(val):
+                return val
+
+            upstream = push_values.expand(val=[2, 4])
+            downstream = PythonOperator(
+                task_id="downstream",
+                python_callable=lambda: None,
+            )
+            upstream >> downstream
+
+        dag_run = dag_maker.create_dagrun(logical_date=timezone.utcnow())
+        dag_maker.run_ti(upstream.operator.task_id, map_index=0, 
dag_run=dag_run, session=session)
+        dag_maker.run_ti(upstream.operator.task_id, map_index=1, 
dag_run=dag_run, session=session)
+
+        ti_downstream = dag_run.get_task_instance("downstream", 
session=session)
+        ti_downstream.task = dag_maker.dag.task_dict["downstream"]
+
+        result = ti_downstream.xcom_pull(task_ids=upstream.operator.task_id, 
session=session)
+        assert isinstance(result, LazyXComSelectSequence), (
+            f"Expected LazyXComSelectSequence for mapped XComs, got 
{type(result)}"
+        )
+
 
 def _get_lazy_xcom_access_expected_sql_lines() -> list[str]:
     backend = os.environ.get("BACKEND")

Reply via email to