This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 41733b23516 Fix XCom PATCH/POST to store native values (instead of json.dumps) (#64220) (#67116)
41733b23516 is described below

commit 41733b23516ad200dc20e170e04facadefc0a636
Author: Rahul Vats <[email protected]>
AuthorDate: Mon May 18 21:58:45 2026 +0530

    Fix XCom PATCH/POST to store native values (instead of json.dumps) (#64220) (#67116)
    
    (cherry picked from commit acdd9da1ce46a5b08d75b6646e78ff8c8f74fa5c)
    
    Co-authored-by: Henry Chen <[email protected]>
---
 .../api_fastapi/core_api/routes/public/xcom.py     | 53 ++++++++++++++--------
 .../core_api/routes/public/test_xcom.py            | 26 +++++++++--
 2 files changed, 55 insertions(+), 24 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
index 2d98d873278..13c7f78ed28 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
@@ -17,7 +17,6 @@
 from __future__ import annotations
 
 import copy
-import json
 from typing import Annotated
 
 from fastapi import Depends, HTTPException, Query, status
@@ -291,27 +290,24 @@ def create_xcom_entry(
         )
 
     try:
-        value = json.dumps(request_body.value)
-    except (ValueError, TypeError):
+        XComModel.set(
+            key=request_body.key,
+            value=request_body.value,
+            dag_id=dag_id,
+            task_id=task_id,
+            run_id=dag_run_id,
+            map_index=request_body.map_index,
+            serialize=False,
+            session=session,
+        )
+    except (ValueError, TypeError) as e:
         raise HTTPException(
             status.HTTP_400_BAD_REQUEST, f"Couldn't serialise the XCom with key: `{request_body.key}`"
-        )
-
-    new = XComModel(
-        dag_run_id=dag_run.id,
-        key=request_body.key,
-        value=value,
-        run_id=dag_run_id,
-        task_id=task_id,
-        dag_id=dag_id,
-        map_index=request_body.map_index,
-    )
-    session.add(new)
-    session.flush()
+        ) from e
 
     xcom = session.scalar(
         select(XComModel)
-        .filter(
+        .where(
             XComModel.dag_id == dag_id,
             XComModel.task_id == task_id,
             XComModel.run_id == dag_run_id,
@@ -345,11 +341,12 @@ def update_xcom_entry(
     dag_run_id: str,
     xcom_key: str,
     patch_body: XComUpdateBody,
+    *,
     session: SessionDep,
 ) -> XComResponseNative:
     """Update an existing XCom entry."""
     # Check if XCom entry exists
-    xcom_entry = session.scalar(
+    xcom_query = (
         select(XComModel)
         .where(
             XComModel.dag_id == dag_id,
@@ -361,6 +358,7 @@ def update_xcom_entry(
         .limit(1)
         .options(joinedload(XComModel.task), joinedload(XComModel.dag_run).joinedload(DR.dag_model))
     )
+    xcom_entry = session.scalar(xcom_query)
 
     if not xcom_entry:
         raise HTTPException(
@@ -368,9 +366,24 @@ def update_xcom_entry(
             f"The XCom with key: `{xcom_key}` with mentioned task instance doesn't exist.",
         )
 
-    # Update XCom entry
-    xcom_entry.value = json.dumps(patch_body.value)
+    try:
+        XComModel.set(
+            key=xcom_key,
+            value=patch_body.value,
+            dag_id=dag_id,
+            task_id=task_id,
+            run_id=dag_run_id,
+            map_index=patch_body.map_index,
+            serialize=False,
+            session=session,
+        )
+    except (ValueError, TypeError) as e:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST, f"Couldn't serialise the XCom with key: `{xcom_key}`"
+        ) from e
 
+    # Fetch after setting, to get fresh object for response
+    xcom_entry = session.scalar(xcom_query)
     return XComResponseNative.model_validate(xcom_entry)
 
 
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
index 0b2864c6e2d..bf4471be73a 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
@@ -680,7 +680,7 @@ class TestCreateXComEntry(TestXComEndpoint):
             # Validate the created XCom response
             current_data = response.json()
             assert current_data["key"] == request_body.key
-            assert current_data["value"] == XComModel.serialize_value(request_body.value)
+            assert current_data["value"] == request_body.value
             assert current_data["dag_id"] == dag_id
             assert current_data["task_id"] == task_id
             assert current_data["run_id"] == dag_run_id
@@ -716,7 +716,7 @@ class TestCreateXComEntry(TestXComEndpoint):
         )
         assert get_resp.status_code == 200
         assert get_resp.json()["key"] == slash_key
-        assert get_resp.json()["value"] == json.dumps(TEST_XCOM_VALUE)
+        assert get_resp.json()["value"] == TEST_XCOM_VALUE
 
     @pytest.mark.parametrize(
         ("key", "value"),
@@ -833,7 +833,7 @@ class TestPatchXComEntry(TestXComEndpoint):
         assert response.status_code == expected_status
 
         if expected_status == 200:
-            assert response.json()["value"] == json.dumps(patch_body["value"])
+            assert response.json()["value"] == patch_body["value"]
         else:
             assert response.json()["detail"] == expected_detail
         check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry", logical_date=None)
@@ -862,7 +862,25 @@ class TestPatchXComEntry(TestXComEndpoint):
         )
         assert response.status_code == 200
         assert response.json()["key"] == slash_key
-        assert response.json()["value"] == json.dumps(new_value)
+        assert response.json()["value"] == new_value
+        check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry", logical_date=None)
+
+    def test_patch_xcom_preserves_int_type(self, test_client, session):
+        """Test scenario described in #59032: if existing XCom value type is int,
+        after patching with different value, it should still be int in the API response.
+        """
+        key = "int_type_xcom"
+        # Create with int value
+        self._create_xcom(key, 42)
+        patch_value = 100
+        response = test_client.patch(
+            f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{key}",
+            json={"value": patch_value},
+        )
+        assert response.status_code == 200
+        data = response.json()
+        assert data["value"] == patch_value
+        assert isinstance(data["value"], int), f"Expected int type but got {type(data['value'])}"
         check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry", logical_date=None)
 
     @pytest.mark.parametrize(

Reply via email to