This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 85d997e [FLINK-26136] Extract shared deployment validation logic 85d997e is described below commit 85d997eaa0310a298bb3a3a704afacf3844635f7 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Thu Feb 24 12:06:39 2022 +0100 [FLINK-26136] Extract shared deployment validation logic Closes #24 --- .../flink/kubernetes/operator/FlinkOperator.java | 5 + .../controller/FlinkDeploymentController.java | 11 ++ .../operator/reconciler/JobReconciler.java | 14 +- .../operator/reconciler/SessionReconciler.java | 4 - .../kubernetes/operator/service/FlinkService.java | 6 +- .../validation/DefaultDeploymentValidator.java | 163 +++++++++++++++++++ .../validation/FlinkDeploymentValidator.java | 34 ++++ .../flink/kubernetes/operator/TestUtils.java | 7 +- .../controller/FlinkDeploymentControllerTest.java | 2 + .../validation/DeploymentValidatorTest.java | 176 +++++++++++++++++++++ flink-kubernetes-webhook/pom.xml | 10 +- .../operator/admission/FlinkOperatorWebhook.java | 4 +- ...eploymentValidator.java => FlinkValidator.java} | 24 +-- 13 files changed, 425 insertions(+), 35 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index cb05de3..7e8edf5 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -26,6 +26,8 @@ import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; +import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator; +import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.javaoperatorsdk.operator.Operator; @@ -58,11 +60,14 @@ public class FlinkOperator { JobReconciler jobReconciler = new JobReconciler(client, flinkService); SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService); + FlinkDeploymentValidator validator = new DefaultDeploymentValidator(); + FlinkDeploymentController controller = new FlinkDeploymentController( defaultConfig, client, namespace, + validator, observer, jobReconciler, sessionReconciler); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index a6aeb32..f97d5fd 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.kubernetes.operator.utils.IngressUtils; +import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.api.reconciler.Context; @@ -62,6 +63,7 @@ public class FlinkDeploymentController private final String operatorNamespace; + private final FlinkDeploymentValidator validator; private final JobStatusObserver observer; private final JobReconciler jobReconciler; private final SessionReconciler sessionReconciler; @@ -71,12 +73,14 @@ public class FlinkDeploymentController DefaultConfig defaultConfig, KubernetesClient kubernetesClient, String operatorNamespace, + FlinkDeploymentValidator validator, JobStatusObserver observer, JobReconciler jobReconciler, SessionReconciler sessionReconciler) { this.defaultConfig = defaultConfig; this.kubernetesClient = kubernetesClient; this.operatorNamespace = operatorNamespace; + this.validator = validator; this.observer = observer; this.jobReconciler = jobReconciler; this.sessionReconciler = sessionReconciler; @@ -99,6 +103,13 @@ public class FlinkDeploymentController public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) { LOG.info("Reconciling {}", flinkApp.getMetadata().getName()); + Optional<String> validationError = validator.validate(flinkApp); + if (validationError.isPresent()) { + LOG.error("Reconciliation failed: " + validationError.get()); + updateForReconciliationError(flinkApp, validationError.get()); + return UpdateControl.updateStatus(flinkApp); + } + Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig()); try { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java index 882c58a..dee22fb 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java @@ -60,9 +60,6 @@ public class JobReconciler { flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec(); JobSpec jobSpec = flinkApp.getSpec().getJob(); if (lastReconciledSpec == null) { - if (!jobSpec.getState().equals(JobState.RUNNING)) { - throw new InvalidDeploymentException("Job must start in running state"); - } deployFlinkJob( flinkApp, effectiveConfig, @@ -74,9 +71,6 @@ public class JobReconciler { boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec); if (specChanged) { - if (lastReconciledSpec.getJob() == null) { - throw new InvalidDeploymentException("Cannot switch from session to job cluster"); - } JobState currentJobState = lastReconciledSpec.getJob().getState(); JobState desiredJobState = jobSpec.getState(); @@ -129,13 +123,7 @@ public class JobReconciler { private void restoreFromLastSavepoint(FlinkDeployment flinkApp, Configuration effectiveConfig) throws Exception { JobStatus jobStatus = flinkApp.getStatus().getJobStatus(); - - String savepointLocation = jobStatus.getSavepointLocation(); - if (savepointLocation == null) { - throw new InvalidDeploymentException( - "Cannot perform stateful restore without a valid savepoint"); - } - deployFlinkJob(flinkApp, effectiveConfig, Optional.of(savepointLocation)); + deployFlinkJob(flinkApp, effectiveConfig, Optional.of(jobStatus.getSavepointLocation())); } private Optional<String> suspendJob(FlinkDeployment flinkApp, Configuration effectiveConfig) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java index 2e36fc5..820e31f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java @@ -20,7 +20,6 @@ package org.apache.flink.kubernetes.operator.reconciler; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; -import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.IngressUtils; @@ -61,9 +60,6 @@ public class SessionReconciler { boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec); if (specChanged) { - if (lastReconciledSpec.getJob() != null) { - throw new InvalidDeploymentException("Cannot switch from job to session cluster"); - } upgradeSessionCluster(flinkApp, effectiveConfig); } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java index d17ce1e..c8edf01 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java @@ -32,6 +32,7 @@ import org.apache.flink.configuration.RestOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.runtime.client.JobStatusMessage; @@ -65,10 +66,11 @@ public class FlinkService { final ApplicationDeployer deployer = new ApplicationClusterDeployer(clusterClientServiceLoader); + JobSpec jobSpec = deployment.getSpec().getJob(); final ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration( - deployment.getSpec().getJob().getArgs(), - deployment.getSpec().getJob().getEntryClass()); + jobSpec.getArgs() != null ? jobSpec.getArgs() : new String[0], + jobSpec.getEntryClass()); deployer.run(conf, applicationConfiguration); LOG.info("Application cluster {} deployed", deployment.getMetadata().getName()); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java new file mode 100644 index 0000000..e090ec4 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.validation; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; +import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec; +import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; +import org.apache.flink.kubernetes.operator.crd.spec.JobState; +import org.apache.flink.kubernetes.operator.crd.spec.Resource; +import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec; +import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; + +import java.util.Map; +import java.util.Optional; + +/** Default validator implementation. */ +public class DefaultDeploymentValidator implements FlinkDeploymentValidator { + + private static final String[] FORBIDDEN_CONF_KEYS = + new String[] {"kubernetes.namespace", "kubernetes.cluster-id"}; + + @Override + public Optional<String> validate(FlinkDeployment deployment) { + FlinkDeploymentSpec spec = deployment.getSpec(); + return firstPresent( + validateFlinkConfig(spec.getFlinkConfiguration()), + validateJobSpec(spec.getJob()), + validateJmSpec(spec.getJobManager()), + validateTmSpec(spec.getTaskManager()), + validateSpecChange(deployment)); + } + + private static Optional<String> firstPresent(Optional<String>... errOpts) { + for (Optional<String> opt : errOpts) { + if (opt.isPresent()) { + return opt; + } + } + return Optional.empty(); + } + + private Optional<String> validateFlinkConfig(Map<String, String> confMap) { + if (confMap == null) { + return Optional.empty(); + } + Configuration conf = Configuration.fromMap(confMap); + for (String key : FORBIDDEN_CONF_KEYS) { + if (conf.containsKey(key)) { + return Optional.of("Forbidden Flink config key: " + key); + } + } + return Optional.empty(); + } + + private Optional<String> validateJobSpec(JobSpec job) { + if (job == null) { + return Optional.empty(); + } + + if (job.getParallelism() < 1) { + return Optional.of("Job parallelism must be larger than 0"); + } + + if (job.getJarURI() == null) { + return Optional.of("Jar URI must be defined"); + } + + return Optional.empty(); + } + + private Optional<String> validateJmSpec(JobManagerSpec jmSpec) { + if (jmSpec == null) { + return Optional.empty(); + } + + return validateResources("JobManager", jmSpec.getResource()); + } + + private Optional<String> validateTmSpec(TaskManagerSpec tmSpec) { + if (tmSpec == null) { + return Optional.empty(); + } + + return validateResources("TaskManager", tmSpec.getResource()); + } + + private Optional<String> validateResources(String component, Resource resource) { + if (resource == null) { + return Optional.empty(); + } + + String memory = resource.getMemory(); + if (memory == null) { + return Optional.of(component + " resource memory must be defined."); + } + + try { + MemorySize.parse(memory); + } catch (IllegalArgumentException iae) { + return Optional.of(component + " resource memory parse error: " + iae.getMessage()); + } + + return Optional.empty(); + } + + private Optional<String> validateSpecChange(FlinkDeployment deployment) { + FlinkDeploymentSpec newSpec = deployment.getSpec(); + + if (deployment.getStatus() == null + || deployment.getStatus().getReconciliationStatus() == null + || deployment.getStatus().getReconciliationStatus().getLastReconciledSpec() + == null) { + // New deployment + if (newSpec.getJob() != null && !newSpec.getJob().getState().equals(JobState.RUNNING)) { + return Optional.of("Job must start in running state"); + } + + return Optional.empty(); + } + + FlinkDeploymentSpec oldSpec = + deployment.getStatus().getReconciliationStatus().getLastReconciledSpec(); + + if (newSpec.getJob() != null && oldSpec.getJob() == null) { + return Optional.of("Cannot switch from session to job cluster"); + } + + if (newSpec.getJob() == null && oldSpec.getJob() != null) { + return Optional.of("Cannot switch from job to session cluster"); + } + + JobSpec oldJob = oldSpec.getJob(); + JobSpec newJob = newSpec.getJob(); + if (oldJob != null && newJob != null) { + if (oldJob.getState() == JobState.SUSPENDED + && newJob.getState() == JobState.RUNNING + && newJob.getUpgradeMode() == UpgradeMode.SAVEPOINT + && deployment.getStatus().getJobStatus().getSavepointLocation() == null) { + return Optional.of("Cannot perform savepoint restore without a valid savepoint"); + } + } + + return Optional.empty(); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java new file mode 100644 index 0000000..732224d --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.validation; + +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; + +import java.util.Optional; + +/** Validator for {@link FlinkDeployment} resources. */ +public interface FlinkDeploymentValidator { + + /** + * Validate and return optional error. + * + * @param deployment + * @return Optional error string, should be present iff validation resulted in an error + */ + Optional<String> validate(FlinkDeployment deployment); +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java index 601f160..9ada6f8 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -65,7 +65,12 @@ public class TestUtils { FlinkDeployment deployment = buildSessionCluster(); deployment .getSpec() - .setJob(JobSpec.builder().jarURI(SAMPLE_JAR).state(JobState.RUNNING).build()); + .setJob( + JobSpec.builder() + .jarURI(SAMPLE_JAR) + .parallelism(1) + .state(JobState.RUNNING) + .build()); return deployment; } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 6dae546..88fcfe2 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -29,6 +29,7 @@ import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; +import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator; import org.apache.flink.runtime.client.JobStatusMessage; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; @@ -189,6 +190,7 @@ public class FlinkDeploymentControllerTest { FlinkUtils.loadDefaultConfig(), null, "test", + new DefaultDeploymentValidator(), observer, jobReconciler, sessionReconciler); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java new file mode 100644 index 0000000..030d4e5 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.validation; + +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.operator.TestUtils; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.spec.JobState; +import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; +import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; +import org.apache.flink.kubernetes.operator.crd.status.JobStatus; +import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; + +import org.junit.Assert; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Optional; +import java.util.function.Consumer; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Test deployment validation logic. */ +public class DeploymentValidatorTest { + + private final DefaultDeploymentValidator validator = new DefaultDeploymentValidator(); + + @Test + public void testValidation() { + testSuccess(dep -> {}); + + // Test job validation + testError(dep -> dep.getSpec().getJob().setJarURI(null), "Jar URI must be defined"); + testError( + dep -> dep.getSpec().getJob().setState(JobState.SUSPENDED), + "Job must start in running state"); + + testError( + dep -> dep.getSpec().getJob().setParallelism(0), + "Job parallelism must be larger than 0"); + testError( + dep -> dep.getSpec().getJob().setParallelism(-1), + "Job parallelism must be larger than 0"); + + // Test conf validation + testSuccess( + dep -> + dep.getSpec() + .setFlinkConfiguration( + Collections.singletonMap("random", "config"))); + testError( + dep -> + dep.getSpec() + .setFlinkConfiguration( + Collections.singletonMap( + KubernetesConfigOptions.NAMESPACE.key(), "myns")), + "Forbidden Flink config key"); + + // Test resource validation + testSuccess(dep -> dep.getSpec().getTaskManager().getResource().setMemory("1G")); + testSuccess(dep -> dep.getSpec().getTaskManager().getResource().setMemory("100")); + + testError( + dep -> dep.getSpec().getTaskManager().getResource().setMemory("invalid"), + "TaskManager resource memory parse error"); + testError( + dep -> dep.getSpec().getJobManager().getResource().setMemory("invalid"), + "JobManager resource memory parse error"); + + testError( + dep -> dep.getSpec().getTaskManager().getResource().setMemory(null), + "TaskManager resource memory must be defined"); + testError( + dep -> dep.getSpec().getJobManager().getResource().setMemory(null), + "JobManager resource memory must be defined"); + + // Test savepoint restore validation + testSuccess( + dep -> { + dep.setStatus(new FlinkDeploymentStatus()); + dep.getStatus().setJobStatus(new JobStatus()); + dep.getStatus().getJobStatus().setSavepointLocation("sp"); + + dep.getStatus().setReconciliationStatus(new ReconciliationStatus()); + dep.getStatus() + .getReconciliationStatus() + .setLastReconciledSpec(TestUtils.clone(dep.getSpec())); + dep.getStatus() + .getReconciliationStatus() + .getLastReconciledSpec() + .getJob() + .setState(JobState.SUSPENDED); + + dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT); + }); + + testError( + dep -> { + dep.setStatus(new FlinkDeploymentStatus()); + dep.getStatus().setJobStatus(new JobStatus()); + + dep.getStatus().setReconciliationStatus(new ReconciliationStatus()); + dep.getStatus() + .getReconciliationStatus() + .setLastReconciledSpec(TestUtils.clone(dep.getSpec())); + dep.getStatus() + .getReconciliationStatus() + .getLastReconciledSpec() + .getJob() + .setState(JobState.SUSPENDED); + + dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT); + }, + "Cannot perform savepoint restore without a valid savepoint"); + + // Test cluster type validation + testError( + dep -> { + dep.setStatus(new FlinkDeploymentStatus()); + dep.getStatus().setJobStatus(new JobStatus()); + + dep.getStatus().setReconciliationStatus(new ReconciliationStatus()); + dep.getStatus() + .getReconciliationStatus() + .setLastReconciledSpec(TestUtils.clone(dep.getSpec())); + dep.getSpec().setJob(null); + }, + "Cannot switch from job to session cluster"); + + testError( + dep -> { + dep.setStatus(new FlinkDeploymentStatus()); + dep.getStatus().setJobStatus(new JobStatus()); + + dep.getStatus().setReconciliationStatus(new ReconciliationStatus()); + dep.getStatus() + .getReconciliationStatus() + .setLastReconciledSpec(TestUtils.clone(dep.getSpec())); + dep.getStatus().getReconciliationStatus().getLastReconciledSpec().setJob(null); + }, + "Cannot switch from session to job cluster"); + } + + private void testSuccess(Consumer<FlinkDeployment> deploymentModifier) { + FlinkDeployment deployment = TestUtils.buildApplicationCluster(); + deploymentModifier.accept(deployment); + validator.validate(deployment).ifPresent(Assert::fail); + } + + private void testError(Consumer<FlinkDeployment> deploymentModifier, String expectedErr) { + FlinkDeployment deployment = TestUtils.buildApplicationCluster(); + deploymentModifier.accept(deployment); + Optional<String> error = validator.validate(deployment); + if (error.isPresent()) { + assertTrue(error.get(), error.get().startsWith(expectedErr)); + } else { + fail("Did not get expected error: " + expectedErr); + } + } +} diff --git a/flink-kubernetes-webhook/pom.xml b/flink-kubernetes-webhook/pom.xml index 7f9a90c..0a82987 100644 --- a/flink-kubernetes-webhook/pom.xml +++ b/flink-kubernetes-webhook/pom.xml @@ -41,15 +41,17 @@ under the License. <groupId>org.apache.flink</groupId> <artifactId>*</artifactId> </exclusion> - <exclusion> - <groupId>org.takes</groupId> - <artifactId>takes</artifactId> - </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-netty</artifactId> <version>4.1.70.Final-${flink.shaded.version}</version> </dependency> diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java index 35cb046..c534273 100644 --- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java +++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java @@ -18,6 +18,7 @@ package org.apache.flink.kubernetes.operator.admission; import org.apache.flink.kubernetes.operator.utils.EnvUtils; +import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator; import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; import org.apache.flink.shaded.netty4.io.netty.channel.Channel; @@ -54,7 +55,8 @@ public class FlinkOperatorWebhook { public static void main(String[] args) throws Exception { LOG.info("Starting Flink Kubernetes Webhook"); - AdmissionHandler endpoint = new AdmissionHandler(new FlinkDeploymentValidator()); + AdmissionHandler endpoint = + new AdmissionHandler(new FlinkValidator(new DefaultDeploymentValidator())); ChannelInitializer<SocketChannel> initializer = createChannelInitializer(endpoint); NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkDeploymentValidator.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java similarity index 75% rename from flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkDeploymentValidator.java rename to flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java index 4df7d49..838e618 100644 --- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkDeploymentValidator.java +++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java @@ -21,19 +21,26 @@ import org.apache.flink.kubernetes.operator.admission.admissioncontroller.NotAll import org.apache.flink.kubernetes.operator.admission.admissioncontroller.Operation; import org.apache.flink.kubernetes.operator.admission.admissioncontroller.validation.Validator; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; -import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; -import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; +import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator; import com.fasterxml.jackson.databind.ObjectMapper; import io.fabric8.kubernetes.api.model.GenericKubernetesResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; + /** Validator for FlinkDeployment creation and updates. */ -public class FlinkDeploymentValidator implements Validator<GenericKubernetesResource> { - private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentValidator.class); +public class FlinkValidator implements Validator<GenericKubernetesResource> { + private static final Logger LOG = LoggerFactory.getLogger(FlinkValidator.class); private static final ObjectMapper objectMapper = new ObjectMapper(); + private final FlinkDeploymentValidator deploymentValidator; + + public FlinkValidator(FlinkDeploymentValidator deploymentValidator) { + this.deploymentValidator = deploymentValidator; + } + @Override public void validate(GenericKubernetesResource resource, Operation operation) throws NotAllowedException { @@ -41,13 +48,10 @@ public class FlinkDeploymentValidator implements Validator<GenericKubernetesReso FlinkDeployment flinkDeployment = objectMapper.convertValue(resource, FlinkDeployment.class); - FlinkDeploymentSpec spec = flinkDeployment.getSpec(); - JobSpec job = spec.getJob(); - if (job != null) { - if (job.getParallelism() < 1) { - throw new NotAllowedException("Job parallelism must be larger than 0"); - } + Optional<String> validationError = deploymentValidator.validate(flinkDeployment); + if (validationError.isPresent()) { + throw new NotAllowedException(validationError.get()); } } }