This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 684b4597764daefe53fb1399298324bec5bc738e Author: Thomas Weise <t...@apache.org> AuthorDate: Sat Feb 26 14:55:43 2022 -0800 [FLINK-26261] Refactor to simplify reconciliation logic --- .../flink/kubernetes/operator/FlinkOperator.java | 3 - .../controller/FlinkDeploymentController.java | 95 ++++--------------- .../operator/observer/JobStatusObserver.java | 4 - .../operator/reconciler/BaseReconciler.java | 103 +++++++++++++++++++++ .../operator/reconciler/JobReconciler.java | 36 ++++++- .../operator/reconciler/SessionReconciler.java | 25 ++++- .../kubernetes/operator/service/FlinkService.java | 2 +- .../kubernetes/operator/TestingFlinkService.java | 2 +- .../controller/FlinkDeploymentControllerTest.java | 38 ++------ .../operator/reconciler/JobReconcilerTest.java | 45 +++++++-- 10 files changed, 218 insertions(+), 135 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index 7e8edf5..8fa72b9 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -21,7 +21,6 @@ import org.apache.flink.kubernetes.operator.config.DefaultConfig; import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig; import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController; import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils; -import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; import org.apache.flink.kubernetes.operator.service.FlinkService; @@ -56,7 +55,6 @@ public class FlinkOperator { FlinkService flinkService = new FlinkService(client); - JobStatusObserver observer = new JobStatusObserver(flinkService); JobReconciler jobReconciler = new JobReconciler(client, flinkService); SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService); @@ -68,7 +66,6 @@ public class FlinkOperator { client, namespace, validator, - observer, jobReconciler, sessionReconciler); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index a6c6072..2e79926 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -23,16 +23,15 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; -import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; +import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler; import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.kubernetes.operator.utils.IngressUtils; import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator; +import org.apache.flink.kubernetes.utils.Constants; import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; -import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.javaoperatorsdk.operator.api.reconciler.Context; @@ -50,10 +49,8 @@ import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; import java.util.List; import java.util.Optional; -import java.util.concurrent.TimeUnit; /** Controller that runs the main reconcile loop for Flink deployments. */ @ControllerConfiguration(generationAwareEventProcessing = false) @@ -63,33 +60,26 @@ public class FlinkDeploymentController EventSourceInitializer<FlinkDeployment> { private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class); - public static final int REFRESH_SECONDS = 60; - public static final int PORT_READY_DELAY_SECONDS = 10; - private final KubernetesClient kubernetesClient; private final String operatorNamespace; private final FlinkDeploymentValidator validator; - private final JobStatusObserver observer; private final JobReconciler jobReconciler; private final SessionReconciler sessionReconciler; private final DefaultConfig defaultConfig; - private final HashSet<String> jobManagerDeployments = new HashSet<>(); public FlinkDeploymentController( DefaultConfig defaultConfig, KubernetesClient kubernetesClient, String operatorNamespace, FlinkDeploymentValidator validator, - JobStatusObserver observer, JobReconciler jobReconciler, SessionReconciler sessionReconciler) { this.defaultConfig = defaultConfig; this.kubernetesClient = kubernetesClient; this.operatorNamespace = operatorNamespace; this.validator = validator; - this.observer = observer; this.jobReconciler = jobReconciler; this.sessionReconciler = sessionReconciler; } @@ -104,7 +94,7 @@ public class FlinkDeploymentController operatorNamespace, kubernetesClient, true); - jobManagerDeployments.remove(flinkApp.getMetadata().getSelfLink()); + getReconciler(flinkApp).removeDeployment(flinkApp); return DeleteControl.defaultDelete(); } @@ -122,14 +112,11 @@ public class FlinkDeploymentController Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig()); try { - // only check job status when the JM deployment is ready - boolean shouldReconcile = - !jobManagerDeployments.contains(flinkApp.getMetadata().getSelfLink()) - || observer.observeFlinkJobStatus(flinkApp, effectiveConfig); - if (shouldReconcile) { - reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig); - updateForReconciliationSuccess(flinkApp); - } + UpdateControl<FlinkDeployment> updateControl = + getReconciler(flinkApp) + .reconcile(operatorNamespace, flinkApp, context, effectiveConfig); + updateForReconciliationSuccess(flinkApp); + return updateControl; } catch (InvalidDeploymentException ide) { LOG.error("Reconciliation failed", ide); updateForReconciliationError(flinkApp, ide.getMessage()); @@ -137,62 +124,10 @@ public class FlinkDeploymentController } catch (Exception e) { throw new ReconciliationException(e); } - - return checkJobManagerDeployment(flinkApp, context, effectiveConfig); } - private UpdateControl<FlinkDeployment> checkJobManagerDeployment( - FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) { - if (!jobManagerDeployments.contains(flinkApp.getMetadata().getSelfLink())) { - Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class); - if (deployment.isPresent()) { - DeploymentStatus status = deployment.get().getStatus(); - DeploymentSpec spec = deployment.get().getSpec(); - if (status != null - && status.getAvailableReplicas() != null - && spec.getReplicas().intValue() == status.getReplicas() - && spec.getReplicas().intValue() == status.getAvailableReplicas()) { - // typically it takes a few seconds for the REST server to be ready - if (observer.isJobManagerReady(effectiveConfig)) { - LOG.info( - "JobManager deployment {} in namespace {} is ready", - flinkApp.getMetadata().getName(), - flinkApp.getMetadata().getNamespace()); - jobManagerDeployments.add(flinkApp.getMetadata().getSelfLink()); - if (flinkApp.getStatus().getJobStatus() != null) { - // short circuit, if the job was already running - // reschedule for immediate job status check - return UpdateControl.updateStatus(flinkApp).rescheduleAfter(0); - } - } - LOG.info( - "JobManager deployment {} in namespace {} port not ready", - flinkApp.getMetadata().getName(), - flinkApp.getMetadata().getNamespace()); - return UpdateControl.updateStatus(flinkApp) - .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS); - } else { - LOG.info( - "JobManager deployment {} in namespace {} not yet ready, status {}", - flinkApp.getMetadata().getName(), - flinkApp.getMetadata().getNamespace(), - status); - } - } - } - return UpdateControl.updateStatus(flinkApp) - .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS); - } - - private void reconcileFlinkDeployment( - String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) - throws Exception { - - if (flinkApp.getSpec().getJob() == null) { - sessionReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig); - } else { - jobReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig); - } + private BaseReconciler getReconciler(FlinkDeployment flinkDeployment) { + return flinkDeployment.getSpec().getJob() == null ? sessionReconciler : jobReconciler; } private void updateForReconciliationSuccess(FlinkDeployment flinkApp) { @@ -217,10 +152,14 @@ public class FlinkDeploymentController .apps() .deployments() .inAnyNamespace() - .withLabel("type", "flink-native-kubernetes") - .withLabel("component", "jobmanager") + .withLabel(Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE) + .withLabel( + Constants.LABEL_COMPONENT_KEY, + Constants.LABEL_COMPONENT_JOB_MANAGER) .runnableInformer(0); - return List.of(new InformerEventSource<>(deploymentInformer, Mappers.fromLabel("app"))); + return List.of( + new InformerEventSource<>( + deploymentInformer, Mappers.fromLabel(Constants.LABEL_APP_KEY))); } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java index cb56dc3..577d73b 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java @@ -83,10 +83,6 @@ public class JobStatusObserver { } } - public boolean isJobManagerReady(Configuration config) { - return flinkService.isJobManagerReady(config); - } - /** Merge previous job status with the new one from the flink job cluster. */ private JobStatus mergeJobStatus( JobStatus oldStatus, List<JobStatusMessage> clusterJobStatuses) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java new file mode 100644 index 0000000..f1c0c23 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.reconciler; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.service.FlinkService; + +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; +import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +/** BaseReconciler with functionality that is common to job and session modes. */ +public abstract class BaseReconciler { + + private static final Logger LOG = LoggerFactory.getLogger(BaseReconciler.class); + + public static final int REFRESH_SECONDS = 60; + public static final int PORT_READY_DELAY_SECONDS = 10; + + private final HashSet<String> jobManagerDeployments = new HashSet<>(); + + public boolean removeDeployment(FlinkDeployment flinkApp) { + return jobManagerDeployments.remove(flinkApp.getMetadata().getUid()); + } + + public abstract UpdateControl<FlinkDeployment> reconcile( + String operatorNamespace, + FlinkDeployment flinkApp, + Context context, + Configuration effectiveConfig) + throws Exception; + + protected UpdateControl<FlinkDeployment> checkJobManagerDeployment( + FlinkDeployment flinkApp, + Context context, + Configuration effectiveConfig, + FlinkService flinkService) { + if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) { + Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class); + if (deployment.isPresent()) { + DeploymentStatus status = deployment.get().getStatus(); + DeploymentSpec spec = deployment.get().getSpec(); + if (status != null + && status.getAvailableReplicas() != null + && spec.getReplicas().intValue() == status.getReplicas() + && spec.getReplicas().intValue() == status.getAvailableReplicas()) { + // typically it takes a few seconds for the REST server to be ready + if (flinkService.isJobManagerPortReady(effectiveConfig)) { + LOG.info( + "JobManager deployment {} in namespace {} is ready", + flinkApp.getMetadata().getName(), + flinkApp.getMetadata().getNamespace()); + jobManagerDeployments.add(flinkApp.getMetadata().getUid()); + if (flinkApp.getStatus().getJobStatus() != null) { + // pre-existing deployments on operator restart - proceed with + // reconciliation + return null; + } + } + LOG.info( + "JobManager deployment {} in namespace {} port not ready", + flinkApp.getMetadata().getName(), + flinkApp.getMetadata().getNamespace()); + return UpdateControl.updateStatus(flinkApp) + .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS); + } + LOG.info( + "JobManager deployment {} in namespace {} not yet ready, status {}", + flinkApp.getMetadata().getName(), + flinkApp.getMetadata().getNamespace(), + status); + // TODO: how frequently do we want here + return UpdateControl.updateStatus(flinkApp) + .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS); + } + } + return null; + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java index fb599e8..7a072e0 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java @@ -25,36 +25,45 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; import org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; +import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.IngressUtils; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Optional; +import java.util.concurrent.TimeUnit; /** * Reconciler responsible for handling the job lifecycle according to the desired and current * states. */ -public class JobReconciler { +public class JobReconciler extends BaseReconciler { private static final Logger LOG = LoggerFactory.getLogger(JobReconciler.class); private final KubernetesClient kubernetesClient; private final FlinkService flinkService; + private final JobStatusObserver observer; public JobReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) { this.kubernetesClient = kubernetesClient; this.flinkService = flinkService; + this.observer = new JobStatusObserver(flinkService); } - public void reconcile( - String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) + @Override + public UpdateControl<FlinkDeployment> reconcile( + String operatorNamespace, + FlinkDeployment flinkApp, + Context context, + Configuration effectiveConfig) throws Exception { - FlinkDeploymentSpec lastReconciledSpec = flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec(); JobSpec jobSpec = flinkApp.getSpec().getJob(); @@ -65,9 +74,22 @@ public class JobReconciler { Optional.ofNullable(jobSpec.getInitialSavepointPath())); IngressUtils.updateIngressRules( flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false); - return; } + // wait until the deployment is ready + UpdateControl<FlinkDeployment> uc = + checkJobManagerDeployment(flinkApp, context, effectiveConfig, flinkService); + if (uc != null) { + return uc; + } + + if (!observer.observeFlinkJobStatus(flinkApp, effectiveConfig)) { + return UpdateControl.updateStatus(flinkApp) + .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS); + } + + // TODO: following assumes that current job is running + // What if it never enters running state due to bad deployment? boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec); if (specChanged) { JobState currentJobState = lastReconciledSpec.getJob().getState(); @@ -97,6 +119,9 @@ public class JobReconciler { } } } + + return UpdateControl.updateStatus(flinkApp) + .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS); } private void deployFlinkJob( @@ -150,6 +175,7 @@ public class JobReconciler { effectiveConfig); JobStatus jobStatus = flinkApp.getStatus().getJobStatus(); jobStatus.setState("suspended"); + removeDeployment(flinkApp); savepointOpt.ifPresent(jobStatus::setSavepointLocation); return savepointOpt; } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java index 820e31f..0c9ade3 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java @@ -24,14 +24,18 @@ import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.IngressUtils; import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + /** * Reconciler responsible for handling the session cluster lifecycle according to the desired and * current states. */ -public class SessionReconciler { +public class SessionReconciler extends BaseReconciler { private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class); @@ -43,8 +47,12 @@ public class SessionReconciler { this.flinkService = flinkService; } - public void reconcile( - String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) + @Override + public UpdateControl<FlinkDeployment> reconcile( + String operatorNamespace, + FlinkDeployment flinkApp, + Context context, + Configuration effectiveConfig) throws Exception { FlinkDeploymentSpec lastReconciledSpec = @@ -54,14 +62,21 @@ public class SessionReconciler { flinkService.submitSessionCluster(flinkApp, effectiveConfig); IngressUtils.updateIngressRules( flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false); - return; } - boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec); + UpdateControl<FlinkDeployment> uc = + checkJobManagerDeployment(flinkApp, context, effectiveConfig, flinkService); + if (uc != null) { + return uc; + } + boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec); if (specChanged) { upgradeSessionCluster(flinkApp, effectiveConfig); } + + return UpdateControl.updateStatus(flinkApp) + .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS); } private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration effectiveConfig) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java index a2e731c..fc98dcf 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java @@ -100,7 +100,7 @@ public class FlinkService { LOG.info("Session cluster {} deployed", deployment.getMetadata().getName()); } - public boolean isJobManagerReady(Configuration config) { + public boolean isJobManagerPortReady(Configuration config) { final URI uri; try (ClusterClient<String> clusterClient = getClusterClient(config)) { uri = URI.create(clusterClient.getWebInterfaceURL()); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index 398ef84..626c0ae 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -102,7 +102,7 @@ public class TestingFlinkService extends FlinkService { } @Override - public boolean isJobManagerReady(Configuration config) { + public boolean isJobManagerPortReady(Configuration config) { return true; } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 24db802..ab7c884 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -25,24 +25,20 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; -import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; +import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler; import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; +import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest; import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator; import org.apache.flink.runtime.client.JobStatusMessage; -import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; -import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; import io.javaoperatorsdk.operator.api.reconciler.Context; -import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -53,27 +49,7 @@ import static org.junit.Assert.assertTrue; /** @link JobStatusObserver unit tests */ public class FlinkDeploymentControllerTest { - private final Context context = - new Context() { - @Override - public Optional<RetryInfo> getRetryInfo() { - return Optional.empty(); - } - - @Override - public <T> Optional<T> getSecondaryResource( - Class<T> expectedType, String eventSourceName) { - DeploymentStatus status = new DeploymentStatus(); - status.setAvailableReplicas(1); - status.setReplicas(1); - DeploymentSpec spec = new DeploymentSpec(); - spec.setReplicas(1); - Deployment deployment = new Deployment(); - deployment.setSpec(spec); - deployment.setStatus(status); - return Optional.of((T) deployment); - } - }; + private final Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment(); @Test public void verifyBasicReconcileLoop() { @@ -86,13 +62,13 @@ public class FlinkDeploymentControllerTest { updateControl = testController.reconcile(appCluster, context); assertTrue(updateControl.isUpdateStatus()); assertEquals( - FlinkDeploymentController.PORT_READY_DELAY_SECONDS * 1000, + BaseReconciler.PORT_READY_DELAY_SECONDS * 1000, (long) updateControl.getScheduleDelay().get()); updateControl = testController.reconcile(appCluster, context); assertTrue(updateControl.isUpdateStatus()); assertEquals( - FlinkDeploymentController.REFRESH_SECONDS * 1000, + BaseReconciler.REFRESH_SECONDS * 1000, (long) updateControl.getScheduleDelay().get()); // Validate reconciliation status @@ -105,7 +81,7 @@ public class FlinkDeploymentControllerTest { updateControl = testController.reconcile(appCluster, context); assertTrue(updateControl.isUpdateStatus()); assertEquals( - FlinkDeploymentController.REFRESH_SECONDS * 1000, + BaseReconciler.REFRESH_SECONDS * 1000, (long) updateControl.getScheduleDelay().get()); // Validate job status @@ -216,7 +192,6 @@ public class FlinkDeploymentControllerTest { } private FlinkDeploymentController createTestController(TestingFlinkService flinkService) { - JobStatusObserver observer = new JobStatusObserver(flinkService); JobReconciler jobReconciler = new JobReconciler(null, flinkService); SessionReconciler sessionReconciler = new SessionReconciler(null, flinkService); @@ -225,7 +200,6 @@ public class FlinkDeploymentControllerTest { null, "test", new DefaultDeploymentValidator(), - observer, jobReconciler, sessionReconciler); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java index 8f99806..6a2109b 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java @@ -25,13 +25,20 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; +import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; +import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; import org.junit.jupiter.api.Test; import java.util.List; +import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -40,15 +47,39 @@ import static org.junit.jupiter.api.Assertions.assertTrue; /** @link JobStatusObserver unit tests */ public class JobReconcilerTest { + public static Context createContextWithReadyJobManagerDeployment() { + return new Context() { + @Override + public Optional<RetryInfo> getRetryInfo() { + return Optional.empty(); + } + + @Override + public <T> Optional<T> getSecondaryResource( + Class<T> expectedType, String eventSourceName) { + DeploymentStatus status = new DeploymentStatus(); + status.setAvailableReplicas(1); + status.setReplicas(1); + DeploymentSpec spec = new DeploymentSpec(); + spec.setReplicas(1); + Deployment deployment = new Deployment(); + deployment.setSpec(spec); + deployment.setStatus(status); + return Optional.of((T) deployment); + } + }; + } + @Test public void testUpgrade() throws Exception { + Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment(); TestingFlinkService flinkService = new TestingFlinkService(); JobReconciler reconciler = new JobReconciler(null, flinkService); FlinkDeployment deployment = TestUtils.buildApplicationCluster(); Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration()); - reconciler.reconcile("test", deployment, config); + reconciler.reconcile("test", deployment, context, config); List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs(); verifyAndSetRunningJobsToStatus(deployment, runningJobs); @@ -56,7 +87,7 @@ public class JobReconcilerTest { FlinkDeployment statelessUpgrade = TestUtils.clone(deployment); statelessUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS); statelessUpgrade.getSpec().getFlinkConfiguration().put("new", "conf"); - reconciler.reconcile("test", statelessUpgrade, config); + reconciler.reconcile("test", statelessUpgrade, context, config); runningJobs = flinkService.listJobs(); assertEquals(1, runningJobs.size()); @@ -72,7 +103,7 @@ public class JobReconcilerTest { statefulUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT); statefulUpgrade.getSpec().getFlinkConfiguration().put("new", "conf2"); - reconciler.reconcile("test", statefulUpgrade, new Configuration(config)); + reconciler.reconcile("test", statefulUpgrade, context, new Configuration(config)); runningJobs = flinkService.listJobs(); assertEquals(1, runningJobs.size()); @@ -82,13 +113,15 @@ public class JobReconcilerTest { @Test public void testUpgradeModeChangeFromSavepointToLastState() throws Exception { final String expectedSavepointPath = "savepoint_0"; + final Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment(); final TestingFlinkService flinkService = new TestingFlinkService(); + JobStatusObserver observer = new JobStatusObserver(flinkService); final JobReconciler reconciler = new JobReconciler(null, flinkService); final FlinkDeployment deployment = TestUtils.buildApplicationCluster(); final Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration()); - reconciler.reconcile("test", deployment, config); + reconciler.reconcile("test", deployment, context, config); List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs(); verifyAndSetRunningJobsToStatus(deployment, runningJobs); @@ -97,7 +130,7 @@ public class JobReconcilerTest { deployment.getSpec().getJob().setState(JobState.SUSPENDED); deployment.getSpec().setImage("new-image-1"); - reconciler.reconcile("test", deployment, config); + reconciler.reconcile("test", deployment, context, config); assertEquals(0, flinkService.listJobs().size()); assertTrue( JobState.SUSPENDED @@ -118,7 +151,7 @@ public class JobReconcilerTest { deployment.getSpec().getJob().setState(JobState.RUNNING); deployment.getSpec().setImage("new-image-2"); - reconciler.reconcile("test", deployment, config); + reconciler.reconcile("test", deployment, context, config); runningJobs = flinkService.listJobs(); assertEquals(expectedSavepointPath, config.get(SavepointConfigOptions.SAVEPOINT_PATH)); assertEquals(1, runningJobs.size());