This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cdbeb481670 Kubernetes Pod Operator - handle pod preemption before container creation (#68328)
cdbeb481670 is described below

commit cdbeb48167032b6879000572055e22f7733ae7ee
Author: John Horan <[email protected]>
AuthorDate: Wed Jun 10 22:32:54 2026 +0100

    Kubernetes Pod Operator - handle pod preemption before container creation (#68328)
    
    * handle preemption before container creation
    
    * add message
---
 .../providers/cncf/kubernetes/utils/pod_manager.py |  7 +++++++
 .../unit/cncf/kubernetes/utils/test_pod_manager.py | 22 ++++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 835d239f209..ec7d339f377 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -174,8 +174,15 @@ async def await_pod_start(
         else:
             remote_pod = pod_manager.read_pod(pod)
         pod_status = remote_pod.status
+
+        if pod_status.phase == PodPhase.FAILED and pod_status.container_statuses is None:
+            pod_manager.stop_watching_events = True
+            pod_manager.log.info("::endgroup::")
+            raise PodLaunchFailedException("Pod failed before containers started")
+
         if pod_status.phase not in (PodPhase.PENDING, PodPhase.UNKNOWN):
             pod_manager.stop_watching_events = True
+            pod_manager.log.info("Pod has reached %s phase before launch timeout", pod_status.phase)
             pod_manager.log.info("::endgroup::")
             break
 
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index e2390f90dc3..2f5245217d9 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -1044,6 +1044,28 @@ class TestPodManager:
             )
             mock_log_info.assert_any_call("Waiting %ss to get the POD running...", startup_timeout)
 
+    @pytest.mark.asyncio
+    async def test_start_pod_preemption_raises_error(self):
+        """After a pod is scheduled on a node, it is possible that it gets preempted by another pod, such as a daemonset on a new node, it is possible this happens before
+        any containers are created.  In that case airflow needs to recreate the pod.
+        """
+
+        pod_response = mock.MagicMock()
+        pod_response.status.phase = "Failed"
+        pod_response.status.container_statuses = None
+        pod_response.status.message = "Pod was rejected: Node didn't have enough resource: memory, requested: 547356672, used: 14813233152, capacity: 15334334464"
+        pod_response.status.reason = "OutOfmemory"
+
+        self.mock_kube_client.read_namespaced_pod.return_value = pod_response
+        expected_msg = "Pod failed before containers started"
+        mock_pod = MagicMock()
+        with pytest.raises(AirflowException, match=expected_msg):
+            await self.pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=60,
+                startup_timeout=60,
+            )
+
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
     def test_container_is_running(self, container_is_running_mock):
         mock_pod = MagicMock()

Reply via email to