This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 4b7ffaa [FLINK-26473] Check for deployment errors when listJobs fails 4b7ffaa is described below commit 4b7ffaa47082a5baaa4150d6315a9307087e64c6 Author: Thomas Weise <t...@apache.org> AuthorDate: Sun Mar 13 21:41:11 2022 -0700 [FLINK-26473] Check for deployment errors when listJobs fails --- .../exception/DeploymentFailedException.java | 31 +++++++-- .../kubernetes/operator/observer/BaseObserver.java | 37 ++++++++-- .../kubernetes/operator/observer/JobObserver.java | 15 +++- .../kubernetes/operator/service/FlinkService.java | 12 ++++ .../kubernetes/operator/utils/FlinkUtils.java | 16 +++-- .../flink/kubernetes/operator/TestUtils.java | 37 +++++++--- .../kubernetes/operator/TestingFlinkService.java | 12 ++++ .../controller/FlinkDeploymentControllerTest.java | 79 ++++++++++++++++++++++ 8 files changed, 210 insertions(+), 29 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java index b0ea7f3..572f065 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java @@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.exception; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import io.fabric8.kubernetes.api.model.ContainerStateWaiting; import io.fabric8.kubernetes.api.model.Event; import io.fabric8.kubernetes.api.model.EventBuilder; import io.fabric8.kubernetes.api.model.apps.DeploymentCondition; @@ -26,15 +27,33 @@ import io.fabric8.kubernetes.api.model.apps.DeploymentCondition; /** Exception to signal terminal deployment failure. */ public class DeploymentFailedException extends RuntimeException { public static final String COMPONENT_JOBMANAGER = "JobManagerDeployment"; + public static final String REASON_CRASH_LOOP_BACKOFF = "CrashLoopBackOff"; + private static final long serialVersionUID = -1070179896083579221L; public final String component; - public final DeploymentCondition deployCondition; + public final String type; + public final String reason; + public final String lastTransitionTime; + public final String lastUpdateTime; public DeploymentFailedException(String component, DeploymentCondition deployCondition) { super(deployCondition.getMessage()); this.component = component; - this.deployCondition = deployCondition; + this.type = deployCondition.getType(); + this.reason = deployCondition.getReason(); + this.lastTransitionTime = deployCondition.getLastTransitionTime(); + this.lastUpdateTime = deployCondition.getLastUpdateTime(); + } + + public DeploymentFailedException( + String component, String type, ContainerStateWaiting stateWaiting) { + super(stateWaiting.getMessage()); + this.component = component; + this.type = type; + this.reason = stateWaiting.getReason(); + this.lastTransitionTime = null; + this.lastUpdateTime = null; } public static Event asEvent(DeploymentFailedException dfe, FlinkDeployment flinkApp) { @@ -47,10 +66,10 @@ public class DeploymentFailedException extends RuntimeException { .withNamespace(flinkApp.getMetadata().getNamespace()) .withUid(flinkApp.getMetadata().getUid()) .endInvolvedObject() - .withType(dfe.deployCondition.getType()) - .withReason(dfe.deployCondition.getReason()) - .withFirstTimestamp(dfe.deployCondition.getLastTransitionTime()) - .withLastTimestamp(dfe.deployCondition.getLastUpdateTime()) + .withType(dfe.type) + .withReason(dfe.reason) + .withFirstTimestamp(dfe.lastTransitionTime) + .withLastTimestamp(dfe.lastUpdateTime) .withMessage(dfe.getMessage()) .withNewMetadata() .withGenerateName(flinkApp.getMetadata().getName()) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java index 180cb6f..a694f6d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java @@ -25,6 +25,10 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; import org.apache.flink.kubernetes.operator.service.FlinkService; +import io.fabric8.kubernetes.api.model.ContainerStateWaiting; +import io.fabric8.kubernetes.api.model.ContainerStatus; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.DeploymentCondition; import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; @@ -42,6 +46,7 @@ public abstract class BaseObserver implements Observer { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); public static final String JOB_STATE_UNKNOWN = "UNKNOWN"; + protected final FlinkService flinkService; protected final FlinkOperatorConfiguration operatorConfiguration; @@ -57,10 +62,6 @@ public abstract class BaseObserver implements Observer { JobManagerDeploymentStatus previousJmStatus = deploymentStatus.getJobManagerDeploymentStatus(); - if (JobManagerDeploymentStatus.READY == previousJmStatus) { - return; - } - if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) { deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); return; @@ -86,7 +87,7 @@ public abstract class BaseObserver implements Observer { return; } logger.info( - "JobManager deployment {} in namespace {} exists but not ready yet, status {}", + "JobManager deployment {} in namespace {} exists but not ready, status {}", flinkApp.getMetadata().getName(), flinkApp.getMetadata().getNamespace(), status); @@ -104,6 +105,18 @@ public abstract class BaseObserver implements Observer { return; } } + + // checking the pod is expensive; only do it when the deployment isn't ready + try { + checkCrashLoopBackoff(flinkApp, effectiveConfig); + } catch (DeploymentFailedException dfe) { + if (!JobManagerDeploymentStatus.ERROR.equals( + deploymentStatus.getJobManagerDeploymentStatus())) { + throw dfe; + } + return; + } + deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); return; } @@ -119,6 +132,20 @@ public abstract class BaseObserver implements Observer { : null); } + private void checkCrashLoopBackoff(FlinkDeployment flinkApp, Configuration effectiveConfig) { + PodList jmPods = flinkService.getJmPodList(flinkApp, effectiveConfig); + for (Pod pod : jmPods.getItems()) { + for (ContainerStatus cs : pod.getStatus().getContainerStatuses()) { + ContainerStateWaiting csw = cs.getState().getWaiting(); + if (DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF.equals(csw.getReason())) { + logger.warn("JobManager pod fails: {} {}", csw.getReason(), csw.getMessage()); + throw new DeploymentFailedException( + DeploymentFailedException.COMPONENT_JOBMANAGER, "Warning", csw); + } + } + } + } + protected boolean isClusterReady(FlinkDeployment dep) { return dep.getStatus().getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.READY; } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java index ea5a58f..026ad01 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeoutException; /** The observer of {@link org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */ public class JobObserver extends BaseObserver { @@ -46,16 +47,20 @@ public class JobObserver extends BaseObserver { @Override public void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) { - observeJmDeployment(flinkApp, context, effectiveConfig); + if (JobManagerDeploymentStatus.READY + != flinkApp.getStatus().getJobManagerDeploymentStatus()) { + observeJmDeployment(flinkApp, context, effectiveConfig); + } if (isClusterReady(flinkApp)) { - boolean jobFound = observeFlinkJobStatus(flinkApp, effectiveConfig); + boolean jobFound = observeFlinkJobStatus(flinkApp, context, effectiveConfig); if (jobFound) { observeSavepointStatus(flinkApp, effectiveConfig); } } } - private boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) { + private boolean observeFlinkJobStatus( + FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) { logger.info("Getting job statuses for {}", flinkApp.getMetadata().getName()); FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus(); @@ -65,6 +70,10 @@ public class JobObserver extends BaseObserver { } catch (Exception e) { logger.error("Exception while listing jobs", e); flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN); + if (e instanceof TimeoutException) { + // check for problems with the underlying deployment + observeJmDeployment(flinkApp, context, effectiveConfig); + } return false; } if (clusterJobStatuses.isEmpty()) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java index 4dbd1e6..c3faf97 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java @@ -52,6 +52,7 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHea import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.client.NamespacedKubernetesClient; import org.apache.commons.lang3.ObjectUtils; import org.slf4j.Logger; @@ -263,4 +264,15 @@ public class FlinkService { return SavepointFetchResult.completed(savepoint); } } + + public PodList getJmPodList(FlinkDeployment deployment, Configuration conf) { + final String namespace = conf.getString(KubernetesConfigOptions.NAMESPACE); + final String clusterId; + try (ClusterClient<String> clusterClient = getClusterClient(conf)) { + clusterId = clusterClient.getClusterId(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + return FlinkUtils.getJmPodList(kubernetesClient, namespace, clusterId); + } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java index 38da91f..4a3f5b3 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java @@ -170,12 +170,7 @@ public class FlinkUtils { for (int i = 0; i < 60; i++) { if (jobManagerRunning) { - PodList jmPodList = - kubernetesClient - .pods() - .inNamespace(namespace) - .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId)) - .list(); + PodList jmPodList = getJmPodList(kubernetesClient, namespace, clusterId); if (jmPodList == null || jmPodList.getItems().isEmpty()) { jobManagerRunning = false; @@ -217,4 +212,13 @@ public class FlinkUtils { conf.getString(KubernetesConfigOptions.NAMESPACE), conf.getString(KubernetesConfigOptions.CLUSTER_ID)); } + + public static PodList getJmPodList( + KubernetesClient kubernetesClient, String namespace, String clusterId) { + return kubernetesClient + .pods() + .inNamespace(namespace) + .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId)) + .list(); + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java index 3bb1251..a1fbb97 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -126,6 +126,18 @@ public class TestUtils { }; } + public static Deployment createDeployment(boolean ready) { + DeploymentStatus status = new DeploymentStatus(); + status.setAvailableReplicas(ready ? 1 : 0); + status.setReplicas(1); + DeploymentSpec spec = new DeploymentSpec(); + spec.setReplicas(1); + Deployment deployment = new Deployment(); + deployment.setSpec(spec); + deployment.setStatus(status); + return deployment; + } + public static Context createContextWithReadyJobManagerDeployment() { return new Context() { @Override @@ -136,15 +148,22 @@ public class TestUtils { @Override public <T> Optional<T> getSecondaryResource( Class<T> expectedType, String eventSourceName) { - DeploymentStatus status = new DeploymentStatus(); - status.setAvailableReplicas(1); - status.setReplicas(1); - DeploymentSpec spec = new DeploymentSpec(); - spec.setReplicas(1); - Deployment deployment = new Deployment(); - deployment.setSpec(spec); - deployment.setStatus(status); - return Optional.of((T) deployment); + return Optional.of((T) createDeployment(true)); + } + }; + } + + public static Context createContextWithInProgressDeployment() { + return new Context() { + @Override + public Optional<RetryInfo> getRetryInfo() { + return Optional.empty(); + } + + @Override + public <T> Optional<T> getSecondaryResource( + Class<T> expectedType, String eventSourceName) { + return Optional.of((T) createDeployment(false)); } }; } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index 38c3374..5f034d2 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -30,6 +30,8 @@ import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import io.fabric8.kubernetes.api.model.PodList; + import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -46,6 +48,7 @@ public class TestingFlinkService extends FlinkService { private List<Tuple2<String, JobStatusMessage>> jobs = new ArrayList<>(); private Set<String> sessions = new HashSet<>(); private boolean isPortReady = true; + private PodList podList = new PodList(); public TestingFlinkService() { super(null, null); @@ -132,4 +135,13 @@ public class TestingFlinkService extends FlinkService { public void setPortReady(boolean isPortReady) { this.isPortReady = isPortReady; } + + @Override + public PodList getJmPodList(FlinkDeployment deployment, Configuration conf) { + return podList; + } + + public void setJmPodList(PodList podList) { + this.podList = podList; + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 9f011de..3c1af8c 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -26,6 +26,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; +import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.observer.ObserverFactory; import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory; @@ -34,7 +35,12 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator; import org.apache.flink.runtime.client.JobStatusMessage; +import io.fabric8.kubernetes.api.model.ContainerStatus; +import io.fabric8.kubernetes.api.model.ContainerStatusBuilder; import io.fabric8.kubernetes.api.model.EventBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodListBuilder; +import io.fabric8.kubernetes.api.model.PodStatusBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; @@ -199,6 +205,79 @@ public class FlinkDeploymentControllerTest { } @Test + public void verifyInProgressDeploymentWithCrashLoopBackoff() throws Exception { + mockServer + .expect() + .post() + .withPath("/api/v1/namespaces/flink-operator-test/events") + .andReturn( + HttpURLConnection.HTTP_CREATED, + new EventBuilder().withNewMetadata().endMetadata().build()) + .once(); + + String crashLoopMessage = "container fails"; + ContainerStatus cs = + new ContainerStatusBuilder() + .withNewState() + .withNewWaiting() + .withReason(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF) + .withMessage(crashLoopMessage) + .endWaiting() + .endState() + .build(); + + Pod pod = TestUtils.getTestPod("host", "apiVersion", Collections.emptyList()); + pod.setStatus( + new PodStatusBuilder() + .withContainerStatuses(Collections.singletonList(cs)) + .build()); + flinkService.setJmPodList(new PodListBuilder().withItems(pod).build()); + + FlinkDeployment appCluster = TestUtils.buildApplicationCluster(); + UpdateControl<FlinkDeployment> updateControl; + + testController.reconcile(appCluster, TestUtils.createEmptyContext()); + updateControl = + testController.reconcile( + appCluster, TestUtils.createContextWithInProgressDeployment()); + assertTrue(updateControl.isUpdateStatus()); + assertEquals( + Optional.of( + TimeUnit.SECONDS.toMillis( + operatorConfiguration.getReconcileIntervalSeconds())), + updateControl.getScheduleDelay()); + + RecordedRequest recordedRequest = mockServer.getLastRequest(); + assertEquals("POST", recordedRequest.getMethod()); + String recordedRequestBody = recordedRequest.getBody().readUtf8(); + assertTrue( + recordedRequestBody.contains(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF)); + assertTrue(recordedRequestBody.contains(crashLoopMessage)); + assertEquals( + JobManagerDeploymentStatus.ERROR, + appCluster.getStatus().getJobManagerDeploymentStatus()); + + // Validate reconciliation status + ReconciliationStatus reconciliationStatus = + appCluster.getStatus().getReconciliationStatus(); + assertFalse(reconciliationStatus.isSuccess()); + + // next cycle should not create another event + updateControl = + testController.reconcile( + appCluster, TestUtils.createContextWithFailedJobManagerDeployment()); + assertEquals( + JobManagerDeploymentStatus.ERROR, + appCluster.getStatus().getJobManagerDeploymentStatus()); + assertFalse(updateControl.isUpdateStatus()); + assertEquals( + JobManagerDeploymentStatus.READY + .rescheduleAfter(appCluster, operatorConfiguration) + .toMillis(), + updateControl.getScheduleDelay().get()); + } + + @Test public void verifyUpgradeFromSavepoint() { FlinkDeployment appCluster = TestUtils.buildApplicationCluster(); appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);