This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit cd60cfc5dfcfa36f3d222bd20801b6ffcd5331b1
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Jul 29 10:34:50 2025 +0530

    Do not ignore `include_prior_dates` in xcom_pull when `map_indexes` is not 
specified (#53809)
    
    (cherry picked from commit 2a2d3e1b3f60a5f2281859d612d26e0fdb7f4f80)
---
 .../api_fastapi/execution_api/routes/xcoms.py      |  2 +
 .../api_fastapi/execution_api/versions/__init__.py |  7 +++-
 .../execution_api/versions/v2025_08_10.py          | 11 +++++
 .../execution_api/versions/head/test_xcoms.py      | 49 ++++++++++++++++++++++
 task-sdk/src/airflow/sdk/api/client.py             |  3 ++
 task-sdk/src/airflow/sdk/bases/xcom.py             |  5 +++
 task-sdk/src/airflow/sdk/execution_time/comms.py   |  1 +
 .../src/airflow/sdk/execution_time/supervisor.py   |  9 +++-
 .../src/airflow/sdk/execution_time/task_runner.py  |  1 +
 .../task_sdk/execution_time/test_supervisor.py     |  3 +-
 .../task_sdk/execution_time/test_task_runner.py    |  4 +-
 11 files changed, 90 insertions(+), 5 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py
index 9762c7b3498..eae3539f593 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py
@@ -230,6 +230,7 @@ class GetXComSliceFilterParams(BaseModel):
     start: int | None = None
     stop: int | None = None
     step: int | None = None
+    include_prior_dates: bool = False
 
 
 @router.get(
@@ -249,6 +250,7 @@ def get_mapped_xcom_by_slice(
         key=key,
         task_ids=task_id,
         dag_ids=dag_id,
+        include_prior_dates=params.include_prior_dates,
         session=session,
     )
     query = query.order_by(None)
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
index fee781a8ecf..aeadd1affc9 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
@@ -23,11 +23,16 @@ from airflow.api_fastapi.execution_api.versions.v2025_04_28 
import AddRenderedMa
 from airflow.api_fastapi.execution_api.versions.v2025_05_20 import 
DowngradeUpstreamMapIndexes
 from airflow.api_fastapi.execution_api.versions.v2025_08_10 import (
     AddDagRunStateFieldAndPreviousEndpoint,
+    AddIncludePriorDatesToGetXComSlice,
 )
 
 bundle = VersionBundle(
     HeadVersion(),
-    Version("2025-08-10", AddDagRunStateFieldAndPreviousEndpoint),
+    Version(
+        "2025-08-10",
+        AddDagRunStateFieldAndPreviousEndpoint,
+        AddIncludePriorDatesToGetXComSlice,
+    ),
     Version("2025-05-20", DowngradeUpstreamMapIndexes),
     Version("2025-04-28", AddRenderedMapIndexField),
     Version("2025-04-11"),
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_08_10.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_08_10.py
index 188eaec2d79..ec66915e4d9 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_08_10.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_08_10.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 from cadwyn import ResponseInfo, VersionChange, 
convert_response_to_previous_version_for, endpoint, schema
 
 from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun, 
TIRunContext
+from airflow.api_fastapi.execution_api.routes.xcoms import 
GetXComSliceFilterParams
 
 
 class AddDagRunStateFieldAndPreviousEndpoint(VersionChange):
@@ -37,3 +38,13 @@ class AddDagRunStateFieldAndPreviousEndpoint(VersionChange):
         """Remove the `state` field from the dag_run object when converting to 
the previous version."""
         if "dag_run" in response.body and isinstance(response.body["dag_run"], 
dict):
             response.body["dag_run"].pop("state", None)
+
+
+class AddIncludePriorDatesToGetXComSlice(VersionChange):
+    """Add the `include_prior_dates` field to GetXComSliceFilterParams."""
+
+    description = __doc__
+
+    instructions_to_migrate_to_previous_version = (
+        
schema(GetXComSliceFilterParams).field("include_prior_dates").didnt_exist,
+    )
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_xcoms.py 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_xcoms.py
index 1b10e81cd23..ea93c2f96e0 100644
--- 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_xcoms.py
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_xcoms.py
@@ -31,6 +31,7 @@ from airflow.models.taskmap import TaskMap
 from airflow.models.xcom import XComModel
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.serialization.serde import deserialize, serialize
+from airflow.utils import timezone
 from airflow.utils.session import create_session
 
 pytestmark = pytest.mark.db_test
@@ -273,6 +274,54 @@ class TestXComsGetEndpoint:
         assert response.status_code == 200
         assert response.json() == ["f", "o", "b"][key]
 
+    @pytest.mark.parametrize(
+        "include_prior_dates, expected_xcoms",
+        [[True, ["earlier_value", "later_value"]], [False, ["later_value"]]],
+    )
+    def test_xcom_get_slice_accepts_include_prior_dates(
+        self, client, dag_maker, session, include_prior_dates, expected_xcoms
+    ):
+        """Test that the slice endpoint accepts include_prior_dates parameter 
and works correctly."""
+
+        with dag_maker(dag_id="dag"):
+            EmptyOperator(task_id="task")
+
+        earlier_run = dag_maker.create_dagrun(
+            run_id="earlier_run", 
logical_date=timezone.parse("2024-01-01T00:00:00Z")
+        )
+        later_run = dag_maker.create_dagrun(
+            run_id="later_run", 
logical_date=timezone.parse("2024-01-02T00:00:00Z")
+        )
+
+        earlier_ti = earlier_run.get_task_instance("task")
+        later_ti = later_run.get_task_instance("task")
+
+        earlier_xcom = XComModel(
+            key="test_key",
+            value="earlier_value",
+            dag_run_id=earlier_ti.dag_run.id,
+            run_id=earlier_ti.run_id,
+            task_id=earlier_ti.task_id,
+            dag_id=earlier_ti.dag_id,
+        )
+        later_xcom = XComModel(
+            key="test_key",
+            value="later_value",
+            dag_run_id=later_ti.dag_run.id,
+            run_id=later_ti.run_id,
+            task_id=later_ti.task_id,
+            dag_id=later_ti.dag_id,
+        )
+        session.add_all([earlier_xcom, later_xcom])
+        session.commit()
+
+        response = client.get(
+            
f"/execution/xcoms/dag/later_run/task/test_key/slice?include_prior_dates={include_prior_dates}"
+        )
+        assert response.status_code == 200
+
+        assert response.json() == expected_xcoms
+
 
 class TestXComsSetEndpoint:
     @pytest.mark.parametrize(
diff --git a/task-sdk/src/airflow/sdk/api/client.py 
b/task-sdk/src/airflow/sdk/api/client.py
index f36c46f45de..bbf6eb4dea0 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -491,6 +491,7 @@ class XComOperations:
         start: int | None,
         stop: int | None,
         step: int | None,
+        include_prior_dates: bool = False,
     ) -> XComSequenceSliceResponse:
         params = {}
         if start is not None:
@@ -499,6 +500,8 @@ class XComOperations:
             params["stop"] = stop
         if step is not None:
             params["step"] = step
+        if include_prior_dates:
+            params["include_prior_dates"] = include_prior_dates
         resp = 
self.client.get(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}/slice", params=params)
         return XComSequenceSliceResponse.model_validate_json(resp.read())
 
diff --git a/task-sdk/src/airflow/sdk/bases/xcom.py 
b/task-sdk/src/airflow/sdk/bases/xcom.py
index 423fde12012..7c982a050dd 100644
--- a/task-sdk/src/airflow/sdk/bases/xcom.py
+++ b/task-sdk/src/airflow/sdk/bases/xcom.py
@@ -277,6 +277,7 @@ class BaseXCom:
         dag_id: str,
         task_id: str,
         run_id: str,
+        include_prior_dates: bool = False,
     ) -> Any:
         """
         Retrieve all XCom values for a task, typically from all map indexes.
@@ -291,6 +292,9 @@ class BaseXCom:
         :param run_id: DAG run ID for the task.
         :param dag_id: DAG ID to pull XComs from.
         :param task_id: Task ID to pull XComs from.
+        :param include_prior_dates: If *False* (default), only XComs from the
+            specified DAG run are returned. If *True*, the latest matching 
XComs are
+            returned regardless of the run they belong to.
         :return: List of all XCom values if found.
         """
         from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
@@ -304,6 +308,7 @@ class BaseXCom:
                 start=None,
                 stop=None,
                 step=None,
+                include_prior_dates=include_prior_dates,
             ),
         )
 
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py 
b/task-sdk/src/airflow/sdk/execution_time/comms.py
index 1f9eb3141a3..30694905217 100644
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -692,6 +692,7 @@ class GetXComSequenceSlice(BaseModel):
     start: int | None
     stop: int | None
     step: int | None
+    include_prior_dates: bool = False
     type: Literal["GetXComSequenceSlice"] = "GetXComSequenceSlice"
 
 
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index ccac2d8d2ad..fffa1ad1555 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1132,7 +1132,14 @@ class ActivitySubprocess(WatchedSubprocess):
                 resp = xcom
         elif isinstance(msg, GetXComSequenceSlice):
             xcoms = self.client.xcoms.get_sequence_slice(
-                msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.start, 
msg.stop, msg.step
+                msg.dag_id,
+                msg.run_id,
+                msg.task_id,
+                msg.key,
+                msg.start,
+                msg.stop,
+                msg.step,
+                msg.include_prior_dates,
             )
             resp = XComSequenceSliceResult.from_response(xcoms)
         elif isinstance(msg, DeferTask):
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py 
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index a6bf2d11e1b..d4d09c8e20c 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -361,6 +361,7 @@ class RuntimeTaskInstance(TaskInstance):
                     key=key,
                     task_id=t_id,
                     dag_id=dag_id,
+                    include_prior_dates=include_prior_dates,
                 )
 
                 if values is None:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py 
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 511cccc2a33..196494c258d 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -1954,10 +1954,11 @@ class TestHandleRequest:
                     start=None,
                     stop=None,
                     step=None,
+                    include_prior_dates=False,
                 ),
                 {"root": ["foo", "bar"], "type": "XComSequenceSliceResult"},
                 "xcoms.get_sequence_slice",
-                ("test_dag", "test_run", "test_task", "test_key", None, None, 
None),
+                ("test_dag", "test_run", "test_task", "test_key", None, None, 
None, False),
                 {},
                 XComSequenceSliceResult(root=["foo", "bar"]),
                 None,
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py 
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index b65c093856e..2556845dac8 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -2049,8 +2049,7 @@ class TestXComAfterTaskExecution:
 
         class CustomOperator(BaseOperator):
             def execute(self, context):
-                value = context["ti"].xcom_pull(task_ids="pull_task", 
key="key")
-                print(f"Pulled XCom Value: {value}")
+                context["ti"].xcom_pull(task_ids="pull_task", key="key")
 
         task = CustomOperator(task_id="pull_task")
         runtime_ti = create_runtime_ti(task=task)
@@ -2061,6 +2060,7 @@ class TestXComAfterTaskExecution:
             dag_id="test_dag",
             task_id="pull_task",
             run_id="test_run",
+            include_prior_dates=False,
         )
 
         assert not any(

Reply via email to