This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8d4ab23ea0c Kubernetes Pod Operator: Skip async deferral when pod
already complete (#58684)
8d4ab23ea0c is described below
commit 8d4ab23ea0c42232beacfeb1d83a17b831fd1995
Author: John Horan <[email protected]>
AuthorDate: Wed Jan 14 21:10:03 2026 +0000
Kubernetes Pod Operator: Skip async deferral when pod already complete
(#58684)
* skip deferral when pod already complete
* fix
* mypy
* fix test
* make gcp signature match
* add test
* merge
* format
* swap condition
* fix mypy
---
.../providers/cncf/kubernetes/operators/pod.py | 70 ++++++++++++++--------
.../providers/cncf/kubernetes/triggers/pod.py | 6 +-
.../unit/cncf/kubernetes/operators/test_pod.py | 52 ++++++++++++++++
.../google/cloud/operators/kubernetes_engine.py | 4 +-
4 files changed, 102 insertions(+), 30 deletions(-)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 4e36e0eefdc..865f8a13f15 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -63,7 +63,7 @@ from
airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
generic_api_retry,
)
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
-from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
+from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState,
KubernetesPodTrigger
from airflow.providers.cncf.kubernetes.utils import xcom_sidecar
from airflow.providers.cncf.kubernetes.utils.container import (
container_is_succeeded,
@@ -852,7 +852,7 @@ class KubernetesPodOperator(BaseOperator):
ti.xcom_push(key="pod_name", value=self.pod.metadata.name)
ti.xcom_push(key="pod_namespace", value=self.pod.metadata.namespace)
- self.invoke_defer_method()
+ self.invoke_defer_method(context=context)
def convert_config_file_to_dict(self):
"""Convert passed config_file to dict representation."""
@@ -863,7 +863,9 @@ class KubernetesPodOperator(BaseOperator):
else:
self._config_dict = None
- def invoke_defer_method(self, last_log_time: DateTime | None = None) ->
None:
+ def invoke_defer_method(
+ self, last_log_time: DateTime | None = None, context: Context | None =
None
+ ) -> None:
"""Redefine triggers which are being used in child classes."""
self.convert_config_file_to_dict()
@@ -882,29 +884,47 @@ class KubernetesPodOperator(BaseOperator):
self.log.info("Successfully resolved connection extras for
deferral.")
trigger_start_time = datetime.datetime.now(tz=datetime.timezone.utc)
- self.defer(
- trigger=KubernetesPodTrigger(
- pod_name=self.pod.metadata.name, # type: ignore[union-attr]
- pod_namespace=self.pod.metadata.namespace, # type:
ignore[union-attr]
- trigger_start_time=trigger_start_time,
- kubernetes_conn_id=self.kubernetes_conn_id,
- connection_extras=connection_extras,
- cluster_context=self.cluster_context,
- config_dict=self._config_dict,
- in_cluster=self.in_cluster,
- poll_interval=self.poll_interval,
- get_logs=self.get_logs,
- startup_timeout=self.startup_timeout_seconds,
- startup_check_interval=self.startup_check_interval_seconds,
- schedule_timeout=self.schedule_timeout_seconds,
- base_container_name=self.base_container_name,
- on_finish_action=self.on_finish_action.value,
- last_log_time=last_log_time,
- logging_interval=self.logging_interval,
- trigger_kwargs=self.trigger_kwargs,
- ),
- method_name="trigger_reentry",
+
+ trigger = KubernetesPodTrigger(
+ pod_name=self.pod.metadata.name, # type: ignore[union-attr]
+ pod_namespace=self.pod.metadata.namespace, # type:
ignore[union-attr]
+ trigger_start_time=trigger_start_time,
+ kubernetes_conn_id=self.kubernetes_conn_id,
+ connection_extras=connection_extras,
+ cluster_context=self.cluster_context,
+ config_dict=self._config_dict,
+ in_cluster=self.in_cluster,
+ poll_interval=self.poll_interval,
+ get_logs=self.get_logs,
+ startup_timeout=self.startup_timeout_seconds,
+ startup_check_interval=self.startup_check_interval_seconds,
+ schedule_timeout=self.schedule_timeout_seconds,
+ base_container_name=self.base_container_name,
+ on_finish_action=self.on_finish_action.value,
+ last_log_time=last_log_time,
+ logging_interval=self.logging_interval,
+ trigger_kwargs=self.trigger_kwargs,
)
+ container_state = trigger.define_container_state(self.pod) if self.pod
else None
+ if context and (
+ container_state == ContainerState.TERMINATED or container_state ==
ContainerState.FAILED
+ ):
+ self.log.info("Skipping deferral as pod is already in a terminal
state")
+ self.trigger_reentry(
+ context=context,
+ event={
+ "status": "failed" if container_state ==
ContainerState.FAILED else "success",
+ "namespace": trigger.pod_namespace,
+ "name": trigger.pod_name,
+ "message": "Container failed"
+ if container_state == ContainerState.FAILED
+ else "Container succeeded",
+ "last_log_time": last_log_time,
+ **(self.trigger_kwargs or {}),
+ },
+ )
+ else:
+ self.defer(trigger=trigger, method_name="trigger_reentry")
def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
"""
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index d5c8c59520c..6ad7b79b6f4 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -335,12 +335,10 @@ class KubernetesPodTrigger(BaseTrigger):
return AsyncPodManager(async_hook=self.hook)
def define_container_state(self, pod: V1Pod) -> ContainerState:
- pod_containers = pod.status.container_statuses
-
- if pod_containers is None:
+ if pod.status is None or pod.status.container_statuses is None:
return ContainerState.UNDEFINED
- container = next(c for c in pod_containers if c.name ==
self.base_container_name)
+ container = next(c for c in pod.status.container_statuses if c.name ==
self.base_container_name)
for state in (ContainerState.RUNNING, ContainerState.WAITING,
ContainerState.TERMINATED):
state_obj = getattr(container.state, state)
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index b754654a556..a9f76dfa5b4 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -70,6 +70,7 @@ POD_MANAGER_CLASS =
"airflow.providers.cncf.kubernetes.utils.pod_manager.PodMana
POD_MANAGER_MODULE = "airflow.providers.cncf.kubernetes.utils.pod_manager"
HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook"
KUB_OP_PATH =
"airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.{}"
+TRIGGER_CLASS =
"airflow.providers.cncf.kubernetes.triggers.pod.KubernetesPodTrigger"
TEST_TASK_ID = "kubernetes_task_async"
TEST_NAMESPACE = "default"
@@ -2436,8 +2437,10 @@ class TestKubernetesPodOperatorAsync:
@patch(KUB_OP_PATH.format("build_pod_request_obj"))
@patch(KUB_OP_PATH.format("get_or_create_pod"))
@patch("airflow.providers.cncf.kubernetes.operators.pod.BaseHook.get_connection")
+ @patch(f"{TRIGGER_CLASS}.define_container_state")
def test_async_create_pod_should_execute_successfully(
self,
+ mocked_container_state,
mocked_get_connection,
mocked_pod,
mocked_pod_obj,
@@ -2876,6 +2879,55 @@ class TestKubernetesPodOperatorAsync:
"context": context,
}
+ @patch(KUB_OP_PATH.format("client"))
+ @patch(KUB_OP_PATH.format("find_pod"))
+ @patch(KUB_OP_PATH.format("build_pod_request_obj"))
+ @patch(KUB_OP_PATH.format("get_or_create_pod"))
+ @patch(KUB_OP_PATH.format("trigger_reentry"))
+ def test_skip_deferral_on_terminated_pod(
+ self,
+ mocked_trigger_reentry,
+ mocked_get_or_create_pod,
+ mocked_build_pod_request_obj,
+ mocked_find_pod,
+ mocked_client,
+ mocker,
+ ):
+ k = KubernetesPodOperator(
+ task_id=TEST_TASK_ID,
+ namespace=TEST_NAMESPACE,
+ image=TEST_IMAGE,
+ cmds=TEST_CMDS,
+ arguments=TEST_ARGS,
+ labels=TEST_LABELS,
+ name=TEST_NAME,
+ in_cluster=True,
+ get_logs=True,
+ deferrable=True,
+ )
+ mock_file = mock_open(read_data='{"a": "b"}')
+ mocker.patch("builtins.open", mock_file)
+
+ mocked_find_pod.return_value.metadata.name = TEST_NAME
+ mocked_find_pod.return_value.metadata.namespace = TEST_NAMESPACE
+ mocked_get_or_create_pod.return_value.status.container_statuses = [
+ k8s.V1ContainerStatus(
+ name=k.base_container_name,
+
state=k8s.V1ContainerState(terminated=k8s.V1ContainerStateTerminated(exit_code=0)),
+ image="alpine",
+ image_id="",
+ ready=False,
+ restart_count=0,
+ )
+ ]
+
+ context = create_context(k)
+ ti_mock = MagicMock(**{"map_index": -1})
+ context["ti"] = ti_mock
+
+ k.execute(context)
+ mocked_trigger_reentry.assert_called_once()
+
@pytest.mark.parametrize("do_xcom_push", [True, False])
@patch(KUB_OP_PATH.format("extract_xcom"))
diff --git
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
index e04ecd3b65c..f836561a0c9 100644
---
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -660,7 +660,9 @@ class GKEStartPodOperator(GKEOperatorMixin,
KubernetesPodOperator):
if self.config_file:
raise AirflowException("config_file is not an allowed parameter
for the GKEStartPodOperator.")
- def invoke_defer_method(self, last_log_time: DateTime | None = None):
+ def invoke_defer_method(
+ self, last_log_time: DateTime | None = None, context: Context | None =
None
+ ) -> None:
"""Redefine triggers which are being used in child classes."""
trigger_start_time = timezone.utcnow()
on_finish_action = self.on_finish_action