This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 76d71783839 Stubbing missing tests for task sdk integration testing
for xcoms (#58031)
76d71783839 is described below
commit 76d7178383997db3f3cfb30f7b6b51a61ad698b1
Author: Amogh Desai <[email protected]>
AuthorDate: Fri Nov 7 19:26:33 2025 +0530
Stubbing missing tests for task sdk integration testing for xcoms (#58031)
---
.../tests/task_sdk_tests/test_xcom_operations.py | 74 ++++++++++++++++++++++
1 file changed, 74 insertions(+)
diff --git a/task-sdk-tests/tests/task_sdk_tests/test_xcom_operations.py
b/task-sdk-tests/tests/task_sdk_tests/test_xcom_operations.py
index 75692835a70..684ac070ed4 100644
--- a/task-sdk-tests/tests/task_sdk_tests/test_xcom_operations.py
+++ b/task-sdk-tests/tests/task_sdk_tests/test_xcom_operations.py
@@ -25,6 +25,8 @@ These tests validate the Execution API endpoints for XCom
operations:
from __future__ import annotations
+import pytest
+
from airflow.sdk.api.datamodels._generated import XComResponse
from airflow.sdk.execution_time.comms import OKResponse
from task_sdk_tests import console
@@ -62,6 +64,18 @@ def test_get_xcom(sdk_client, dag_info):
console.print("[green]✅ XCom get test passed!")
[email protected](reason="TODO: Implement XCom get (not found) test")
+def test_get_xcom_not_found(sdk_client, dag_info):
+ """
+ Test getting non-existent XCom value.
+
+ Expected: XComResponse with value=None or ErrorResponse
+ Endpoint: GET /execution/xcoms/{dag_id}/{run_id}/{task_id}/{key}
+ """
+ console.print("[yellow]TODO: Implement test_get_xcom_not_found")
+ raise NotImplementedError("test_get_xcom_not_found not implemented")
+
+
def test_set_xcom(sdk_client, dag_info):
"""
Test setting XCom value and then getting it to ensure set worked.
@@ -158,3 +172,63 @@ def test_xcom_delete(sdk_client, dag_info):
assert get_response.key == test_key
assert get_response.value is None
console.print("[green]✅ XCom delete test passed!")
+
+
[email protected](reason="TODO: Implement XCom head test")
+def test_xcom_head(sdk_client, dag_info):
+ """
+ Test getting count of mapped XCom values.
+
+ Expected: XComCountResponse with len field in it (should be ideally equal
to number of mapped tasks, since we have None it might throw RuntimeError)
+ Endpoint: HEAD /execution/xcoms/{dag_id}/{run_id}/{task_id}/{key}
+ """
+ console.print("[yellow]TODO: Implement test_xcom_head")
+ raise NotImplementedError("test_xcom_head not implemented")
+
+
[email protected](reason="TODO: Implement XCom get_sequence_item test")
+def test_xcom_get_sequence_item(sdk_client, dag_info):
+ """
+ Test getting XCom sequence item by offset.
+
+ Expected: XComSequenceIndexResponse with value
+ Endpoint: GET
/execution/xcoms/{dag_id}/{run_id}/{task_id}/{key}/item/{offset}
+ """
+ console.print("[yellow]TODO: Implement test_xcom_get_sequence_item")
+ raise NotImplementedError("test_xcom_get_sequence_item not implemented")
+
+
[email protected](reason="TODO: Implement XCom get_sequence_item (not found)
test")
+def test_xcom_get_sequence_item_not_found(sdk_client, dag_info):
+ """
+ Test getting non-existent XCom sequence item.
+
+ Expected: ErrorResponse with XCOM_NOT_FOUND error
+ Endpoint: GET
/execution/xcoms/{dag_id}/{run_id}/{task_id}/{key}/item/{offset}
+ """
+ console.print("[yellow]TODO: Implement
test_xcom_get_sequence_item_not_found")
+ raise NotImplementedError("test_xcom_get_sequence_item_not_found not
implemented")
+
+
[email protected](reason="TODO: Implement XCom get_sequence_slice test")
+def test_xcom_get_sequence_slice(sdk_client, dag_info):
+ """
+ Test getting XCom sequence slice.
+
+ Expected: XComSequenceSliceResponse with list of values
+ Endpoint: GET /execution/xcoms/{dag_id}/{run_id}/{task_id}/{key}/slice
+ """
+ console.print("[yellow]TODO: Implement test_xcom_get_sequence_slice")
+ raise NotImplementedError("test_xcom_get_sequence_slice not implemented")
+
+
[email protected](reason="TODO: Implement XCom get_sequence_slice (not found)
test")
+def test_xcom_get_sequence_slice_not_found(sdk_client, dag_info):
+ """
+ Test getting slice for non-existent XCom key.
+
+ Expected: XComSequenceSliceResponse as empty list
+ Endpoint: GET /execution/xcoms/{dag_id}/{run_id}/{task_id}/{key}/slice
+ """
+ console.print("[yellow]TODO: Implement
test_xcom_get_sequence_slice_not_found")
+ raise NotImplementedError("test_xcom_get_sequence_slice_not_found not
implemented")