This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 48d53b6e050 Apply reserved-key check to XCom update payload (#65915)
(#66913)
48d53b6e050 is described below
commit 48d53b6e050600cf942ca255b0cf3824f5e1b708
Author: Rahul Vats <[email protected]>
AuthorDate: Sat May 16 02:01:34 2026 +0530
Apply reserved-key check to XCom update payload (#65915) (#66913)
XComCreateBody (POST /xcomEntries) rejects payloads containing
reserved serialization keys (__classname__, __type, __var,
__data__, …) via a field_validator that walks the value
recursively. XComUpdateBody (PATCH /xcomEntries/{key}) was
missing the same validator, so a payload that POST correctly
rejects with 422 was accepted on PATCH and stored as-is.
Extracts the recursive walker to a module-level
_check_forbidden_xcom_keys helper and has both XComCreateBody
and XComUpdateBody delegate to it, so create and update apply
the same payload-key check from a single source. A parametrized
test mirroring the existing
test_create_xcom_entry_blocks_forbidden_keys covers the PATCH
path.
(cherry picked from commit c1734893709c9d0bfeaa496799f4c560fc93fac4)
Co-authored-by: Jarek Potiuk <[email protected]>
---
.../api_fastapi/core_api/datamodels/xcom.py | 47 +++++++++++++---------
.../core_api/routes/public/test_xcom.py | 20 +++++++++
2 files changed, 48 insertions(+), 19 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py
index 05cbb47c36c..b42cc176f01 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py
@@ -83,6 +83,28 @@ class XComCollectionResponse(BaseModel):
total_entries: int
+def _check_forbidden_xcom_keys(value: Any) -> Any:
+ """Recursively reject forbidden deserialization keys in user-provided XCom
data."""
+ from airflow._shared.serialization import FORBIDDEN_XCOM_KEYS
+
+ def _walk(obj: Any, path: str = "value") -> None:
+ if isinstance(obj, dict):
+ found = FORBIDDEN_XCOM_KEYS & obj.keys()
+ if found:
+ raise ValueError(
+ f"XCom {path} contains reserved serialization keys: {',
'.join(sorted(found))}. "
+ f"These keys are reserved for internal use."
+ )
+ for k, v in obj.items():
+ _walk(v, f"{path}.{k}")
+ elif isinstance(obj, (list, tuple)):
+ for i, item in enumerate(obj):
+ _walk(item, f"{path}[{i}]")
+
+ _walk(value)
+ return value
+
+
class XComCreateBody(StrictBaseModel):
"""Payload serializer for creating an XCom entry."""
@@ -93,25 +115,7 @@ class XComCreateBody(StrictBaseModel):
@field_validator("value")
@classmethod
def _check_forbidden_keys(cls, value: Any) -> Any:
- """Recursively check for forbidden deserialization keys in
user-provided XCom data."""
- from airflow._shared.serialization import FORBIDDEN_XCOM_KEYS
-
- def _walk_forbidden_keys(obj: Any, path: str = "value") -> None:
- if isinstance(obj, dict):
- found = FORBIDDEN_XCOM_KEYS & obj.keys()
- if found:
- raise ValueError(
- f"XCom {path} contains reserved serialization keys:
{', '.join(sorted(found))}. "
- f"These keys are reserved for internal use."
- )
- for k, v in obj.items():
- _walk_forbidden_keys(v, f"{path}.{k}")
- elif isinstance(obj, (list, tuple)):
- for i, item in enumerate(obj):
- _walk_forbidden_keys(item, f"{path}[{i}]")
-
- _walk_forbidden_keys(value)
- return value
+ return _check_forbidden_xcom_keys(value)
class XComUpdateBody(StrictBaseModel):
@@ -119,3 +123,8 @@ class XComUpdateBody(StrictBaseModel):
value: Any
map_index: int = -1
+
+ @field_validator("value")
+ @classmethod
+ def _check_forbidden_keys(cls, value: Any) -> Any:
+ return _check_forbidden_xcom_keys(value)
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
index c51469d944e..0b2864c6e2d 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
@@ -864,3 +864,23 @@ class TestPatchXComEntry(TestXComEndpoint):
assert response.json()["key"] == slash_key
assert response.json()["value"] == json.dumps(new_value)
check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry",
logical_date=None)
+
+ @pytest.mark.parametrize(
+ ("key", "value"),
+ [
+ ("__classname__", {"__classname__":
"airflow.sdk.definitions.connection.Connection"}),
+ ("__type", {"__type":
"airflow.sdk.definitions.connection.Connection", "__var": {}}),
+ ("__data__", {"nested": {"__data__": "malicious"}}),
+ ],
+ )
+ def test_patch_xcom_entry_blocks_forbidden_keys(self, test_client, key,
value):
+ """Test that XCom update blocks deserialization metadata keys."""
+ self._create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE)
+ response = test_client.patch(
+
f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}",
+ json={"value": value, "map_index": -1},
+ )
+ assert response.status_code == 422
+ detail = str(response.json()["detail"])
+ assert "reserved serialization keys" in detail
+ assert key in detail