This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d4ab23ea0c Kubernetes Pod Operator: Skip async deferral when pod 
already complete (#58684)
8d4ab23ea0c is described below

commit 8d4ab23ea0c42232beacfeb1d83a17b831fd1995
Author: John Horan <[email protected]>
AuthorDate: Wed Jan 14 21:10:03 2026 +0000

    Kubernetes Pod Operator: Skip async deferral when pod already complete 
(#58684)
    
    * skip deferral when pod already complete
    
    * fix
    
    * mypy
    
    * fix test
    
    * make gcp signature match
    
    * add test
    
    * merge
    
    * format
    
    * swap condition
    
    * fix mypy
---
 .../providers/cncf/kubernetes/operators/pod.py     | 70 ++++++++++++++--------
 .../providers/cncf/kubernetes/triggers/pod.py      |  6 +-
 .../unit/cncf/kubernetes/operators/test_pod.py     | 52 ++++++++++++++++
 .../google/cloud/operators/kubernetes_engine.py    |  4 +-
 4 files changed, 102 insertions(+), 30 deletions(-)

diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 4e36e0eefdc..865f8a13f15 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -63,7 +63,7 @@ from 
airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
     generic_api_retry,
 )
 from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
-from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
+from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState, 
KubernetesPodTrigger
 from airflow.providers.cncf.kubernetes.utils import xcom_sidecar
 from airflow.providers.cncf.kubernetes.utils.container import (
     container_is_succeeded,
@@ -852,7 +852,7 @@ class KubernetesPodOperator(BaseOperator):
         ti.xcom_push(key="pod_name", value=self.pod.metadata.name)
         ti.xcom_push(key="pod_namespace", value=self.pod.metadata.namespace)
 
-        self.invoke_defer_method()
+        self.invoke_defer_method(context=context)
 
     def convert_config_file_to_dict(self):
         """Convert passed config_file to dict representation."""
@@ -863,7 +863,9 @@ class KubernetesPodOperator(BaseOperator):
         else:
             self._config_dict = None
 
-    def invoke_defer_method(self, last_log_time: DateTime | None = None) -> 
None:
+    def invoke_defer_method(
+        self, last_log_time: DateTime | None = None, context: Context | None = 
None
+    ) -> None:
         """Redefine triggers which are being used in child classes."""
         self.convert_config_file_to_dict()
 
@@ -882,29 +884,47 @@ class KubernetesPodOperator(BaseOperator):
                 self.log.info("Successfully resolved connection extras for 
deferral.")
 
         trigger_start_time = datetime.datetime.now(tz=datetime.timezone.utc)
-        self.defer(
-            trigger=KubernetesPodTrigger(
-                pod_name=self.pod.metadata.name,  # type: ignore[union-attr]
-                pod_namespace=self.pod.metadata.namespace,  # type: 
ignore[union-attr]
-                trigger_start_time=trigger_start_time,
-                kubernetes_conn_id=self.kubernetes_conn_id,
-                connection_extras=connection_extras,
-                cluster_context=self.cluster_context,
-                config_dict=self._config_dict,
-                in_cluster=self.in_cluster,
-                poll_interval=self.poll_interval,
-                get_logs=self.get_logs,
-                startup_timeout=self.startup_timeout_seconds,
-                startup_check_interval=self.startup_check_interval_seconds,
-                schedule_timeout=self.schedule_timeout_seconds,
-                base_container_name=self.base_container_name,
-                on_finish_action=self.on_finish_action.value,
-                last_log_time=last_log_time,
-                logging_interval=self.logging_interval,
-                trigger_kwargs=self.trigger_kwargs,
-            ),
-            method_name="trigger_reentry",
+
+        trigger = KubernetesPodTrigger(
+            pod_name=self.pod.metadata.name,  # type: ignore[union-attr]
+            pod_namespace=self.pod.metadata.namespace,  # type: 
ignore[union-attr]
+            trigger_start_time=trigger_start_time,
+            kubernetes_conn_id=self.kubernetes_conn_id,
+            connection_extras=connection_extras,
+            cluster_context=self.cluster_context,
+            config_dict=self._config_dict,
+            in_cluster=self.in_cluster,
+            poll_interval=self.poll_interval,
+            get_logs=self.get_logs,
+            startup_timeout=self.startup_timeout_seconds,
+            startup_check_interval=self.startup_check_interval_seconds,
+            schedule_timeout=self.schedule_timeout_seconds,
+            base_container_name=self.base_container_name,
+            on_finish_action=self.on_finish_action.value,
+            last_log_time=last_log_time,
+            logging_interval=self.logging_interval,
+            trigger_kwargs=self.trigger_kwargs,
         )
+        container_state = trigger.define_container_state(self.pod) if self.pod 
else None
+        if context and (
+            container_state == ContainerState.TERMINATED or container_state == 
ContainerState.FAILED
+        ):
+            self.log.info("Skipping deferral as pod is already in a terminal 
state")
+            self.trigger_reentry(
+                context=context,
+                event={
+                    "status": "failed" if container_state == 
ContainerState.FAILED else "success",
+                    "namespace": trigger.pod_namespace,
+                    "name": trigger.pod_name,
+                    "message": "Container failed"
+                    if container_state == ContainerState.FAILED
+                    else "Container succeeded",
+                    "last_log_time": last_log_time,
+                    **(self.trigger_kwargs or {}),
+                },
+            )
+        else:
+            self.defer(trigger=trigger, method_name="trigger_reentry")
 
     def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
         """
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index d5c8c59520c..6ad7b79b6f4 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -335,12 +335,10 @@ class KubernetesPodTrigger(BaseTrigger):
         return AsyncPodManager(async_hook=self.hook)
 
     def define_container_state(self, pod: V1Pod) -> ContainerState:
-        pod_containers = pod.status.container_statuses
-
-        if pod_containers is None:
+        if pod.status is None or pod.status.container_statuses is None:
             return ContainerState.UNDEFINED
 
-        container = next(c for c in pod_containers if c.name == 
self.base_container_name)
+        container = next(c for c in pod.status.container_statuses if c.name == 
self.base_container_name)
 
         for state in (ContainerState.RUNNING, ContainerState.WAITING, 
ContainerState.TERMINATED):
             state_obj = getattr(container.state, state)
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index b754654a556..a9f76dfa5b4 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -70,6 +70,7 @@ POD_MANAGER_CLASS = 
"airflow.providers.cncf.kubernetes.utils.pod_manager.PodMana
 POD_MANAGER_MODULE = "airflow.providers.cncf.kubernetes.utils.pod_manager"
 HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook"
 KUB_OP_PATH = 
"airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.{}"
+TRIGGER_CLASS = 
"airflow.providers.cncf.kubernetes.triggers.pod.KubernetesPodTrigger"
 
 TEST_TASK_ID = "kubernetes_task_async"
 TEST_NAMESPACE = "default"
@@ -2436,8 +2437,10 @@ class TestKubernetesPodOperatorAsync:
     @patch(KUB_OP_PATH.format("build_pod_request_obj"))
     @patch(KUB_OP_PATH.format("get_or_create_pod"))
     
@patch("airflow.providers.cncf.kubernetes.operators.pod.BaseHook.get_connection")
+    @patch(f"{TRIGGER_CLASS}.define_container_state")
     def test_async_create_pod_should_execute_successfully(
         self,
+        mocked_container_state,
         mocked_get_connection,
         mocked_pod,
         mocked_pod_obj,
@@ -2876,6 +2879,55 @@ class TestKubernetesPodOperatorAsync:
             "context": context,
         }
 
+    @patch(KUB_OP_PATH.format("client"))
+    @patch(KUB_OP_PATH.format("find_pod"))
+    @patch(KUB_OP_PATH.format("build_pod_request_obj"))
+    @patch(KUB_OP_PATH.format("get_or_create_pod"))
+    @patch(KUB_OP_PATH.format("trigger_reentry"))
+    def test_skip_deferral_on_terminated_pod(
+        self,
+        mocked_trigger_reentry,
+        mocked_get_or_create_pod,
+        mocked_build_pod_request_obj,
+        mocked_find_pod,
+        mocked_client,
+        mocker,
+    ):
+        k = KubernetesPodOperator(
+            task_id=TEST_TASK_ID,
+            namespace=TEST_NAMESPACE,
+            image=TEST_IMAGE,
+            cmds=TEST_CMDS,
+            arguments=TEST_ARGS,
+            labels=TEST_LABELS,
+            name=TEST_NAME,
+            in_cluster=True,
+            get_logs=True,
+            deferrable=True,
+        )
+        mock_file = mock_open(read_data='{"a": "b"}')
+        mocker.patch("builtins.open", mock_file)
+
+        mocked_find_pod.return_value.metadata.name = TEST_NAME
+        mocked_find_pod.return_value.metadata.namespace = TEST_NAMESPACE
+        mocked_get_or_create_pod.return_value.status.container_statuses = [
+            k8s.V1ContainerStatus(
+                name=k.base_container_name,
+                
state=k8s.V1ContainerState(terminated=k8s.V1ContainerStateTerminated(exit_code=0)),
+                image="alpine",
+                image_id="",
+                ready=False,
+                restart_count=0,
+            )
+        ]
+
+        context = create_context(k)
+        ti_mock = MagicMock(**{"map_index": -1})
+        context["ti"] = ti_mock
+
+        k.execute(context)
+        mocked_trigger_reentry.assert_called_once()
+
 
 @pytest.mark.parametrize("do_xcom_push", [True, False])
 @patch(KUB_OP_PATH.format("extract_xcom"))
diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
 
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
index e04ecd3b65c..f836561a0c9 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -660,7 +660,9 @@ class GKEStartPodOperator(GKEOperatorMixin, 
KubernetesPodOperator):
         if self.config_file:
             raise AirflowException("config_file is not an allowed parameter 
for the GKEStartPodOperator.")
 
-    def invoke_defer_method(self, last_log_time: DateTime | None = None):
+    def invoke_defer_method(
+        self, last_log_time: DateTime | None = None, context: Context | None = 
None
+    ) -> None:
         """Redefine triggers which are being used in child classes."""
         trigger_start_time = timezone.utcnow()
         on_finish_action = self.on_finish_action

Reply via email to