This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new ee47767  [FLINK-24380][k8s] Terminate the pod if it failed
ee47767 is described below

commit ee4776799ddfca0a7554fbc2d78d640a16eb6ab4
Author: Yangze Guo <karma...@gmail.com>
AuthorDate: Mon Sep 27 15:51:04 2021 +0800

    [FLINK-24380][k8s] Terminate the pod if it failed
    
    This closes #17370.
---
 .../kubeclient/resources/KubernetesPod.java        | 31 ++++++++++++++-
 .../kubeclient/resources/KubernetesPodTest.java    | 45 ++++++++++++++++++++++
 2 files changed, 74 insertions(+), 2 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPod.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPod.java
index 8952865..85eb535 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPod.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPod.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.kubernetes.kubeclient.resources;
 
+import org.apache.flink.annotation.VisibleForTesting;
+
 import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
 import io.fabric8.kubernetes.api.model.Pod;
 
@@ -37,8 +39,15 @@ public class KubernetesPod extends KubernetesResource<Pod> {
 
     public boolean isTerminated() {
         if (getInternalResource().getStatus() != null) {
-            return 
getInternalResource().getStatus().getContainerStatuses().stream()
-                    .anyMatch(e -> e.getState() != null && 
e.getState().getTerminated() != null);
+            final boolean podFailed =
+                    
PodPhase.Failed.name().equals(getInternalResource().getStatus().getPhase());
+            final boolean containersFailed =
+                    
getInternalResource().getStatus().getContainerStatuses().stream()
+                            .anyMatch(
+                                    e ->
+                                            e.getState() != null
+                                                    && 
e.getState().getTerminated() != null);
+            return containersFailed || podFailed;
         }
         return false;
     }
@@ -79,6 +88,24 @@ public class KubernetesPod extends KubernetesResource<Pod> {
                             .collect(Collectors.joining(",")));
         }
         sb.append("]");
+        if 
(PodPhase.Failed.name().equals(getInternalResource().getStatus().getPhase())) {
+            sb.append(
+                    String.format(
+                            ", pod status: %s(reason=%s, message=%s)",
+                            getInternalResource().getStatus().getPhase(),
+                            getInternalResource().getStatus().getReason(),
+                            getInternalResource().getStatus().getMessage()));
+        }
         return sb.toString();
     }
+
+    /** The phase of a Pod, high-level summary of where the Pod is in its 
lifecycle. */
+    @VisibleForTesting
+    enum PodPhase {
+        Pending,
+        Running,
+        Succeeded,
+        Failed,
+        Unknown
+    }
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodTest.java
new file mode 100644
index 0000000..0c4bdd2
--- /dev/null
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.util.TestLogger;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.PodStatusBuilder;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/** Tests for {@link KubernetesPod}. */
+public class KubernetesPodTest extends TestLogger {
+
+    @Test
+    public void testIsTerminatedShouldReturnTrueWhenPodFailed() {
+        final Pod pod = new PodBuilder().build();
+        pod.setStatus(
+                new PodStatusBuilder()
+                        .withPhase(KubernetesPod.PodPhase.Failed.name())
+                        .withMessage("Pod Node didn't have enough resource")
+                        .withReason("OutOfMemory")
+                        .build());
+        assertThat(new KubernetesPod(pod).isTerminated(), is(true));
+    }
+}

Reply via email to