This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 43e1716 [FLINK-26399] Make some option of operator configurable 43e1716 is described below commit 43e171627e5cf47a866958c1e88b5c182b686802 Author: 愚鲤 <yuli....@alibaba-inc.com> AuthorDate: Wed Mar 2 23:26:21 2022 +0800 [FLINK-26399] Make some option of operator configurable Closes #31 --- .../flink/kubernetes/operator/FlinkOperator.java | 10 ++++- .../config/FlinkOperatorConfiguration.java | 52 ++++++++++++++++++++++ .../operator/config/OperatorConfigOptions.java | 41 +++++++++++++++++ .../controller/FlinkDeploymentController.java | 8 +++- .../metrics/KubernetesOperatorMetricOptions.java | 1 + .../observer/JobManagerDeploymentStatus.java | 15 ++++--- .../operator/reconciler/BaseReconciler.java | 15 +++---- .../operator/reconciler/JobReconciler.java | 14 ++++-- .../operator/reconciler/SessionReconciler.java | 15 ++++--- .../validation/DefaultDeploymentValidator.java | 12 ++--- .../controller/FlinkDeploymentControllerTest.java | 22 ++++++--- .../operator/reconciler/JobReconcilerTest.java | 9 +++- 12 files changed, 173 insertions(+), 41 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index fd9d60c..284860d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -18,6 +18,7 @@ package org.apache.flink.kubernetes.operator; import org.apache.flink.kubernetes.operator.config.DefaultConfig; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig; import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController; import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils; @@ -55,16 +56,21 @@ public class FlinkOperator { Operator operator = new Operator(client, configurationService); FlinkService flinkService = new FlinkService(client); + FlinkOperatorConfiguration operatorConfiguration = + FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig()); Observer observer = new Observer(flinkService); - JobReconciler jobReconciler = new JobReconciler(client, flinkService); - SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService); + JobReconciler jobReconciler = + new JobReconciler(client, flinkService, operatorConfiguration); + SessionReconciler sessionReconciler = + new SessionReconciler(client, flinkService, operatorConfiguration); FlinkDeploymentValidator validator = new DefaultDeploymentValidator(); FlinkDeploymentController controller = new FlinkDeploymentController( defaultConfig, + operatorConfiguration, client, namespace, validator, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java new file mode 100644 index 0000000..00e741d --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.config; + +import org.apache.flink.configuration.Configuration; + +/** Configuration class for operator. */ +public class FlinkOperatorConfiguration { + + private final int reconcileIntervalInSec; + + private final int portCheckIntervalInSec; + + public FlinkOperatorConfiguration(int reconcileIntervalInSec, int portCheckIntervalInSec) { + this.reconcileIntervalInSec = reconcileIntervalInSec; + this.portCheckIntervalInSec = portCheckIntervalInSec; + } + + public int getReconcileIntervalInSec() { + return reconcileIntervalInSec; + } + + public int getPortCheckIntervalInSec() { + return portCheckIntervalInSec; + } + + public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) { + int reconcileIntervalInSec = + operatorConfig.getInteger( + OperatorConfigOptions.OPERATOR_RECONCILER_RESCHEDULE_INTERVAL_IN_SEC); + int portCheckIntervalInSec = + operatorConfig.getInteger( + OperatorConfigOptions.OPERATOR_OBSERVER_PORT_CHECK_INTERVAL_IN_SEC); + return new FlinkOperatorConfiguration(reconcileIntervalInSec, portCheckIntervalInSec); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java new file mode 100644 index 0000000..0a80c07 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.config; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** This class holds configuration constants used by flink operator. */ +public class OperatorConfigOptions { + + public static final ConfigOption<Integer> OPERATOR_RECONCILER_RESCHEDULE_INTERVAL_IN_SEC = + ConfigOptions.key("operator.reconciler.reschedule.interval.sec") + .intType() + .defaultValue(60) + .withDescription( + "The interval in second for the controller to reschedule the reconcile process"); + + public static final ConfigOption<Integer> OPERATOR_OBSERVER_PORT_CHECK_INTERVAL_IN_SEC = + ConfigOptions.key("operator.observer.port-check.interval.sec") + .intType() + .defaultValue(10) + .withDescription( + "The interval in second for the controller to reschedule the reconcile process to " + + "wait for deployment to be ready"); +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index b9d5cd4..bc27ac5 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.controller; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.config.DefaultConfig; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException; @@ -69,9 +70,11 @@ public class FlinkDeploymentController private final JobReconciler jobReconciler; private final SessionReconciler sessionReconciler; private final DefaultConfig defaultConfig; + private final FlinkOperatorConfiguration operatorConfiguration; public FlinkDeploymentController( DefaultConfig defaultConfig, + FlinkOperatorConfiguration operatorConfiguration, KubernetesClient kubernetesClient, String operatorNamespace, FlinkDeploymentValidator validator, @@ -79,6 +82,7 @@ public class FlinkDeploymentController JobReconciler jobReconciler, SessionReconciler sessionReconciler) { this.defaultConfig = defaultConfig; + this.operatorConfiguration = operatorConfiguration; this.kubernetesClient = kubernetesClient; this.operatorNamespace = operatorNamespace; this.validator = validator; @@ -114,7 +118,9 @@ public class FlinkDeploymentController boolean readyToReconcile = observer.observe(flinkApp, context, effectiveConfig); if (!readyToReconcile) { - return flinkApp.getStatus().getJobManagerDeploymentStatus().toUpdateControl(flinkApp); + return flinkApp.getStatus() + .getJobManagerDeploymentStatus() + .toUpdateControl(flinkApp, operatorConfiguration); } try { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java index 5d092f8..3c07674 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigOptions; public class KubernetesOperatorMetricOptions { public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR = ConfigOptions.key("metrics.scope.kubernetes-operator") + .stringType() .defaultValue("<host>.kubernetes-operator.<namespace>.<name>") .withDescription( "Defines the scope format string that is applied to all metrics scoped to the kubernetes operator."); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java index 08262ff..960c1cd 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java @@ -17,15 +17,13 @@ package org.apache.flink.kubernetes.operator.observer; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import java.util.concurrent.TimeUnit; -import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS; -import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS; - /** Status of the Flink JobManager Kubernetes deployment. */ public enum JobManagerDeploymentStatus { @@ -41,15 +39,20 @@ public enum JobManagerDeploymentStatus { /** JobManager deployment not found, probably not started or killed by user. */ MISSING; - public UpdateControl<FlinkDeployment> toUpdateControl(FlinkDeployment flinkDeployment) { + public UpdateControl<FlinkDeployment> toUpdateControl( + FlinkDeployment flinkDeployment, FlinkOperatorConfiguration operatorConfiguration) { switch (this) { case DEPLOYING: case READY: return UpdateControl.updateStatus(flinkDeployment) - .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS); + .rescheduleAfter( + operatorConfiguration.getReconcileIntervalInSec(), + TimeUnit.SECONDS); case DEPLOYED_NOT_READY: return UpdateControl.updateStatus(flinkDeployment) - .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS); + .rescheduleAfter( + operatorConfiguration.getPortCheckIntervalInSec(), + TimeUnit.SECONDS); case MISSING: default: return null; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java index b94956b..b4a98d8 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java @@ -18,6 +18,7 @@ package org.apache.flink.kubernetes.operator.reconciler; import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.service.FlinkService; @@ -28,23 +29,21 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** BaseReconciler with functionality that is common to job and session modes. */ public abstract class BaseReconciler { - private static final Logger LOG = LoggerFactory.getLogger(BaseReconciler.class); - - public static final int REFRESH_SECONDS = 60; - public static final int PORT_READY_DELAY_SECONDS = 10; - + protected final FlinkOperatorConfiguration operatorConfiguration; protected final KubernetesClient kubernetesClient; protected final FlinkService flinkService; - public BaseReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) { + public BaseReconciler( + KubernetesClient kubernetesClient, + FlinkService flinkService, + FlinkOperatorConfiguration operatorConfiguration) { this.kubernetesClient = kubernetesClient; this.flinkService = flinkService; + this.operatorConfiguration = operatorConfiguration; } public abstract UpdateControl<FlinkDeployment> reconcile( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java index 3abb860..3d0cb39 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java @@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.reconciler; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; @@ -48,8 +49,11 @@ public class JobReconciler extends BaseReconciler { private static final Logger LOG = LoggerFactory.getLogger(JobReconciler.class); - public JobReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) { - super(kubernetesClient, flinkService); + public JobReconciler( + KubernetesClient kubernetesClient, + FlinkService flinkService, + FlinkOperatorConfiguration operatorConfiguration) { + super(kubernetesClient, flinkService, operatorConfiguration); } @Override @@ -69,7 +73,8 @@ public class JobReconciler extends BaseReconciler { Optional.ofNullable(jobSpec.getInitialSavepointPath())); IngressUtils.updateIngressRules( flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false); - return JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(flinkApp); + return JobManagerDeploymentStatus.DEPLOYING.toUpdateControl( + flinkApp, operatorConfiguration); } // TODO: following assumes that current job is running @@ -105,7 +110,8 @@ public class JobReconciler extends BaseReconciler { } return UpdateControl.updateStatus(flinkApp) - .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS); + .rescheduleAfter( + operatorConfiguration.getReconcileIntervalInSec(), TimeUnit.SECONDS); } private void deployFlinkJob( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java index d63fdeb..7596b4d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java @@ -18,6 +18,7 @@ package org.apache.flink.kubernetes.operator.reconciler; import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.service.FlinkService; @@ -26,8 +27,6 @@ import org.apache.flink.kubernetes.operator.utils.IngressUtils; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; @@ -37,10 +36,11 @@ import java.util.concurrent.TimeUnit; */ public class SessionReconciler extends BaseReconciler { - private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class); - - public SessionReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) { - super(kubernetesClient, flinkService); + public SessionReconciler( + KubernetesClient kubernetesClient, + FlinkService flinkService, + FlinkOperatorConfiguration operatorConfiguration) { + super(kubernetesClient, flinkService, operatorConfiguration); } @Override @@ -66,7 +66,8 @@ public class SessionReconciler extends BaseReconciler { } return UpdateControl.updateStatus(flinkApp) - .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS); + .rescheduleAfter( + operatorConfiguration.getReconcileIntervalInSec(), TimeUnit.SECONDS); } private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration effectiveConfig) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java index 9f8046b..aaef01f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java @@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.validation; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec; @@ -38,7 +39,9 @@ import java.util.Set; public class DefaultDeploymentValidator implements FlinkDeploymentValidator { private static final String[] FORBIDDEN_CONF_KEYS = - new String[] {"kubernetes.namespace", "kubernetes.cluster-id"}; + new String[] { + KubernetesConfigOptions.NAMESPACE.key(), KubernetesConfigOptions.CLUSTER_ID.key() + }; private static final Set<String> ALLOWED_LOG_CONF_KEYS = Set.of(Constants.CONFIG_FILE_LOG4J_NAME, Constants.CONFIG_FILE_LOGBACK_NAME); @@ -121,13 +124,12 @@ public class DefaultDeploymentValidator implements FlinkDeploymentValidator { return firstPresent( validateResources("JobManager", jmSpec.getResource()), - validateJmReplicas("JobManager", jmSpec.getReplicas(), confMap)); + validateJmReplicas(jmSpec.getReplicas(), confMap)); } - private Optional<String> validateJmReplicas( - String component, int replicas, Map<String, String> confMap) { + private Optional<String> validateJmReplicas(int replicas, Map<String, String> confMap) { if (replicas < 1) { - return Optional.of(component + " replicas should not be configured less than one."); + return Optional.of("JobManager replicas should not be configured less than one."); } else if (replicas > 1 && !HighAvailabilityMode.isHighAvailabilityModeActivated( Configuration.fromMap(confMap))) { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index f4468c6..e464123 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -18,8 +18,10 @@ package org.apache.flink.kubernetes.operator.controller; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; @@ -51,6 +53,8 @@ import static org.junit.Assert.assertTrue; public class FlinkDeploymentControllerTest { private final Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment(); + private final FlinkOperatorConfiguration operatorConfiguration = + FlinkOperatorConfiguration.fromConfiguration(new Configuration()); @Test public void verifyBasicReconcileLoop() { @@ -63,7 +67,9 @@ public class FlinkDeploymentControllerTest { updateControl = testController.reconcile(appCluster, TestUtils.createEmptyContext()); assertTrue(updateControl.isUpdateStatus()); assertEquals( - JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(appCluster).getScheduleDelay(), + JobManagerDeploymentStatus.DEPLOYING + .toUpdateControl(appCluster, operatorConfiguration) + .getScheduleDelay(), updateControl.getScheduleDelay()); // Validate reconciliation status @@ -77,14 +83,16 @@ public class FlinkDeploymentControllerTest { assertTrue(updateControl.isUpdateStatus()); assertEquals( JobManagerDeploymentStatus.DEPLOYED_NOT_READY - .toUpdateControl(appCluster) + .toUpdateControl(appCluster, operatorConfiguration) .getScheduleDelay(), updateControl.getScheduleDelay()); updateControl = testController.reconcile(appCluster, context); assertTrue(updateControl.isUpdateStatus()); assertEquals( - JobManagerDeploymentStatus.READY.toUpdateControl(appCluster).getScheduleDelay(), + JobManagerDeploymentStatus.READY + .toUpdateControl(appCluster, operatorConfiguration) + .getScheduleDelay(), updateControl.getScheduleDelay()); // Validate job status @@ -188,7 +196,7 @@ public class FlinkDeploymentControllerTest { testController.reconcile(appCluster, context); assertEquals( JobManagerDeploymentStatus.DEPLOYED_NOT_READY - .toUpdateControl(appCluster) + .toUpdateControl(appCluster, operatorConfiguration) .getScheduleDelay(), updateControl.getScheduleDelay()); testController.reconcile(appCluster, context); @@ -212,11 +220,13 @@ public class FlinkDeploymentControllerTest { private FlinkDeploymentController createTestController(TestingFlinkService flinkService) { Observer observer = new Observer(flinkService); - JobReconciler jobReconciler = new JobReconciler(null, flinkService); - SessionReconciler sessionReconciler = new SessionReconciler(null, flinkService); + JobReconciler jobReconciler = new JobReconciler(null, flinkService, operatorConfiguration); + SessionReconciler sessionReconciler = + new SessionReconciler(null, flinkService, operatorConfiguration); return new FlinkDeploymentController( FlinkUtils.loadDefaultConfig(), + operatorConfiguration, null, "test", new DefaultDeploymentValidator(), diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java index 2ee8793..fd940d4 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; @@ -47,6 +48,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; /** @link JobStatusObserver unit tests */ public class JobReconcilerTest { + private final FlinkOperatorConfiguration operatorConfiguration = + FlinkOperatorConfiguration.fromConfiguration(new Configuration()); + public static Context createContextWithReadyJobManagerDeployment() { return new Context() { @Override @@ -75,7 +79,7 @@ public class JobReconcilerTest { Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment(); TestingFlinkService flinkService = new TestingFlinkService(); - JobReconciler reconciler = new JobReconciler(null, flinkService); + JobReconciler reconciler = new JobReconciler(null, flinkService, operatorConfiguration); FlinkDeployment deployment = TestUtils.buildApplicationCluster(); Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration()); @@ -117,7 +121,8 @@ public class JobReconcilerTest { final TestingFlinkService flinkService = new TestingFlinkService(); Observer observer = new Observer(flinkService); - final JobReconciler reconciler = new JobReconciler(null, flinkService); + final JobReconciler reconciler = + new JobReconciler(null, flinkService, operatorConfiguration); final FlinkDeployment deployment = TestUtils.buildApplicationCluster(); final Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration());