This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1260d4c5fb4 KubernetesPodOperator push xcom after failed pod (#51475)
1260d4c5fb4 is described below

commit 1260d4c5fb415aa43d025daf26bd4c81918b9294
Author: AutomationDev85 <[email protected]>
AuthorDate: Tue Jun 10 16:22:46 2025 +0200

    KubernetesPodOperator push xcom after failed pod (#51475)
    
    Co-authored-by: AutomationDev85 <AutomationDev85>
    Co-authored-by: Jens Scheffler <[email protected]>
---
 .../airflow/providers/cncf/kubernetes/operators/pod.py | 15 ++++++++++++++-
 .../tests/unit/cncf/kubernetes/operators/test_pod.py   | 18 ++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index c24dcd716b0..9c387ec3d32 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -84,6 +84,7 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager 
import (
 from airflow.settings import pod_mutation_hook
 from airflow.utils import yaml
 from airflow.utils.helpers import prune_dict, validate_key
+from airflow.utils.xcom import XCOM_RETURN_KEY
 from airflow.version import version as airflow_version
 
 if TYPE_CHECKING:
@@ -720,6 +721,8 @@ class KubernetesPodOperator(BaseOperator):
             self.cleanup(
                 pod=pod_to_clean,
                 remote_pod=self.remote_pod,
+                xcom_result=result,
+                context=context,
             )
             for callback in self.callbacks:
                 callback.on_pod_cleanup(
@@ -991,7 +994,13 @@ class KubernetesPodOperator(BaseOperator):
                 pod=pod, client=self.client, mode=ExecutionMode.SYNC, 
operator=self, context=context
             )
 
-    def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
+    def cleanup(
+        self,
+        pod: k8s.V1Pod,
+        remote_pod: k8s.V1Pod,
+        xcom_result: dict | None = None,
+        context: Context | None = None,
+    ) -> None:
         # Skip cleaning the pod in the following scenarios.
         # 1. If a task got marked as failed, "on_kill" method would be called 
and the pod will be cleaned up
         # there. Cleaning it up again will raise an exception (which might 
cause retry).
@@ -1011,6 +1020,10 @@ class KubernetesPodOperator(BaseOperator):
         )
 
         if failed:
+            if self.do_xcom_push and xcom_result and context:
+                # Ensure that existing XCom is pushed even in case of failure
+                context["ti"].xcom_push(XCOM_RETURN_KEY, xcom_result)
+
             if self.log_events_on_failure:
                 self._read_pod_events(pod, reraise=False)
 
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index b6a014b6470..37af22925d1 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -53,6 +53,7 @@ if TYPE_CHECKING:
     from airflow.utils.context import Context
 from airflow.utils.session import create_session
 from airflow.utils.types import DagRunType
+from airflow.utils.xcom import XCOM_RETURN_KEY
 
 from tests_common.test_utils import db
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
@@ -1549,6 +1550,23 @@ class TestKubernetesPodOperator:
         pod = k.build_pod_request_obj({})
         assert re.match(r"a-very-reasonable-task-name-[a-z0-9-]+", 
pod.metadata.name) is not None
 
+    @patch(f"{POD_MANAGER_CLASS}.extract_xcom")
+    @patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start")
+    @patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
+    def test_xcom_push_failed_pod(self, remote_pod, mock_await, 
mock_extract_xcom):
+        """Tests that an xcom is pushed after a pod failed but xcom output 
exists."""
+        k = KubernetesPodOperator(task_id="task", 
on_finish_action="delete_pod", do_xcom_push=True)
+
+        remote_pod.return_value.status.phase = "Failed"
+        mock_extract_xcom.return_value = '{"Test key": "Test value"}'
+        context = create_context(k)
+        context["ti"].xcom_push = MagicMock()
+
+        with pytest.raises(AirflowException):
+            k.execute(context=context)
+
+        context["ti"].xcom_push.assert_called_with(XCOM_RETURN_KEY, {"Test 
key": "Test value"})
+
     @pytest.mark.asyncio
     @pytest.mark.parametrize(
         "kwargs, actual_exit_code, expected_exc",

Reply via email to