This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e1c9eb02f6 fix: Transient error state caused by rate limits from the container registry in `KubernetesPodOperator` (#62215)
2e1c9eb02f6 is described below

commit 2e1c9eb02f66538213c6257081492b109b22b114
Author: John Horan <[email protected]>
AuthorDate: Sat Feb 21 19:39:01 2026 +0000

    fix: Transient error state caused by rate limits from the container registry in `KubernetesPodOperator` (#62215)
    
    * don't fail on transient errors
    
    * remove indent
    
    * single string
    
    * ruff format
---
 .../providers/cncf/kubernetes/utils/pod_manager.py | 49 +++++++++++++++++-----
 .../unit/cncf/kubernetes/utils/test_pod_manager.py |  8 +++-
 2 files changed, 45 insertions(+), 12 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 01f8f4a3fa2..413a1fa4748 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -188,20 +188,49 @@ def detect_pod_terminate_early_issues(pod: V1Pod) -> str | None:
     """
     Identify issues that justify terminating the pod early.
 
+    This method distinguishes between permanent failures (e.g., invalid image 
names)
+    and transient errors (e.g., rate limits) that should be retried by 
Kubernetes.
+
     :param pod: The pod object to check.
     :return: An error message if an issue is detected; otherwise, None.
     """
+    # Indicators in error messages that suggest transient issues
+    TRANSIENT_ERROR_PATTERNS = [
+        "pull qps exceeded",
+        "rate limit",
+        "too many requests",
+        "quota exceeded",
+        "temporarily unavailable",
+        "timeout",
+        "account limit",
+    ]
+
+    FATAL_STATES = ["InvalidImageName", "ErrImageNeverPull"]
+    TRANSIENT_STATES = ["ErrImagePull", "ImagePullBackOff"]
+    ERROR_MESSAGE = "Image cannot be pulled, unable to start: 
{reason}\n{message}"
+
     pod_status = pod.status
-    if pod_status.container_statuses:
-        for container_status in pod_status.container_statuses:
-            container_state: V1ContainerState = container_status.state
-            container_waiting: V1ContainerStateWaiting | None = 
container_state.waiting
-            if container_waiting:
-                if container_waiting.reason in ["ErrImagePull", 
"ImagePullBackOff", "InvalidImageName"]:
-                    return (
-                        f"Pod docker image cannot be pulled, unable to start: 
{container_waiting.reason}"
-                        f"\n{container_waiting.message}"
-                    )
+    if not pod_status.container_statuses:
+        return None
+
+    for container_status in pod_status.container_statuses:
+        container_state: V1ContainerState = container_status.state
+        container_waiting: V1ContainerStateWaiting | None = 
container_state.waiting
+        if not container_waiting:
+            continue
+
+        if container_waiting.reason in FATAL_STATES:
+            return ERROR_MESSAGE.format(
+                reason=container_waiting.reason, 
message=container_waiting.message or ""
+            )
+
+        if container_waiting.reason in TRANSIENT_STATES:
+            message_lower = (container_waiting.message or "").lower()
+            is_transient = any(pattern in message_lower for pattern in 
TRANSIENT_ERROR_PATTERNS)
+            if not is_transient:
+                return ERROR_MESSAGE.format(
+                    reason=container_waiting.reason, 
message=container_waiting.message or ""
+                )
     return None
 
 
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index bfd3d42e12b..544109a599e 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -715,7 +715,9 @@ class TestPodManager:
         pod_response.status.container_statuses = [container_statuse]
 
         self.mock_kube_client.read_namespaced_pod.return_value = pod_response
-        expected_msg = f"Pod docker image cannot be pulled, unable to start: 
{waiting_state.reason}\n{waiting_state.message}"
+        expected_msg = (
+            f"Image cannot be pulled, unable to start: 
{waiting_state.reason}\n{waiting_state.message}"
+        )
         mock_pod = MagicMock()
         with pytest.raises(AirflowException, match=expected_msg):
             await self.pod_manager.await_pod_start(
@@ -1262,7 +1264,9 @@ class TestAsyncPodManager:
         container_status.state.waiting = waiting_state
         pod_response.status.container_statuses = [container_status]
         self.mock_async_hook.get_pod.return_value = pod_response
-        expected_msg = f"Pod docker image cannot be pulled, unable to start: 
{waiting_state.reason}\n{waiting_state.message}"
+        expected_msg = (
+            f"Image cannot be pulled, unable to start: 
{waiting_state.reason}\n{waiting_state.message}"
+        )
         mock_pod = mock.MagicMock()
         with pytest.raises(AirflowException, match=expected_msg):
             await self.async_pod_manager.await_pod_start(

Reply via email to