This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cdbeb481670 Kubernetes Pod Operator - handle pod preemption before
container creation (#68328)
cdbeb481670 is described below
commit cdbeb48167032b6879000572055e22f7733ae7ee
Author: John Horan <[email protected]>
AuthorDate: Wed Jun 10 22:32:54 2026 +0100
Kubernetes Pod Operator - handle pod preemption before container creation
(#68328)
* handle preemption before container creation
* add message
---
.../providers/cncf/kubernetes/utils/pod_manager.py | 7 +++++++
.../unit/cncf/kubernetes/utils/test_pod_manager.py | 22 ++++++++++++++++++++++
2 files changed, 29 insertions(+)
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 835d239f209..ec7d339f377 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -174,8 +174,15 @@ async def await_pod_start(
else:
remote_pod = pod_manager.read_pod(pod)
pod_status = remote_pod.status
+
+ if pod_status.phase == PodPhase.FAILED and pod_status.container_statuses is None:
+ pod_manager.stop_watching_events = True
+ pod_manager.log.info("::endgroup::")
+ raise PodLaunchFailedException("Pod failed before containers started")
+
if pod_status.phase not in (PodPhase.PENDING, PodPhase.UNKNOWN):
pod_manager.stop_watching_events = True
+ pod_manager.log.info("Pod has reached %s phase before launch timeout", pod_status.phase)
pod_manager.log.info("::endgroup::")
break
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index e2390f90dc3..2f5245217d9 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -1044,6 +1044,28 @@ class TestPodManager:
)
mock_log_info.assert_any_call("Waiting %ss to get the POD running...", startup_timeout)
+ @pytest.mark.asyncio
+ async def test_start_pod_preemption_raises_error(self):
+ """After a pod is scheduled on a node, it is possible that it gets preempted by another pod, such as a daemonset on a new node, it is possible this happens before
+ any containers are created. In that case airflow needs to recreate the pod.
+ """
+
+ pod_response = mock.MagicMock()
+ pod_response.status.phase = "Failed"
+ pod_response.status.container_statuses = None
+ pod_response.status.message = "Pod was rejected: Node didn't have enough resource: memory, requested: 547356672, used: 14813233152, capacity: 15334334464"
+ pod_response.status.reason = "OutOfmemory"
+
+ self.mock_kube_client.read_namespaced_pod.return_value = pod_response
+ expected_msg = "Pod failed before containers started"
+ mock_pod = MagicMock()
+ with pytest.raises(AirflowException, match=expected_msg):
+ await self.pod_manager.await_pod_start(
+ pod=mock_pod,
+ schedule_timeout=60,
+ startup_timeout=60,
+ )
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
def test_container_is_running(self, container_is_running_mock):
mock_pod = MagicMock()