This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 4bac805 [FLINK-26432] Cleanly separate validator, observer and reconciler modules 4bac805 is described below commit 4bac8059ec5d3c68da19beec22f29c434562516e Author: Gyula Fora <g_f...@apple.com> AuthorDate: Mon Feb 28 06:17:11 2022 +0100 [FLINK-26432] Cleanly separate validator, observer and reconciler modules closes #26 --- .../flink/kubernetes/operator/FlinkOperator.java | 3 + .../controller/FlinkDeploymentController.java | 20 ++- .../operator/crd/status/FlinkDeploymentStatus.java | 6 +- .../JobManagerDeploymentStatus.java | 4 +- .../operator/observer/JobStatusObserver.java | 115 --------------- .../kubernetes/operator/observer/Observer.java | 161 +++++++++++++++++++++ .../operator/reconciler/BaseReconciler.java | 74 +--------- .../operator/reconciler/JobReconciler.java | 27 +--- .../operator/reconciler/SessionReconciler.java | 7 - .../flink/kubernetes/operator/TestUtils.java | 17 +++ .../kubernetes/operator/TestingFlinkService.java | 7 +- .../controller/FlinkDeploymentControllerTest.java | 47 +++--- .../operator/observer/JobStatusObserverTest.java | 77 ---------- .../kubernetes/operator/observer/ObserverTest.java | 135 +++++++++++++++++ .../operator/reconciler/JobReconcilerTest.java | 4 +- .../crds/flinkdeployments.flink.apache.org-v1.yml | 7 + 16 files changed, 394 insertions(+), 317 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index 8fa72b9..fd9d60c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -21,6 +21,7 @@ import org.apache.flink.kubernetes.operator.config.DefaultConfig; import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig; 
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController; import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils; +import org.apache.flink.kubernetes.operator.observer.Observer; import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; import org.apache.flink.kubernetes.operator.service.FlinkService; @@ -55,6 +56,7 @@ public class FlinkOperator { FlinkService flinkService = new FlinkService(client); + Observer observer = new Observer(flinkService); JobReconciler jobReconciler = new JobReconciler(client, flinkService); SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService); @@ -66,6 +68,7 @@ public class FlinkOperator { client, namespace, validator, + observer, jobReconciler, sessionReconciler); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 5dff8f4..b9d5cd4 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; +import org.apache.flink.kubernetes.operator.observer.Observer; import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler; import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; @@ -64,6 
+65,7 @@ public class FlinkDeploymentController private final String operatorNamespace; private final FlinkDeploymentValidator validator; + private final Observer observer; private final JobReconciler jobReconciler; private final SessionReconciler sessionReconciler; private final DefaultConfig defaultConfig; @@ -73,12 +75,14 @@ public class FlinkDeploymentController KubernetesClient kubernetesClient, String operatorNamespace, FlinkDeploymentValidator validator, + Observer observer, JobReconciler jobReconciler, SessionReconciler sessionReconciler) { this.defaultConfig = defaultConfig; this.kubernetesClient = kubernetesClient; this.operatorNamespace = operatorNamespace; this.validator = validator; + this.observer = observer; this.jobReconciler = jobReconciler; this.sessionReconciler = sessionReconciler; } @@ -86,12 +90,12 @@ public class FlinkDeploymentController @Override public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) { LOG.info("Stopping cluster {}", flinkApp.getMetadata().getName()); + Configuration effectiveConfig = + FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig()); + + observer.observe(flinkApp, context, effectiveConfig); return getReconciler(flinkApp) - .shutdownAndDelete( - operatorNamespace, - flinkApp, - context, - FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig())); + .shutdownAndDelete(operatorNamespace, flinkApp, effectiveConfig); } @Override @@ -107,6 +111,12 @@ public class FlinkDeploymentController Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig()); + + boolean readyToReconcile = observer.observe(flinkApp, context, effectiveConfig); + if (!readyToReconcile) { + return flinkApp.getStatus().getJobManagerDeploymentStatus().toUpdateControl(flinkApp); + } + try { UpdateControl<FlinkDeployment> updateControl = getReconciler(flinkApp) diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java index 928c827..e192163 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java @@ -17,6 +17,8 @@ package org.apache.flink.kubernetes.operator.crd.status; +import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus; + import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -26,6 +28,8 @@ import lombok.NoArgsConstructor; @NoArgsConstructor @AllArgsConstructor public class FlinkDeploymentStatus { - private JobStatus jobStatus; + private JobStatus jobStatus = new JobStatus(); + private JobManagerDeploymentStatus jobManagerDeploymentStatus = + JobManagerDeploymentStatus.MISSING; private ReconciliationStatus reconciliationStatus = new ReconciliationStatus(); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobManagerDeploymentStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java similarity index 97% rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobManagerDeploymentStatus.java rename to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java index 5e634a9..08262ff 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobManagerDeploymentStatus.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java @@ -15,7 +15,7 @@ * limitations under 
the License. */ -package org.apache.flink.kubernetes.operator.reconciler; +package org.apache.flink.kubernetes.operator.observer; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; @@ -44,12 +44,12 @@ public enum JobManagerDeploymentStatus { public UpdateControl<FlinkDeployment> toUpdateControl(FlinkDeployment flinkDeployment) { switch (this) { case DEPLOYING: + case READY: return UpdateControl.updateStatus(flinkDeployment) .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS); case DEPLOYED_NOT_READY: return UpdateControl.updateStatus(flinkDeployment) .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS); - case READY: case MISSING: default: return null; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java deleted file mode 100644 index 577d73b..0000000 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.kubernetes.operator.observer; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; -import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; -import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; -import org.apache.flink.kubernetes.operator.crd.spec.JobState; -import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; -import org.apache.flink.kubernetes.operator.crd.status.JobStatus; -import org.apache.flink.kubernetes.operator.service.FlinkService; -import org.apache.flink.runtime.client.JobStatusMessage; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -/** Observes the actual state of the running jobs on the Flink cluster. */ -public class JobStatusObserver { - - private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class); - - private final FlinkService flinkService; - - public JobStatusObserver(FlinkService flinkService) { - this.flinkService = flinkService; - } - - public boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) - throws Exception { - FlinkDeploymentSpec lastReconciledSpec = - flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec(); - - if (lastReconciledSpec == null) { - // This is the first run, nothing to observe - return true; - } - - JobSpec jobSpec = lastReconciledSpec.getJob(); - - if (jobSpec == null) { - // This is a session cluster, nothing to observe - return true; - } - - if (!jobSpec.getState().equals(JobState.RUNNING)) { - // The job is not running, nothing to observe - return true; - } - LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName()); - FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus(); - - Collection<JobStatusMessage> clusterJobStatuses = 
flinkService.listJobs(effectiveConfig); - if (clusterJobStatuses.isEmpty()) { - LOG.info("No jobs found on {} yet", flinkApp.getMetadata().getName()); - return false; - } else { - flinkAppStatus.setJobStatus( - mergeJobStatus( - flinkAppStatus.getJobStatus(), new ArrayList<>(clusterJobStatuses))); - LOG.info("Job statuses updated for {}", flinkApp.getMetadata().getName()); - return true; - } - } - - /** Merge previous job status with the new one from the flink job cluster. */ - private JobStatus mergeJobStatus( - JobStatus oldStatus, List<JobStatusMessage> clusterJobStatuses) { - JobStatus newStatus = oldStatus; - Collections.sort( - clusterJobStatuses, - (j1, j2) -> -1 * Long.compare(j1.getStartTime(), j2.getStartTime())); - JobStatusMessage newJob = clusterJobStatuses.get(0); - - if (newStatus == null) { - newStatus = createJobStatus(newJob); - } else { - newStatus.setState(newJob.getJobState().name()); - newStatus.setJobName(newJob.getJobName()); - newStatus.setJobId(newJob.getJobId().toHexString()); - // track the start time, changing timestamp would cause busy reconciliation - newStatus.setUpdateTime(String.valueOf(newJob.getStartTime())); - } - return newStatus; - } - - public static JobStatus createJobStatus(JobStatusMessage message) { - JobStatus jobStatus = new JobStatus(); - jobStatus.setJobId(message.getJobId().toHexString()); - jobStatus.setJobName(message.getJobName()); - jobStatus.setState(message.getJobState().name()); - jobStatus.setUpdateTime(String.valueOf(message.getStartTime())); - return jobStatus; - } -} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java new file mode 100644 index 0000000..2772fbb --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.observer; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; +import org.apache.flink.kubernetes.operator.crd.status.JobStatus; +import org.apache.flink.kubernetes.operator.service.FlinkService; +import org.apache.flink.runtime.client.JobStatusMessage; + +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; +import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +/** Observes the actual state of the running jobs on the Flink cluster. 
*/ +public class Observer { + + private static final Logger LOG = LoggerFactory.getLogger(Observer.class); + + private final FlinkService flinkService; + + public Observer(FlinkService flinkService) { + this.flinkService = flinkService; + } + + public boolean observe( + FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) { + observeJmDeployment(flinkApp, context, effectiveConfig); + return isReadyToReconcile(flinkApp, effectiveConfig); + } + + private void observeJmDeployment( + FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) { + FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus(); + JobManagerDeploymentStatus previousJmStatus = + deploymentStatus.getJobManagerDeploymentStatus(); + + if (JobManagerDeploymentStatus.READY == previousJmStatus) { + return; + } + + if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) { + deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); + return; + } + + Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class); + if (deployment.isPresent()) { + DeploymentStatus status = deployment.get().getStatus(); + DeploymentSpec spec = deployment.get().getSpec(); + if (status != null + && status.getAvailableReplicas() != null + && spec.getReplicas().intValue() == status.getReplicas() + && spec.getReplicas().intValue() == status.getAvailableReplicas() + && flinkService.isJobManagerPortReady(effectiveConfig)) { + + // typically it takes a few seconds for the REST server to be ready + LOG.info( + "JobManager deployment {} in namespace {} port ready, waiting for the REST API...", + flinkApp.getMetadata().getName(), + flinkApp.getMetadata().getNamespace()); + deploymentStatus.setJobManagerDeploymentStatus( + JobManagerDeploymentStatus.DEPLOYED_NOT_READY); + return; + } + LOG.info( + "JobManager deployment {} in namespace {} exists but not ready yet, status {}", + flinkApp.getMetadata().getName(), + 
flinkApp.getMetadata().getNamespace(), + status); + + deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); + return; + } + + deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING); + } + + private boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) { + + // No need to observe job status for session clusters + if (flinkApp.getSpec().getJob() == null) { + return true; + } + + LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName()); + FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus(); + + Collection<JobStatusMessage> clusterJobStatuses; + try { + clusterJobStatuses = flinkService.listJobs(effectiveConfig); + } catch (Exception e) { + LOG.error("Exception while listing jobs", e); + flinkAppStatus.getJobStatus().setState("UNKNOWN"); + return false; + } + if (clusterJobStatuses.isEmpty()) { + LOG.info("No jobs found on {} yet", flinkApp.getMetadata().getName()); + return false; + } else { + updateJobStatus(flinkAppStatus.getJobStatus(), new ArrayList<>(clusterJobStatuses)); + LOG.info("Job statuses updated for {}", flinkApp.getMetadata().getName()); + return true; + } + } + + private boolean isReadyToReconcile(FlinkDeployment flinkApp, Configuration effectiveConfig) { + JobManagerDeploymentStatus jmDeploymentStatus = + flinkApp.getStatus().getJobManagerDeploymentStatus(); + + switch (jmDeploymentStatus) { + case READY: + return observeFlinkJobStatus(flinkApp, effectiveConfig); + case MISSING: + return true; + case DEPLOYING: + case DEPLOYED_NOT_READY: + return false; + default: + throw new RuntimeException("Unknown status: " + jmDeploymentStatus); + } + } + + /** Update previous job status based on the job list from the cluster. 
*/ + private void updateJobStatus(JobStatus status, List<JobStatusMessage> clusterJobStatuses) { + Collections.sort( + clusterJobStatuses, (j1, j2) -> Long.compare(j2.getStartTime(), j1.getStartTime())); + JobStatusMessage newJob = clusterJobStatuses.get(0); + + status.setState(newJob.getJobState().name()); + status.setJobName(newJob.getJobName()); + status.setJobId(newJob.getJobId().toHexString()); + // track the start time, changing timestamp would cause busy reconciliation + status.setUpdateTime(String.valueOf(newJob.getStartTime())); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java index d6cbb29..b94956b 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java @@ -19,13 +19,11 @@ package org.apache.flink.kubernetes.operator.reconciler; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.kubernetes.operator.utils.IngressUtils; -import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; -import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; @@ -33,9 +31,6 @@ import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.Optional; - /** BaseReconciler with functionality that is common to job and session modes. */ public abstract class BaseReconciler { @@ -44,8 +39,6 @@ public abstract class BaseReconciler { public static final int REFRESH_SECONDS = 60; public static final int PORT_READY_DELAY_SECONDS = 10; - private final HashSet<String> jobManagerDeployments = new HashSet<>(); - protected final KubernetesClient kubernetesClient; protected final FlinkService flinkService; @@ -54,10 +47,6 @@ public abstract class BaseReconciler { this.flinkService = flinkService; } - public boolean removeDeployment(FlinkDeployment flinkApp) { - return jobManagerDeployments.remove(flinkApp.getMetadata().getUid()); - } - public abstract UpdateControl<FlinkDeployment> reconcile( String operatorNamespace, FlinkDeployment flinkApp, @@ -65,70 +54,15 @@ public abstract class BaseReconciler { Configuration effectiveConfig) throws Exception; - protected JobManagerDeploymentStatus checkJobManagerDeployment( - FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) { - if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) { - Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class); - if (deployment.isPresent()) { - DeploymentStatus status = deployment.get().getStatus(); - DeploymentSpec spec = deployment.get().getSpec(); - if (status != null - && status.getAvailableReplicas() != null - && spec.getReplicas().intValue() == status.getReplicas() - && spec.getReplicas().intValue() == status.getAvailableReplicas()) { - // typically it takes a few seconds for the REST server to be ready - if (flinkService.isJobManagerPortReady(effectiveConfig)) { - LOG.info( - "JobManager deployment {} in namespace {} is ready", - flinkApp.getMetadata().getName(), - flinkApp.getMetadata().getNamespace()); - jobManagerDeployments.add(flinkApp.getMetadata().getUid()); - if (flinkApp.getStatus().getJobStatus() 
!= null) { - // pre-existing deployments on operator restart - proceed with - // reconciliation - return JobManagerDeploymentStatus.READY; - } - } - LOG.info( - "JobManager deployment {} in namespace {} port not ready", - flinkApp.getMetadata().getName(), - flinkApp.getMetadata().getNamespace()); - return JobManagerDeploymentStatus.DEPLOYED_NOT_READY; - } - LOG.info( - "JobManager deployment {} in namespace {} not yet ready, status {}", - flinkApp.getMetadata().getName(), - flinkApp.getMetadata().getNamespace(), - status); - - return JobManagerDeploymentStatus.DEPLOYING; - } - return JobManagerDeploymentStatus.MISSING; - } - return JobManagerDeploymentStatus.READY; - } - - /** - * Shuts down the job and deletes all kubernetes resources including k8s HA resources. It will - * first perform a graceful shutdown (cancel) before deleting the data if that is unsuccessful - * in order to avoid leaking HA metadata to durable storage. - * - * <p>This feature is limited at the moment to cleaning up native kubernetes HA resources, other - * HA providers like ZK need to be cleaned up manually after deletion. 
- */ public DeleteControl shutdownAndDelete( - String operatorNamespace, - FlinkDeployment flinkApp, - Context context, - Configuration effectiveConfig) { + String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) { - if (checkJobManagerDeployment(flinkApp, context, effectiveConfig) - == JobManagerDeploymentStatus.READY) { + if (JobManagerDeploymentStatus.READY + == flinkApp.getStatus().getJobManagerDeploymentStatus()) { shutdown(flinkApp, effectiveConfig); } else { FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true); } - removeDeployment(flinkApp); IngressUtils.updateIngressRules( flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, true); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java index ee121e9..3abb860 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java @@ -25,7 +25,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; import org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; -import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; +import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.kubernetes.operator.utils.IngressUtils; @@ -48,11 +48,8 @@ public class JobReconciler extends BaseReconciler { private static final Logger LOG = LoggerFactory.getLogger(JobReconciler.class); - private final JobStatusObserver observer; - 
public JobReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) { super(kubernetesClient, flinkService); - this.observer = new JobStatusObserver(flinkService); } @Override @@ -72,19 +69,7 @@ public class JobReconciler extends BaseReconciler { Optional.ofNullable(jobSpec.getInitialSavepointPath())); IngressUtils.updateIngressRules( flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false); - } - - // wait until the deployment is ready - UpdateControl<FlinkDeployment> uc = - checkJobManagerDeployment(flinkApp, context, effectiveConfig) - .toUpdateControl(flinkApp); - if (uc != null) { - return uc; - } - - if (!observer.observeFlinkJobStatus(flinkApp, effectiveConfig)) { - return UpdateControl.updateStatus(flinkApp) - .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS); + return JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(flinkApp); } // TODO: following assumes that current job is running @@ -174,15 +159,17 @@ public class JobReconciler extends BaseReconciler { effectiveConfig); JobStatus jobStatus = flinkApp.getStatus().getJobStatus(); jobStatus.setState("suspended"); - removeDeployment(flinkApp); + flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING); savepointOpt.ifPresent(jobStatus::setSavepointLocation); return savepointOpt; } @Override protected void shutdown(FlinkDeployment flinkApp, Configuration effectiveConfig) { - if (flinkApp.getStatus().getJobStatus() != null - && flinkApp.getStatus().getJobStatus().getJobId() != null) { + if (org.apache.flink.api.common.JobStatus.RUNNING + .name() + .equalsIgnoreCase(flinkApp.getStatus().getJobStatus().getState())) { + LOG.info("Job is running, attempting graceful shutdown."); try { flinkService.cancelJob( JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId()), diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java index e72de56..d63fdeb 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java @@ -60,13 +60,6 @@ public class SessionReconciler extends BaseReconciler { flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false); } - UpdateControl<FlinkDeployment> uc = - checkJobManagerDeployment(flinkApp, context, effectiveConfig) - .toUpdateControl(flinkApp); - if (uc != null) { - return uc; - } - boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec); if (specChanged) { upgradeSessionCluster(flinkApp, effectiveConfig); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java index 9ada6f8..055d91b 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -33,9 +33,12 @@ import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodSpec; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; import java.util.Collections; import java.util.List; +import java.util.Optional; /** Testing utilities. 
*/ public class TestUtils { @@ -109,4 +112,18 @@ public class TestUtils { pod.setSpec(podSpec); return pod; } + + public static Context createEmptyContext() { + return new Context() { + @Override + public Optional<RetryInfo> getRetryInfo() { + return Optional.empty(); + } + + @Override + public <T> Optional<T> getSecondaryResource(Class<T> aClass, String s) { + return Optional.empty(); + } + }; + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index 610176b..2aab5ab 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -41,6 +41,7 @@ public class TestingFlinkService extends FlinkService { private List<Tuple2<String, JobStatusMessage>> jobs = new ArrayList<>(); private Set<String> sessions = new HashSet<>(); + private boolean isPortReady = true; public TestingFlinkService() { super(null); @@ -104,6 +105,10 @@ public class TestingFlinkService extends FlinkService { @Override public boolean isJobManagerPortReady(Configuration config) { - return true; + return isPortReady; + } + + public void setPortReady(boolean isPortReady) { + this.isPortReady = isPortReady; } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index ee7a386..f4468c6 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -25,8 +25,8 @@ import 
org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; -import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler; -import org.apache.flink.kubernetes.operator.reconciler.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.observer.Observer; import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest; import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; @@ -60,20 +60,12 @@ public class FlinkDeploymentControllerTest { UpdateControl<FlinkDeployment> updateControl; - updateControl = testController.reconcile(appCluster, context); + updateControl = testController.reconcile(appCluster, TestUtils.createEmptyContext()); assertTrue(updateControl.isUpdateStatus()); assertEquals( - JobManagerDeploymentStatus.DEPLOYED_NOT_READY - .toUpdateControl(appCluster) - .getScheduleDelay(), + JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(appCluster).getScheduleDelay(), updateControl.getScheduleDelay()); - updateControl = testController.reconcile(appCluster, context); - assertTrue(updateControl.isUpdateStatus()); - assertEquals( - BaseReconciler.REFRESH_SECONDS * 1000, - (long) updateControl.getScheduleDelay().get()); - // Validate reconciliation status ReconciliationStatus reconciliationStatus = appCluster.getStatus().getReconciliationStatus(); @@ -84,8 +76,16 @@ public class FlinkDeploymentControllerTest { updateControl = testController.reconcile(appCluster, context); assertTrue(updateControl.isUpdateStatus()); assertEquals( - BaseReconciler.REFRESH_SECONDS * 1000, - (long) updateControl.getScheduleDelay().get()); + 
JobManagerDeploymentStatus.DEPLOYED_NOT_READY + .toUpdateControl(appCluster) + .getScheduleDelay(), + updateControl.getScheduleDelay()); + + updateControl = testController.reconcile(appCluster, context); + assertTrue(updateControl.isUpdateStatus()); + assertEquals( + JobManagerDeploymentStatus.READY.toUpdateControl(appCluster).getScheduleDelay(), + updateControl.getScheduleDelay()); // Validate job status JobStatus jobStatus = appCluster.getStatus().getJobStatus(); @@ -122,7 +122,7 @@ public class FlinkDeploymentControllerTest { appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT); appCluster.getSpec().getJob().setInitialSavepointPath("s0"); - testController.reconcile(appCluster, context); + testController.reconcile(appCluster, TestUtils.createEmptyContext()); List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs(); assertEquals(1, jobs.size()); assertEquals("s0", jobs.get(0).f0); @@ -143,16 +143,20 @@ public class FlinkDeploymentControllerTest { jobs = flinkService.listJobs(); assertEquals(1, jobs.size()); assertEquals("savepoint_0", jobs.get(0).f0); + testController.reconcile(appCluster, context); // Suspend job appCluster = TestUtils.clone(appCluster); appCluster.getSpec().getJob().setState(JobState.SUSPENDED); testController.reconcile(appCluster, context); + assertEquals( + JobManagerDeploymentStatus.MISSING, + appCluster.getStatus().getJobManagerDeploymentStatus()); // Resume from last savepoint appCluster = TestUtils.clone(appCluster); appCluster.getSpec().getJob().setState(JobState.RUNNING); - testController.reconcile(appCluster, context); + testController.reconcile(appCluster, TestUtils.createEmptyContext()); jobs = flinkService.listJobs(); assertEquals(1, jobs.size()); assertEquals("savepoint_1", jobs.get(0).f0); @@ -171,7 +175,7 @@ public class FlinkDeploymentControllerTest { appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS); appCluster.getSpec().getJob().setInitialSavepointPath("s0"); - 
testController.reconcile(appCluster, context); + testController.reconcile(appCluster, TestUtils.createEmptyContext()); List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs(); assertEquals(1, jobs.size()); assertEquals("s0", jobs.get(0).f0); @@ -180,6 +184,13 @@ public class FlinkDeploymentControllerTest { appCluster = TestUtils.clone(appCluster); appCluster.getSpec().getJob().setParallelism(100); + UpdateControl<FlinkDeployment> updateControl = + testController.reconcile(appCluster, context); + assertEquals( + JobManagerDeploymentStatus.DEPLOYED_NOT_READY + .toUpdateControl(appCluster) + .getScheduleDelay(), + updateControl.getScheduleDelay()); testController.reconcile(appCluster, context); jobs = flinkService.listJobs(); assertEquals(1, jobs.size()); @@ -200,6 +211,7 @@ public class FlinkDeploymentControllerTest { } private FlinkDeploymentController createTestController(TestingFlinkService flinkService) { + Observer observer = new Observer(flinkService); JobReconciler jobReconciler = new JobReconciler(null, flinkService); SessionReconciler sessionReconciler = new SessionReconciler(null, flinkService); @@ -208,6 +220,7 @@ public class FlinkDeploymentControllerTest { null, "test", new DefaultDeploymentValidator(), + observer, jobReconciler, sessionReconciler); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java deleted file mode 100644 index 769af93..0000000 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.kubernetes.operator.observer; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.TestUtils; -import org.apache.flink.kubernetes.operator.TestingFlinkService; -import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; -import org.apache.flink.kubernetes.operator.crd.spec.JobState; -import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; -import org.apache.flink.kubernetes.operator.service.FlinkService; -import org.apache.flink.kubernetes.operator.utils.FlinkUtils; - -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -/** @link JobStatusObserver unit tests */ -public class JobStatusObserverTest { - - @Test - public void observeSessionCluster() throws Exception { - FlinkService flinkService = new TestingFlinkService(); - JobStatusObserver observer = new JobStatusObserver(flinkService); - FlinkDeployment deployment = TestUtils.buildSessionCluster(); - deployment - .getStatus() - .getReconciliationStatus() - .setLastReconciledSpec(deployment.getSpec()); - assertTrue( - observer.observeFlinkJobStatus( - deployment, - FlinkUtils.getEffectiveConfig(deployment, new Configuration()))); - } - - @Test - public void observeApplicationCluster() throws Exception { - TestingFlinkService 
flinkService = new TestingFlinkService(); - JobStatusObserver observer = new JobStatusObserver(flinkService); - FlinkDeployment deployment = TestUtils.buildApplicationCluster(); - Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new Configuration()); - - assertTrue(observer.observeFlinkJobStatus(deployment, conf)); - deployment.setStatus(new FlinkDeploymentStatus()); - deployment - .getStatus() - .getReconciliationStatus() - .setLastReconciledSpec(deployment.getSpec()); - - flinkService.submitApplicationCluster(deployment, conf); - assertTrue(observer.observeFlinkJobStatus(deployment, conf)); - - assertEquals( - deployment.getMetadata().getName(), - deployment.getStatus().getJobStatus().getJobName()); - - deployment.getSpec().getJob().setState(JobState.SUSPENDED); - flinkService.clear(); - assertTrue(observer.observeFlinkJobStatus(deployment, conf)); - } -} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java new file mode 100644 index 0000000..4e7615c --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.observer; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.TestUtils; +import org.apache.flink.kubernetes.operator.TestingFlinkService; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; +import org.apache.flink.kubernetes.operator.crd.status.JobStatus; +import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest; +import org.apache.flink.kubernetes.operator.service.FlinkService; +import org.apache.flink.kubernetes.operator.utils.FlinkUtils; + +import io.javaoperatorsdk.operator.api.reconciler.Context; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** @link Observer unit tests */ +public class ObserverTest { + + private final Context readyContext = + JobReconcilerTest.createContextWithReadyJobManagerDeployment(); + + @Test + public void observeSessionCluster() { + FlinkService flinkService = new TestingFlinkService(); + Observer observer = new Observer(flinkService); + FlinkDeployment deployment = TestUtils.buildSessionCluster(); + deployment + .getStatus() + .getReconciliationStatus() + .setLastReconciledSpec(deployment.getSpec()); + + assertFalse( + observer.observe( + deployment, + readyContext, + FlinkUtils.getEffectiveConfig(deployment, new Configuration()))); + + assertEquals( + JobManagerDeploymentStatus.DEPLOYED_NOT_READY, + deployment.getStatus().getJobManagerDeploymentStatus()); + + assertTrue( + observer.observe( + deployment, + readyContext, + FlinkUtils.getEffectiveConfig(deployment, new Configuration()))); + + assertEquals( + JobManagerDeploymentStatus.READY, + 
deployment.getStatus().getJobManagerDeploymentStatus()); + } + + @Test + public void observeApplicationCluster() { + TestingFlinkService flinkService = new TestingFlinkService(); + Observer observer = new Observer(flinkService); + FlinkDeployment deployment = TestUtils.buildApplicationCluster(); + Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new Configuration()); + + assertTrue(observer.observe(deployment, TestUtils.createEmptyContext(), conf)); + + deployment.setStatus(new FlinkDeploymentStatus()); + deployment + .getStatus() + .getReconciliationStatus() + .setLastReconciledSpec(deployment.getSpec()); + deployment.getStatus().setJobStatus(new JobStatus()); + flinkService.submitApplicationCluster(deployment, conf); + + // Validate port check logic + flinkService.setPortReady(false); + + // Port not ready + assertFalse(observer.observe(deployment, readyContext, conf)); + assertEquals( + JobManagerDeploymentStatus.DEPLOYING, + deployment.getStatus().getJobManagerDeploymentStatus()); + + assertFalse(observer.observe(deployment, readyContext, conf)); + assertEquals( + JobManagerDeploymentStatus.DEPLOYING, + deployment.getStatus().getJobManagerDeploymentStatus()); + + flinkService.setPortReady(true); + // Port ready but we have to recheck once again + assertFalse(observer.observe(deployment, readyContext, conf)); + assertEquals( + JobManagerDeploymentStatus.DEPLOYED_NOT_READY, + deployment.getStatus().getJobManagerDeploymentStatus()); + + // Stable ready + assertTrue(observer.observe(deployment, readyContext, conf)); + assertEquals( + JobManagerDeploymentStatus.READY, + deployment.getStatus().getJobManagerDeploymentStatus()); + + assertTrue(observer.observe(deployment, readyContext, conf)); + assertEquals( + JobManagerDeploymentStatus.READY, + deployment.getStatus().getJobManagerDeploymentStatus()); + + assertEquals( + deployment.getMetadata().getName(), + deployment.getStatus().getJobStatus().getJobName()); + + // Test listing failure + 
flinkService.clear(); + assertFalse(observer.observe(deployment, readyContext, conf)); + assertEquals( + JobManagerDeploymentStatus.READY, + deployment.getStatus().getJobManagerDeploymentStatus()); + assertEquals("UNKNOWN", deployment.getStatus().getJobStatus().getState()); + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java index 6a2109b..2ee8793 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java @@ -25,7 +25,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; -import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; +import org.apache.flink.kubernetes.operator.observer.Observer; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; @@ -115,7 +115,7 @@ public class JobReconcilerTest { final String expectedSavepointPath = "savepoint_0"; final Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment(); final TestingFlinkService flinkService = new TestingFlinkService(); - JobStatusObserver observer = new JobStatusObserver(flinkService); + Observer observer = new Observer(flinkService); final JobReconciler reconciler = new JobReconciler(null, flinkService); final FlinkDeployment deployment = TestUtils.buildApplicationCluster(); diff --git a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml 
b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml index 5ae554c..ec9ee63 100644 --- a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml +++ b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml @@ -9077,6 +9077,13 @@ spec: savepointLocation: type: string type: object + jobManagerDeploymentStatus: + enum: + - READY + - DEPLOYED_NOT_READY + - DEPLOYING + - MISSING + type: string reconciliationStatus: properties: success: