This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b7ffaa  [FLINK-26473] Check for deployment errors when listJobs fails
4b7ffaa is described below

commit 4b7ffaa47082a5baaa4150d6315a9307087e64c6
Author: Thomas Weise <t...@apache.org>
AuthorDate: Sun Mar 13 21:41:11 2022 -0700

    [FLINK-26473] Check for deployment errors when listJobs fails
---
 .../exception/DeploymentFailedException.java       | 31 +++++++--
 .../kubernetes/operator/observer/BaseObserver.java | 37 ++++++++--
 .../kubernetes/operator/observer/JobObserver.java  | 15 +++-
 .../kubernetes/operator/service/FlinkService.java  | 12 ++++
 .../kubernetes/operator/utils/FlinkUtils.java      | 16 +++--
 .../flink/kubernetes/operator/TestUtils.java       | 37 +++++++---
 .../kubernetes/operator/TestingFlinkService.java   | 12 ++++
 .../controller/FlinkDeploymentControllerTest.java  | 79 ++++++++++++++++++++++
 8 files changed, 210 insertions(+), 29 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
index b0ea7f3..572f065 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.exception;
 
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 
+import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
 import io.fabric8.kubernetes.api.model.Event;
 import io.fabric8.kubernetes.api.model.EventBuilder;
 import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
@@ -26,15 +27,33 @@ import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
 /** Exception to signal terminal deployment failure. */
 public class DeploymentFailedException extends RuntimeException {
     public static final String COMPONENT_JOBMANAGER = "JobManagerDeployment";
+    public static final String REASON_CRASH_LOOP_BACKOFF = "CrashLoopBackOff";
+
     private static final long serialVersionUID = -1070179896083579221L;
 
     public final String component;
-    public final DeploymentCondition deployCondition;
+    public final String type;
+    public final String reason;
+    public final String lastTransitionTime;
+    public final String lastUpdateTime;
 
     public DeploymentFailedException(String component, DeploymentCondition 
deployCondition) {
         super(deployCondition.getMessage());
         this.component = component;
-        this.deployCondition = deployCondition;
+        this.type = deployCondition.getType();
+        this.reason = deployCondition.getReason();
+        this.lastTransitionTime = deployCondition.getLastTransitionTime();
+        this.lastUpdateTime = deployCondition.getLastUpdateTime();
+    }
+
+    public DeploymentFailedException(
+            String component, String type, ContainerStateWaiting stateWaiting) 
{
+        super(stateWaiting.getMessage());
+        this.component = component;
+        this.type = type;
+        this.reason = stateWaiting.getReason();
+        this.lastTransitionTime = null;
+        this.lastUpdateTime = null;
     }
 
     public static Event asEvent(DeploymentFailedException dfe, FlinkDeployment 
flinkApp) {
@@ -47,10 +66,10 @@ public class DeploymentFailedException extends RuntimeException {
                         .withNamespace(flinkApp.getMetadata().getNamespace())
                         .withUid(flinkApp.getMetadata().getUid())
                         .endInvolvedObject()
-                        .withType(dfe.deployCondition.getType())
-                        .withReason(dfe.deployCondition.getReason())
-                        
.withFirstTimestamp(dfe.deployCondition.getLastTransitionTime())
-                        
.withLastTimestamp(dfe.deployCondition.getLastUpdateTime())
+                        .withType(dfe.type)
+                        .withReason(dfe.reason)
+                        .withFirstTimestamp(dfe.lastTransitionTime)
+                        .withLastTimestamp(dfe.lastUpdateTime)
                         .withMessage(dfe.getMessage())
                         .withNewMetadata()
                         .withGenerateName(flinkApp.getMetadata().getName())
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
index 180cb6f..a694f6d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
@@ -25,6 +25,10 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 
+import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
+import io.fabric8.kubernetes.api.model.ContainerStatus;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
 import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
@@ -42,6 +46,7 @@ public abstract class BaseObserver implements Observer {
     protected final Logger logger = LoggerFactory.getLogger(this.getClass());
 
     public static final String JOB_STATE_UNKNOWN = "UNKNOWN";
+
     protected final FlinkService flinkService;
     protected final FlinkOperatorConfiguration operatorConfiguration;
 
@@ -57,10 +62,6 @@ public abstract class BaseObserver implements Observer {
         JobManagerDeploymentStatus previousJmStatus =
                 deploymentStatus.getJobManagerDeploymentStatus();
 
-        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
-            return;
-        }
-
         if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) 
{
             
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
             return;
@@ -86,7 +87,7 @@ public abstract class BaseObserver implements Observer {
                 return;
             }
             logger.info(
-                    "JobManager deployment {} in namespace {} exists but not 
ready yet, status {}",
+                    "JobManager deployment {} in namespace {} exists but not 
ready, status {}",
                     flinkApp.getMetadata().getName(),
                     flinkApp.getMetadata().getNamespace(),
                     status);
@@ -104,6 +105,18 @@ public abstract class BaseObserver implements Observer {
                     return;
                 }
             }
+
+            // checking the pod is expensive; only do it when the deployment 
isn't ready
+            try {
+                checkCrashLoopBackoff(flinkApp, effectiveConfig);
+            } catch (DeploymentFailedException dfe) {
+                if (!JobManagerDeploymentStatus.ERROR.equals(
+                        deploymentStatus.getJobManagerDeploymentStatus())) {
+                    throw dfe;
+                }
+                return;
+            }
+
             
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
             return;
         }
@@ -119,6 +132,20 @@ public abstract class BaseObserver implements Observer {
                         : null);
     }
 
+    private void checkCrashLoopBackoff(FlinkDeployment flinkApp, Configuration 
effectiveConfig) {
+        PodList jmPods = flinkService.getJmPodList(flinkApp, effectiveConfig);
+        for (Pod pod : jmPods.getItems()) {
+            for (ContainerStatus cs : pod.getStatus().getContainerStatuses()) {
+                ContainerStateWaiting csw = cs.getState().getWaiting();
+                if 
(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF.equals(csw.getReason())) {
+                    logger.warn("JobManager pod fails: {} {}", 
csw.getReason(), csw.getMessage());
+                    throw new DeploymentFailedException(
+                            DeploymentFailedException.COMPONENT_JOBMANAGER, 
"Warning", csw);
+                }
+            }
+        }
+    }
+
     protected boolean isClusterReady(FlinkDeployment dep) {
         return dep.getStatus().getJobManagerDeploymentStatus() == 
JobManagerDeploymentStatus.READY;
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
index ea5a58f..026ad01 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
@@ -35,6 +35,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeoutException;
 
 /** The observer of {@link 
org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
 public class JobObserver extends BaseObserver {
@@ -46,16 +47,20 @@ public class JobObserver extends BaseObserver {
 
     @Override
     public void observe(FlinkDeployment flinkApp, Context context, 
Configuration effectiveConfig) {
-        observeJmDeployment(flinkApp, context, effectiveConfig);
+        if (JobManagerDeploymentStatus.READY
+                != flinkApp.getStatus().getJobManagerDeploymentStatus()) {
+            observeJmDeployment(flinkApp, context, effectiveConfig);
+        }
         if (isClusterReady(flinkApp)) {
-            boolean jobFound = observeFlinkJobStatus(flinkApp, 
effectiveConfig);
+            boolean jobFound = observeFlinkJobStatus(flinkApp, context, 
effectiveConfig);
             if (jobFound) {
                 observeSavepointStatus(flinkApp, effectiveConfig);
             }
         }
     }
 
-    private boolean observeFlinkJobStatus(FlinkDeployment flinkApp, 
Configuration effectiveConfig) {
+    private boolean observeFlinkJobStatus(
+            FlinkDeployment flinkApp, Context context, Configuration 
effectiveConfig) {
         logger.info("Getting job statuses for {}", 
flinkApp.getMetadata().getName());
         FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
 
@@ -65,6 +70,10 @@ public class JobObserver extends BaseObserver {
         } catch (Exception e) {
             logger.error("Exception while listing jobs", e);
             flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN);
+            if (e instanceof TimeoutException) {
+                // check for problems with the underlying deployment
+                observeJmDeployment(flinkApp, context, effectiveConfig);
+            }
             return false;
         }
         if (clusterJobStatuses.isEmpty()) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index 4dbd1e6..c3faf97 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHea
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
 
+import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import org.apache.commons.lang3.ObjectUtils;
 import org.slf4j.Logger;
@@ -263,4 +264,15 @@ public class FlinkService {
             return SavepointFetchResult.completed(savepoint);
         }
     }
+
+    public PodList getJmPodList(FlinkDeployment deployment, Configuration 
conf) {
+        final String namespace = 
conf.getString(KubernetesConfigOptions.NAMESPACE);
+        final String clusterId;
+        try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
+            clusterId = clusterClient.getClusterId();
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+        return FlinkUtils.getJmPodList(kubernetesClient, namespace, clusterId);
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 38da91f..4a3f5b3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -170,12 +170,7 @@ public class FlinkUtils {
 
         for (int i = 0; i < 60; i++) {
             if (jobManagerRunning) {
-                PodList jmPodList =
-                        kubernetesClient
-                                .pods()
-                                .inNamespace(namespace)
-                                
.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId))
-                                .list();
+                PodList jmPodList = getJmPodList(kubernetesClient, namespace, 
clusterId);
 
                 if (jmPodList == null || jmPodList.getItems().isEmpty()) {
                     jobManagerRunning = false;
@@ -217,4 +212,13 @@ public class FlinkUtils {
                 conf.getString(KubernetesConfigOptions.NAMESPACE),
                 conf.getString(KubernetesConfigOptions.CLUSTER_ID));
     }
+
+    public static PodList getJmPodList(
+            KubernetesClient kubernetesClient, String namespace, String 
clusterId) {
+        return kubernetesClient
+                .pods()
+                .inNamespace(namespace)
+                .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId))
+                .list();
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 3bb1251..a1fbb97 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -126,6 +126,18 @@ public class TestUtils {
         };
     }
 
+    public static Deployment createDeployment(boolean ready) {
+        DeploymentStatus status = new DeploymentStatus();
+        status.setAvailableReplicas(ready ? 1 : 0);
+        status.setReplicas(1);
+        DeploymentSpec spec = new DeploymentSpec();
+        spec.setReplicas(1);
+        Deployment deployment = new Deployment();
+        deployment.setSpec(spec);
+        deployment.setStatus(status);
+        return deployment;
+    }
+
     public static Context createContextWithReadyJobManagerDeployment() {
         return new Context() {
             @Override
@@ -136,15 +148,22 @@ public class TestUtils {
             @Override
             public <T> Optional<T> getSecondaryResource(
                     Class<T> expectedType, String eventSourceName) {
-                DeploymentStatus status = new DeploymentStatus();
-                status.setAvailableReplicas(1);
-                status.setReplicas(1);
-                DeploymentSpec spec = new DeploymentSpec();
-                spec.setReplicas(1);
-                Deployment deployment = new Deployment();
-                deployment.setSpec(spec);
-                deployment.setStatus(status);
-                return Optional.of((T) deployment);
+                return Optional.of((T) createDeployment(true));
+            }
+        };
+    }
+
+    public static Context createContextWithInProgressDeployment() {
+        return new Context() {
+            @Override
+            public Optional<RetryInfo> getRetryInfo() {
+                return Optional.empty();
+            }
+
+            @Override
+            public <T> Optional<T> getSecondaryResource(
+                    Class<T> expectedType, String eventSourceName) {
+                return Optional.of((T) createDeployment(false));
             }
         };
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 38c3374..5f034d2 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -30,6 +30,8 @@ import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
+import io.fabric8.kubernetes.api.model.PodList;
+
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -46,6 +48,7 @@ public class TestingFlinkService extends FlinkService {
     private List<Tuple2<String, JobStatusMessage>> jobs = new ArrayList<>();
     private Set<String> sessions = new HashSet<>();
     private boolean isPortReady = true;
+    private PodList podList = new PodList();
 
     public TestingFlinkService() {
         super(null, null);
@@ -132,4 +135,13 @@ public class TestingFlinkService extends FlinkService {
     public void setPortReady(boolean isPortReady) {
         this.isPortReady = isPortReady;
     }
+
+    @Override
+    public PodList getJmPodList(FlinkDeployment deployment, Configuration 
conf) {
+        return podList;
+    }
+
+    public void setJmPodList(PodList podList) {
+        this.podList = podList;
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 9f011de..3c1af8c 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.observer.ObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
@@ -34,7 +35,12 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import 
org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
+import io.fabric8.kubernetes.api.model.ContainerStatus;
+import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
 import io.fabric8.kubernetes.api.model.EventBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodListBuilder;
+import io.fabric8.kubernetes.api.model.PodStatusBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
@@ -199,6 +205,79 @@ public class FlinkDeploymentControllerTest {
     }
 
     @Test
+    public void verifyInProgressDeploymentWithCrashLoopBackoff() throws 
Exception {
+        mockServer
+                .expect()
+                .post()
+                .withPath("/api/v1/namespaces/flink-operator-test/events")
+                .andReturn(
+                        HttpURLConnection.HTTP_CREATED,
+                        new 
EventBuilder().withNewMetadata().endMetadata().build())
+                .once();
+
+        String crashLoopMessage = "container fails";
+        ContainerStatus cs =
+                new ContainerStatusBuilder()
+                        .withNewState()
+                        .withNewWaiting()
+                        
.withReason(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF)
+                        .withMessage(crashLoopMessage)
+                        .endWaiting()
+                        .endState()
+                        .build();
+
+        Pod pod = TestUtils.getTestPod("host", "apiVersion", 
Collections.emptyList());
+        pod.setStatus(
+                new PodStatusBuilder()
+                        .withContainerStatuses(Collections.singletonList(cs))
+                        .build());
+        flinkService.setJmPodList(new PodListBuilder().withItems(pod).build());
+
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+        UpdateControl<FlinkDeployment> updateControl;
+
+        testController.reconcile(appCluster, TestUtils.createEmptyContext());
+        updateControl =
+                testController.reconcile(
+                        appCluster, 
TestUtils.createContextWithInProgressDeployment());
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                Optional.of(
+                        TimeUnit.SECONDS.toMillis(
+                                
operatorConfiguration.getReconcileIntervalSeconds())),
+                updateControl.getScheduleDelay());
+
+        RecordedRequest recordedRequest = mockServer.getLastRequest();
+        assertEquals("POST", recordedRequest.getMethod());
+        String recordedRequestBody = recordedRequest.getBody().readUtf8();
+        assertTrue(
+                
recordedRequestBody.contains(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF));
+        assertTrue(recordedRequestBody.contains(crashLoopMessage));
+        assertEquals(
+                JobManagerDeploymentStatus.ERROR,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // Validate reconciliation status
+        ReconciliationStatus reconciliationStatus =
+                appCluster.getStatus().getReconciliationStatus();
+        assertFalse(reconciliationStatus.isSuccess());
+
+        // next cycle should not create another event
+        updateControl =
+                testController.reconcile(
+                        appCluster, 
TestUtils.createContextWithFailedJobManagerDeployment());
+        assertEquals(
+                JobManagerDeploymentStatus.ERROR,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertFalse(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.READY
+                        .rescheduleAfter(appCluster, operatorConfiguration)
+                        .toMillis(),
+                updateControl.getScheduleDelay().get());
+    }
+
+    @Test
     public void verifyUpgradeFromSavepoint() {
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
         appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);

Reply via email to