This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1260d4c5fb4 KubernetesPodOperator push xcom after failed pod (#51475)
1260d4c5fb4 is described below
commit 1260d4c5fb415aa43d025daf26bd4c81918b9294
Author: AutomationDev85 <[email protected]>
AuthorDate: Tue Jun 10 16:22:46 2025 +0200
KubernetesPodOperator push xcom after failed pod (#51475)
Co-authored-by: AutomationDev85 <AutomationDev85>
Co-authored-by: Jens Scheffler <[email protected]>
---
.../airflow/providers/cncf/kubernetes/operators/pod.py | 15 ++++++++++++++-
.../tests/unit/cncf/kubernetes/operators/test_pod.py | 18 ++++++++++++++++++
2 files changed, 32 insertions(+), 1 deletion(-)
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index c24dcd716b0..9c387ec3d32 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -84,6 +84,7 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager import (
from airflow.settings import pod_mutation_hook
from airflow.utils import yaml
from airflow.utils.helpers import prune_dict, validate_key
+from airflow.utils.xcom import XCOM_RETURN_KEY
from airflow.version import version as airflow_version
if TYPE_CHECKING:
@@ -720,6 +721,8 @@ class KubernetesPodOperator(BaseOperator):
self.cleanup(
pod=pod_to_clean,
remote_pod=self.remote_pod,
+ xcom_result=result,
+ context=context,
)
for callback in self.callbacks:
callback.on_pod_cleanup(
@@ -991,7 +994,13 @@ class KubernetesPodOperator(BaseOperator):
pod=pod, client=self.client, mode=ExecutionMode.SYNC, operator=self, context=context
)
- def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
+ def cleanup(
+ self,
+ pod: k8s.V1Pod,
+ remote_pod: k8s.V1Pod,
+ xcom_result: dict | None = None,
+ context: Context | None = None,
+ ) -> None:
# Skip cleaning the pod in the following scenarios.
# 1. If a task got marked as failed, "on_kill" method would be called and the pod will be cleaned up
# there. Cleaning it up again will raise an exception (which might cause retry).
@@ -1011,6 +1020,10 @@ class KubernetesPodOperator(BaseOperator):
)
if failed:
+ if self.do_xcom_push and xcom_result and context:
+ # Ensure that existing XCom is pushed even in case of failure
+ context["ti"].xcom_push(XCOM_RETURN_KEY, xcom_result)
+
if self.log_events_on_failure:
self._read_pod_events(pod, reraise=False)
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index b6a014b6470..37af22925d1 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -53,6 +53,7 @@ if TYPE_CHECKING:
from airflow.utils.context import Context
from airflow.utils.session import create_session
from airflow.utils.types import DagRunType
+from airflow.utils.xcom import XCOM_RETURN_KEY
from tests_common.test_utils import db
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
@@ -1549,6 +1550,23 @@ class TestKubernetesPodOperator:
pod = k.build_pod_request_obj({})
assert re.match(r"a-very-reasonable-task-name-[a-z0-9-]+", pod.metadata.name) is not None
+ @patch(f"{POD_MANAGER_CLASS}.extract_xcom")
+ @patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start")
+ @patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
+ def test_xcom_push_failed_pod(self, remote_pod, mock_await, mock_extract_xcom):
+ """Tests that an xcom is pushed after a pod failed but xcom output exists."""
+ k = KubernetesPodOperator(task_id="task", on_finish_action="delete_pod", do_xcom_push=True)
+
+ remote_pod.return_value.status.phase = "Failed"
+ mock_extract_xcom.return_value = '{"Test key": "Test value"}'
+ context = create_context(k)
+ context["ti"].xcom_push = MagicMock()
+
+ with pytest.raises(AirflowException):
+ k.execute(context=context)
+
+ context["ti"].xcom_push.assert_called_with(XCOM_RETURN_KEY, {"Test key": "Test value"})
+
@pytest.mark.asyncio
@pytest.mark.parametrize(
"kwargs, actual_exit_code, expected_exc",