This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 34d6f4c33f8 fix/issue-66592: Respecting unwrap_single for non-deferrable execution (#66596)
34d6f4c33f8 is described below

commit 34d6f4c33f83bfe0ec13344baf472cbb391c91dc
Author: Jake McGrath <[email protected]>
AuthorDate: Sat May 9 12:40:14 2026 -0400

    fix/issue-66592: Respecting unwrap_single for non-deferrable execution (#66596)
---
 .../providers/cncf/kubernetes/operators/job.py     |  2 +-
 .../unit/cncf/kubernetes/operators/test_job.py     | 42 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
index 49eef06c33c..510346d34ad 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
@@ -255,7 +255,7 @@ class KubernetesJobOperator(KubernetesPodOperator):
                     f"Kubernetes job '{self.job.metadata.name}' is failed with 
error '{error_message}'"
                 )
             if self.do_xcom_push:
-                return xcom_result
+                return xcom_result[0] if self.unwrap_single and 
len(xcom_result) == 1 else xcom_result
 
     def execute_deferrable(self):
         self.defer(
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
index 4e1bd0c0eb0..6bde7c4772d 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
@@ -1119,6 +1119,48 @@ class TestKubernetesJobOperator:
         assert result == return_value
         assert mock_client.return_value.list_namespaced_pod.call_count == 
successful_try + 1
 
+    @pytest.mark.non_db_test_override
+    @pytest.mark.parametrize(
+        ("unwrap_single", "parallelism", "expected"),
+        [
+            (True, 1, "xcom-result"),
+            (True, 2, ["xcom-result", "xcom-result"]),
+            (False, 1, ["xcom-result"]),
+        ],
+    )
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.extract_xcom"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+    
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"),
 mock.MagicMock())
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"), 
mock.MagicMock())
+    @patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start", 
mock.MagicMock())
+    @patch(f"{POD_MANAGER_CLASS}.await_container_completion", mock.MagicMock())
+    @patch(HOOK_CLASS)
+    def test_execute_xcom_respects_unwrap_single(
+        self,
+        mock_hook,
+        mock_get_pods,
+        mock_extract_xcom,
+        unwrap_single,
+        parallelism,
+        expected,
+    ):
+        mock_hook.return_value.is_job_failed.return_value = None
+        mock_extract_xcom.return_value = "xcom-result"
+        mock_get_pods.return_value = [mock.MagicMock() for _ in 
range(parallelism)]
+
+        op = KubernetesJobOperator(
+            task_id="test_task_id",
+            wait_until_job_complete=True,
+            do_xcom_push=True,
+            get_logs=False,
+            parallelism=parallelism,
+            unwrap_single=unwrap_single,
+        )
+
+        result = op.execute(context=dict(ti=mock.MagicMock()))
+
+        assert result == expected
+
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
     
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
     @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.hook"), 
new_callable=mock.PropertyMock)

Reply via email to