This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit cd60cfc5dfcfa36f3d222bd20801b6ffcd5331b1 Author: Amogh Desai <[email protected]> AuthorDate: Tue Jul 29 10:34:50 2025 +0530 Do not ignore `include_prior_dates` in xcom_pull when `map_indexes` is not specified (#53809) (cherry picked from commit 2a2d3e1b3f60a5f2281859d612d26e0fdb7f4f80) --- .../api_fastapi/execution_api/routes/xcoms.py | 2 + .../api_fastapi/execution_api/versions/__init__.py | 7 +++- .../execution_api/versions/v2025_08_10.py | 11 +++++ .../execution_api/versions/head/test_xcoms.py | 49 ++++++++++++++++++++++ task-sdk/src/airflow/sdk/api/client.py | 3 ++ task-sdk/src/airflow/sdk/bases/xcom.py | 5 +++ task-sdk/src/airflow/sdk/execution_time/comms.py | 1 + .../src/airflow/sdk/execution_time/supervisor.py | 9 +++- .../src/airflow/sdk/execution_time/task_runner.py | 1 + .../task_sdk/execution_time/test_supervisor.py | 3 +- .../task_sdk/execution_time/test_task_runner.py | 4 +- 11 files changed, 90 insertions(+), 5 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py index 9762c7b3498..eae3539f593 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py @@ -230,6 +230,7 @@ class GetXComSliceFilterParams(BaseModel): start: int | None = None stop: int | None = None step: int | None = None + include_prior_dates: bool = False @router.get( @@ -249,6 +250,7 @@ def get_mapped_xcom_by_slice( key=key, task_ids=task_id, dag_ids=dag_id, + include_prior_dates=params.include_prior_dates, session=session, ) query = query.order_by(None) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index fee781a8ecf..aeadd1affc9 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -23,11 +23,16 @@ from airflow.api_fastapi.execution_api.versions.v2025_04_28 import AddRenderedMa from airflow.api_fastapi.execution_api.versions.v2025_05_20 import DowngradeUpstreamMapIndexes from airflow.api_fastapi.execution_api.versions.v2025_08_10 import ( AddDagRunStateFieldAndPreviousEndpoint, + AddIncludePriorDatesToGetXComSlice, ) bundle = VersionBundle( HeadVersion(), - Version("2025-08-10", AddDagRunStateFieldAndPreviousEndpoint), + Version( + "2025-08-10", + AddDagRunStateFieldAndPreviousEndpoint, + AddIncludePriorDatesToGetXComSlice, + ), Version("2025-05-20", DowngradeUpstreamMapIndexes), Version("2025-04-28", AddRenderedMapIndexField), Version("2025-04-11"), diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_08_10.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_08_10.py index 188eaec2d79..ec66915e4d9 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_08_10.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_08_10.py @@ -20,6 +20,7 @@ from __future__ import annotations from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, endpoint, schema from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun, TIRunContext +from airflow.api_fastapi.execution_api.routes.xcoms import GetXComSliceFilterParams class AddDagRunStateFieldAndPreviousEndpoint(VersionChange): @@ -37,3 +38,13 @@ class AddDagRunStateFieldAndPreviousEndpoint(VersionChange): """Remove the `state` field from the dag_run object when converting to the previous version.""" if "dag_run" in response.body and isinstance(response.body["dag_run"], dict): response.body["dag_run"].pop("state", None) + + +class AddIncludePriorDatesToGetXComSlice(VersionChange): + """Add the `include_prior_dates` field to GetXComSliceFilterParams.""" + + description = __doc__ + + 
instructions_to_migrate_to_previous_version = ( + schema(GetXComSliceFilterParams).field("include_prior_dates").didnt_exist, + ) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_xcoms.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_xcoms.py index 1b10e81cd23..ea93c2f96e0 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_xcoms.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_xcoms.py @@ -31,6 +31,7 @@ from airflow.models.taskmap import TaskMap from airflow.models.xcom import XComModel from airflow.providers.standard.operators.empty import EmptyOperator from airflow.serialization.serde import deserialize, serialize +from airflow.utils import timezone from airflow.utils.session import create_session pytestmark = pytest.mark.db_test @@ -273,6 +274,54 @@ class TestXComsGetEndpoint: assert response.status_code == 200 assert response.json() == ["f", "o", "b"][key] + @pytest.mark.parametrize( + "include_prior_dates, expected_xcoms", + [[True, ["earlier_value", "later_value"]], [False, ["later_value"]]], + ) + def test_xcom_get_slice_accepts_include_prior_dates( + self, client, dag_maker, session, include_prior_dates, expected_xcoms + ): + """Test that the slice endpoint accepts include_prior_dates parameter and works correctly.""" + + with dag_maker(dag_id="dag"): + EmptyOperator(task_id="task") + + earlier_run = dag_maker.create_dagrun( + run_id="earlier_run", logical_date=timezone.parse("2024-01-01T00:00:00Z") + ) + later_run = dag_maker.create_dagrun( + run_id="later_run", logical_date=timezone.parse("2024-01-02T00:00:00Z") + ) + + earlier_ti = earlier_run.get_task_instance("task") + later_ti = later_run.get_task_instance("task") + + earlier_xcom = XComModel( + key="test_key", + value="earlier_value", + dag_run_id=earlier_ti.dag_run.id, + run_id=earlier_ti.run_id, + task_id=earlier_ti.task_id, + dag_id=earlier_ti.dag_id, + ) + later_xcom = XComModel( + 
key="test_key", + value="later_value", + dag_run_id=later_ti.dag_run.id, + run_id=later_ti.run_id, + task_id=later_ti.task_id, + dag_id=later_ti.dag_id, + ) + session.add_all([earlier_xcom, later_xcom]) + session.commit() + + response = client.get( + f"/execution/xcoms/dag/later_run/task/test_key/slice?include_prior_dates={include_prior_dates}" + ) + assert response.status_code == 200 + + assert response.json() == expected_xcoms + class TestXComsSetEndpoint: @pytest.mark.parametrize( diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py index f36c46f45de..bbf6eb4dea0 100644 --- a/task-sdk/src/airflow/sdk/api/client.py +++ b/task-sdk/src/airflow/sdk/api/client.py @@ -491,6 +491,7 @@ class XComOperations: start: int | None, stop: int | None, step: int | None, + include_prior_dates: bool = False, ) -> XComSequenceSliceResponse: params = {} if start is not None: @@ -499,6 +500,8 @@ class XComOperations: params["stop"] = stop if step is not None: params["step"] = step + if include_prior_dates: + params["include_prior_dates"] = include_prior_dates resp = self.client.get(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}/slice", params=params) return XComSequenceSliceResponse.model_validate_json(resp.read()) diff --git a/task-sdk/src/airflow/sdk/bases/xcom.py b/task-sdk/src/airflow/sdk/bases/xcom.py index 423fde12012..7c982a050dd 100644 --- a/task-sdk/src/airflow/sdk/bases/xcom.py +++ b/task-sdk/src/airflow/sdk/bases/xcom.py @@ -277,6 +277,7 @@ class BaseXCom: dag_id: str, task_id: str, run_id: str, + include_prior_dates: bool = False, ) -> Any: """ Retrieve all XCom values for a task, typically from all map indexes. @@ -291,6 +292,9 @@ class BaseXCom: :param run_id: DAG run ID for the task. :param dag_id: DAG ID to pull XComs from. :param task_id: Task ID to pull XComs from. + :param include_prior_dates: If *False* (default), only XComs from the + specified DAG run are returned. 
If *True*, XComs from earlier DAG runs (by logical date) are + returned as well. :return: List of all XCom values if found. """ from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS @@ -304,6 +308,7 @@ class BaseXCom: start=None, stop=None, step=None, + include_prior_dates=include_prior_dates, ), ) diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py b/task-sdk/src/airflow/sdk/execution_time/comms.py index 1f9eb3141a3..30694905217 100644 --- a/task-sdk/src/airflow/sdk/execution_time/comms.py +++ b/task-sdk/src/airflow/sdk/execution_time/comms.py @@ -692,6 +692,7 @@ class GetXComSequenceSlice(BaseModel): start: int | None stop: int | None step: int | None + include_prior_dates: bool = False type: Literal["GetXComSequenceSlice"] = "GetXComSequenceSlice" diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index ccac2d8d2ad..fffa1ad1555 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1132,7 +1132,14 @@ class ActivitySubprocess(WatchedSubprocess): resp = xcom elif isinstance(msg, GetXComSequenceSlice): xcoms = self.client.xcoms.get_sequence_slice( - msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.start, msg.stop, msg.step + msg.dag_id, + msg.run_id, + msg.task_id, + msg.key, + msg.start, + msg.stop, + msg.step, + msg.include_prior_dates, ) resp = XComSequenceSliceResult.from_response(xcoms) elif isinstance(msg, DeferTask): diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index a6bf2d11e1b..d4d09c8e20c 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -361,6 +361,7 @@ class RuntimeTaskInstance(TaskInstance): key=key, task_id=t_id, dag_id=dag_id, + include_prior_dates=include_prior_dates, ) if values is None: diff --git
a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 511cccc2a33..196494c258d 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -1954,10 +1954,11 @@ class TestHandleRequest: start=None, stop=None, step=None, + include_prior_dates=False, ), {"root": ["foo", "bar"], "type": "XComSequenceSliceResult"}, "xcoms.get_sequence_slice", - ("test_dag", "test_run", "test_task", "test_key", None, None, None), + ("test_dag", "test_run", "test_task", "test_key", None, None, None, False), {}, XComSequenceSliceResult(root=["foo", "bar"]), None, diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index b65c093856e..2556845dac8 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -2049,8 +2049,7 @@ class TestXComAfterTaskExecution: class CustomOperator(BaseOperator): def execute(self, context): - value = context["ti"].xcom_pull(task_ids="pull_task", key="key") - print(f"Pulled XCom Value: {value}") + context["ti"].xcom_pull(task_ids="pull_task", key="key") task = CustomOperator(task_id="pull_task") runtime_ti = create_runtime_ti(task=task) @@ -2061,6 +2060,7 @@ class TestXComAfterTaskExecution: dag_id="test_dag", task_id="pull_task", run_id="test_run", + include_prior_dates=False, ) assert not any(
