kaxil commented on code in PR #64297:
URL: https://github.com/apache/airflow/pull/64297#discussion_r3019043220
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -2058,6 +2058,135 @@ def mock_get_all_side_effect(task_id, **kwargs):
assert mock_get_one.called
assert not mock_get_all.called
+ @pytest.mark.parametrize(
+ ("task_ids", "default", "expected_default"),
+ [
+ pytest.param("task_a", "fallback", "fallback",
id="single_task_str_default"),
+ pytest.param("task_a", NOTSET, NOTSET,
id="single_task_NOTSET_default"),
+ pytest.param(["task_a"], "fallback", ["fallback"],
id="list_task_str_default"),
+ pytest.param(
+ ["task_a", "task_b"],
+ "fallback",
+ ["fallback", "fallback"],
+ id="multiple_tasks_str_default",
+ ),
+ ],
+ )
+ def test_xcom_pull_default_with_notset_map_indexes(
+ self,
+ create_runtime_ti,
+ mock_supervisor_comms,
+ task_ids,
+ default,
+ expected_default,
+ ):
+ """Test that xcom_pull returns `default` when no XCom is found and
map_indexes is NOTSET."""
+
+ class CustomOperator(BaseOperator):
+ def execute(self, context):
+ print("This is a custom operator")
+
+ task = CustomOperator(task_id="pull_task")
+ runtime_ti = create_runtime_ti(task=task)
+
+ with patch.object(XCom, "get_all", return_value=None) as mock_get_all:
Review Comment:
Missing `autospec=True` on all three `patch.object` calls. Per project
convention, mocks should use `autospec` to catch signature mismatches. Same
applies to the other two tests at lines 2137 and 2186.
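
A minimal sketch of what `autospec=True` buys here, using only `unittest.mock`. `FakeStore` and `buggy_caller` are hypothetical stand-ins invented for illustration, not the real `XCom` class (whose `get_all` has a different signature); the point is only that a loose mock accepts a call the real method would reject.

```python
from unittest.mock import patch


class FakeStore:
    """Hypothetical stand-in; not the real XCom API."""

    def get_all(self, key, task_id):
        return None


def buggy_caller(store):
    # Bug: the real method takes `task_id`, not `task`.
    return store.get_all("key", task="task_a")


# Without autospec the mock accepts any arguments, so the bug is hidden.
with patch.object(FakeStore, "get_all", return_value=None) as loose_mock:
    buggy_caller(FakeStore())
    loose_accepted_bad_call = loose_mock.called

# With autospec the mock enforces the real signature and raises TypeError.
strict_raised = False
with patch.object(FakeStore, "get_all", autospec=True, return_value=None):
    try:
        buggy_caller(FakeStore())
    except TypeError:
        strict_raised = True

print(loose_accepted_bad_call, strict_raised)
```

In the tests under review the change would amount to adding `autospec=True` to each existing `patch.object(XCom, "get_all", ...)` call.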
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -2058,6 +2058,135 @@ def mock_get_all_side_effect(task_id, **kwargs):
assert mock_get_one.called
assert not mock_get_all.called
+ @pytest.mark.parametrize(
+ ("task_ids", "default", "expected_default"),
+ [
+ pytest.param("task_a", "fallback", "fallback",
id="single_task_str_default"),
+ pytest.param("task_a", NOTSET, NOTSET,
id="single_task_NOTSET_default"),
+ pytest.param(["task_a"], "fallback", ["fallback"],
id="list_task_str_default"),
+ pytest.param(
+ ["task_a", "task_b"],
+ "fallback",
+ ["fallback", "fallback"],
+ id="multiple_tasks_str_default",
+ ),
+ ],
+ )
+ def test_xcom_pull_default_with_notset_map_indexes(
+ self,
+ create_runtime_ti,
+ mock_supervisor_comms,
+ task_ids,
+ default,
+ expected_default,
+ ):
+ """Test that xcom_pull returns `default` when no XCom is found and
map_indexes is NOTSET."""
+
+ class CustomOperator(BaseOperator):
Review Comment:
The `CustomOperator` inner class is identical across all three tests and its
`execute` is never called. The existing `test_xcom_pull_with_no_map_index` test
(line 2080 pre-PR) uses `BaseOperator(task_id=...)` directly -- consider doing
the same for consistency and less noise.
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -2058,6 +2058,135 @@ def mock_get_all_side_effect(task_id, **kwargs):
assert mock_get_one.called
assert not mock_get_all.called
+ @pytest.mark.parametrize(
+ ("task_ids", "default", "expected_default"),
+ [
+ pytest.param("task_a", "fallback", "fallback",
id="single_task_str_default"),
+ pytest.param("task_a", NOTSET, NOTSET,
id="single_task_NOTSET_default"),
+ pytest.param(["task_a"], "fallback", ["fallback"],
id="list_task_str_default"),
+ pytest.param(
+ ["task_a", "task_b"],
+ "fallback",
+ ["fallback", "fallback"],
+ id="multiple_tasks_str_default",
+ ),
+ ],
+ )
+ def test_xcom_pull_default_with_notset_map_indexes(
+ self,
+ create_runtime_ti,
+ mock_supervisor_comms,
+ task_ids,
+ default,
+ expected_default,
+ ):
+ """Test that xcom_pull returns `default` when no XCom is found and
map_indexes is NOTSET."""
+
+ class CustomOperator(BaseOperator):
+ def execute(self, context):
+ print("This is a custom operator")
+
+ task = CustomOperator(task_id="pull_task")
+ runtime_ti = create_runtime_ti(task=task)
+
+ with patch.object(XCom, "get_all", return_value=None) as mock_get_all:
+ result = runtime_ti.xcom_pull(key="key", task_ids=task_ids,
default=default)
+ assert result == expected_default
+ assert mock_get_all.called
+
+ @pytest.mark.parametrize(
+ ("task_ids", "default", "expected_result"),
+ [
+ pytest.param(
+ "task_a",
+ "fallback",
+ [],
+ id="single_task_empty_list_returns_empty",
+ ),
+ pytest.param(
+ ["task_a"],
+ "fallback",
+ [],
+ id="list_single_task_empty_list_returns_empty",
+ ),
+ pytest.param(
+ ["task_a", "task_b"],
+ "fallback",
+ [],
+ id="multiple_tasks_empty_list_returns_empty",
+ ),
+ ],
+ )
+ def test_xcom_pull_with_get_all_returning_empty_list(
Review Comment:
`XCom.get_all()` returns `None` when no values are found (see
`bases/xcom.py` lines 320-321: `if not msg.root: return None`). It never
returns `[]`. This test mocks an impossible return value and ends up just
exercising Python's `list.extend([])`. The `default` parameter in the
parametrize is unused in the assertions -- the result is always `[]`
regardless. Consider dropping this test or adding a comment explaining it
guards against custom XCom backend behavior.
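
To illustrate why `default` has no effect in this test, here is a simplified sketch. `pull_sketch` is a hypothetical model of the multi-task pull loop, assumed from the expected values in the parametrized cases above; it is not the actual `xcom_pull` implementation.

```python
def pull_sketch(get_all, task_ids, default):
    """Hypothetical model: fall back to `default` only when get_all returns None."""
    results = []
    for task_id in task_ids:
        values = get_all(task_id)
        if values is None:
            # Nothing found for this task: use the caller's default.
            results.append(default)
        else:
            # An empty list extends by nothing, so `default` is never consulted.
            results.extend(values)
    return results


tasks = ["task_a", "task_b"]

# Mocked "impossible" return value: the result ignores `default` entirely.
empty_with_fallback = pull_sketch(lambda _task_id: [], tasks, "fallback")
empty_with_other = pull_sketch(lambda _task_id: [], tasks, "something else")

# Return value for "not found" as described in this comment: `default` is used.
none_with_fallback = pull_sketch(lambda _task_id: None, tasks, "fallback")

print(empty_with_fallback, empty_with_other, none_with_fallback)
```

Under this assumed shape, changing `default` cannot change the outcome of the empty-list cases, which is why the parameter is dead weight in the parametrization.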