This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 1177703 [FLINK-27812] Support Dynamic Change of Watched Namespaces 1177703 is described below commit 11777032e3c3011c843631471c1b52aecb52dabf Author: Matyas Orhidi <53612764+morh...@users.noreply.github.com> AuthorDate: Sun Jun 19 10:35:41 2022 +0200 [FLINK-27812] Support Dynamic Change of Watched Namespaces --- .../kubernetes_operator_config_configuration.html | 12 ++++ .../flink/kubernetes/operator/FlinkOperator.java | 80 +++++++++++---------- .../operator/config/FlinkConfigManager.java | 34 ++++++--- .../config/FlinkOperatorConfiguration.java | 22 +++++- .../config/KubernetesOperatorConfigOptions.java | 16 +++++ .../operator/informer/InformerManagerTest.java | 46 ------------ .../operator/utils/ReconciliationUtilsTest.java | 5 +- flink-kubernetes-webhook/pom.xml | 8 +++ .../operator/admission/FlinkOperatorWebhook.java | 13 +++- .../operator/admission/FlinkValidator.java | 12 +--- .../admission}/informer/InformerManager.java | 36 +++++++--- .../operator/admission/AdmissionHandlerTest.java | 5 +- .../admission/informer/InformerManagerTest.java | 84 ++++++++++++++++++++++ .../templates/flink-operator.yaml | 7 +- 14 files changed, 256 insertions(+), 124 deletions(-) diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html index 71cf1ac..1bfc9de 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html @@ -44,6 +44,12 @@ <td>Boolean</td> <td>Whether to enable on-the-fly config changes through the operator configmap.</td> </tr> + <tr> + <td><h5>kubernetes.operator.dynamic.namespaces.enabled</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Enables dynamic change of watched/monitored namespaces. Defaults to false</td> + </tr> <tr> <td><h5>kubernetes.operator.job.upgrade.ignore-pending-savepoint</h5></td> <td style="word-wrap: break-word;">false</td> @@ -146,5 +152,11 @@ <td>Map</td> <td>Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the session job artifacts. Expected format: headerKey1:headerValue1,headerKey2:headerValue2.</td> </tr> + <tr> + <td><h5>kubernetes.operator.watched.namespaces</h5></td> + <td style="word-wrap: break-word;">"JOSDK_ALL_NAMESPACES"</td> + <td>String</td> + <td>Comma separated list of namespaces the operator monitors for custom resources. Defaults to JOSDK_ALL_NAMESPACES</td> + </tr> </tbody> </table> diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index 4770617..507ab62 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -17,17 +17,13 @@ package org.apache.flink.kubernetes.operator; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.plugin.PluginManager; import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; -import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; -import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig; import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController; import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController; -import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus; @@ -49,15 +45,18 @@ import org.apache.flink.metrics.MetricGroup; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.Operator; +import io.javaoperatorsdk.operator.RegisteredController; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider; +import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.util.HashSet; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Executors; -import java.util.function.Consumer; /** Main Class for Flink native k8s operator. */ public class FlinkOperator { @@ -69,16 +68,16 @@ public class FlinkOperator { private final FlinkService flinkService; private final FlinkConfigManager configManager; private final Set<FlinkResourceValidator> validators; + private final Set<RegisteredController> registeredControllers = new HashSet<>(); private final MetricGroup metricGroup; public FlinkOperator(@Nullable Configuration conf) { this.client = new DefaultKubernetesClient(); - this.configManager = conf != null ? new FlinkConfigManager(conf) : new FlinkConfigManager(); - this.operator = - new Operator( - client, - getConfigurationServiceOverriderConsumer( - configManager.getOperatorConfiguration())); + this.configManager = + conf != null + ? new FlinkConfigManager(conf) // For testing only + : new FlinkConfigManager(this::handleNamespaceChanges); + this.operator = new Operator(client, this::overrideOperatorConfigs); this.flinkService = new FlinkService(client, configManager); this.validators = ValidatorUtils.discoverValidators(configManager); this.metricGroup = @@ -88,20 +87,25 @@ public class FlinkOperator { FileSystem.initialize(configManager.getDefaultConfig(), pluginManager); } - @VisibleForTesting - protected static Consumer<ConfigurationServiceOverrider> - getConfigurationServiceOverriderConsumer( - FlinkOperatorConfiguration operatorConfiguration) { - return overrider -> { - int parallelism = operatorConfiguration.getReconcilerMaxParallelism(); - if (parallelism == -1) { - LOG.info("Configuring operator with unbounded reconciliation thread pool."); - overrider.withExecutorService(Executors.newCachedThreadPool()); - } else { - LOG.info("Configuring operator with {} reconciliation threads.", parallelism); - overrider.withConcurrentReconciliationThreads(parallelism); - } - }; + private void handleNamespaceChanges(Set<String> namespaces) { + registeredControllers.forEach( + controller -> { + if (controller.allowsNamespaceChanges()) { + LOG.info("Changing namespaces on {} to {}", controller, namespaces); + controller.changeNamespaces(namespaces); + } + }); + } + + private void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) { + int parallelism = configManager.getOperatorConfiguration().getReconcilerMaxParallelism(); + if (parallelism == -1) { + LOG.info("Configuring operator with unbounded reconciliation thread pool."); + overrider.withExecutorService(Executors.newCachedThreadPool()); + } else { + LOG.info("Configuring operator with {} reconciliation threads.", parallelism); + overrider.withConcurrentReconciliationThreads(parallelism); + } } private void registerDeploymentController() { @@ -120,12 +124,7 @@ public class FlinkOperator { observerFactory, new MetricManager<>(metricGroup), statusHelper); - - FlinkControllerConfig<FlinkDeployment> controllerConfig = - new FlinkControllerConfig<>( - controller, - configManager.getOperatorConfiguration().getWatchedNamespaces()); - operator.register(controller, controllerConfig); + registeredControllers.add(operator.register(controller, this::overrideControllerConfigs)); } private void registerSessionJobController() { @@ -143,11 +142,20 @@ public class FlinkOperator { new MetricManager<>(metricGroup), statusHelper); - FlinkControllerConfig<FlinkSessionJob> controllerConfig = - new FlinkControllerConfig<>( - controller, - configManager.getOperatorConfiguration().getWatchedNamespaces()); - operator.register(controller, controllerConfig); + registeredControllers.add(operator.register(controller, this::overrideControllerConfigs)); + } + + private void overrideControllerConfigs(ControllerConfigurationOverrider<?> overrider) { + // TODO: https://github.com/java-operator-sdk/java-operator-sdk/issues/1259 + String[] watchedNamespaces = + configManager + .getOperatorConfiguration() + .getWatchedNamespaces() + .toArray(String[]::new); + String fakeNs = UUID.randomUUID().toString(); + overrider.settingNamespace(fakeNs); + overrider.addingNamespaces(watchedNamespaces); + overrider.removingNamespaces(fakeNs); } public void run() { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java index 3a7c1c3..c8e4bc3 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java @@ -25,7 +25,6 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; -import org.apache.flink.kubernetes.operator.utils.EnvUtils; import org.apache.flink.shaded.guava30.com.google.common.cache.Cache; import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder; @@ -38,16 +37,19 @@ import io.fabric8.kubernetes.api.model.ObjectMeta; import lombok.Builder; import lombok.SneakyThrows; import lombok.Value; +import org.apache.commons.lang3.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED; @@ -61,15 +63,21 @@ public class FlinkConfigManager { private volatile Configuration defaultConfig; private volatile FlinkOperatorConfiguration operatorConfiguration; private final AtomicLong defaultConfigVersion = new AtomicLong(0); - private final LoadingCache<Key, Configuration> cache; - private final Set<String> namespaces = EnvUtils.getWatchedNamespaces(); + private final Consumer<Set<String>> namespaceListener; - public FlinkConfigManager() { - this(GlobalConfiguration.loadConfiguration()); + @VisibleForTesting + public FlinkConfigManager(Configuration defaultConfig) { + this(defaultConfig, ns -> {}); } - public FlinkConfigManager(Configuration defaultConfig) { + public FlinkConfigManager(Consumer<Set<String>> namespaceListener) { + this(GlobalConfiguration.loadConfiguration(), namespaceListener); + } + + public FlinkConfigManager( + Configuration defaultConfig, Consumer<Set<String>> namespaceListener) { + this.namespaceListener = namespaceListener; Duration cacheTimeout = defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_TIMEOUT); this.cache = @@ -109,14 +117,22 @@ public class FlinkConfigManager { @VisibleForTesting public void updateDefaultConfig(Configuration newConf) { - if (newConf.equals(defaultConfig)) { + if (ObjectUtils.allNotNull(this.defaultConfig, newConf) + && this.defaultConfig.toMap().equals(newConf.toMap())) { LOG.info("Default configuration did not change, nothing to do..."); return; } LOG.info("Updating default configuration to {}", newConf); - this.operatorConfiguration = - FlinkOperatorConfiguration.fromConfiguration(newConf, namespaces); + var oldNs = + Optional.ofNullable(this.operatorConfiguration) + .map(FlinkOperatorConfiguration::getWatchedNamespaces) + .orElse(Set.of()); + this.operatorConfiguration = FlinkOperatorConfiguration.fromConfiguration(newConf); + var newNs = this.operatorConfiguration.getWatchedNamespaces(); + if (this.operatorConfiguration.getDynamicNamespacesEnabled() && !oldNs.equals(newNs)) { + this.namespaceListener.accept(operatorConfiguration.getWatchedNamespaces()); + } this.defaultConfig = newConf.clone(); // We do not invalidate the cache to avoid deleting currently used temp files, // simply bump the version diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java index 29e1264..c9b1f7e 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java @@ -24,12 +24,16 @@ import org.apache.flink.kubernetes.operator.utils.EnvUtils; import lombok.Value; import java.time.Duration; +import java.util.Arrays; +import java.util.HashSet; import java.util.Set; /** Configuration class for operator. */ @Value public class FlinkOperatorConfiguration { + private static final String NAMESPACES_SPLITTER_KEY = "\\s*,\\s*"; + Duration reconcileInterval; int reconcilerMaxParallelism; Duration progressCheckInterval; @@ -37,14 +41,14 @@ public class FlinkOperatorConfiguration { Duration flinkClientTimeout; String flinkServiceHostOverride; Set<String> watchedNamespaces; + Boolean dynamicNamespacesEnabled; Duration flinkCancelJobTimeout; Duration flinkShutdownClusterTimeout; String artifactsBaseDir; Integer savepointHistoryCountThreshold; Duration savepointHistoryAgeThreshold; - public static FlinkOperatorConfiguration fromConfiguration( - Configuration operatorConfig, Set<String> watchedNamespaces) { + public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) { Duration reconcileInterval = operatorConfig.get( KubernetesOperatorConfigOptions.OPERATOR_RECONCILER_RESCHEDULE_INTERVAL); @@ -94,6 +98,19 @@ public class FlinkOperatorConfiguration { flinkServiceHostOverride = "localhost"; } + var watchedNamespaces = + new HashSet<>( + Arrays.asList( + operatorConfig + .get( + KubernetesOperatorConfigOptions + .OPERATOR_WATCHED_NAMESPACES) + .split(NAMESPACES_SPLITTER_KEY))); + + boolean dynamicNamespacesEnabled = + operatorConfig.get( + KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_NAMESPACES_ENABLED); + return new FlinkOperatorConfiguration( reconcileInterval, reconcilerMaxParallelism, @@ -102,6 +119,7 @@ public class FlinkOperatorConfiguration { flinkClientTimeout, flinkServiceHostOverride, watchedNamespaces, + dynamicNamespacesEnabled, flinkCancelJobTimeout, flinkShutdownClusterTimeout, artifactsBaseDir, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java index f4eda21..5ad1655 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.api.reconciler.Constants; import java.time.Duration; import java.util.Map; @@ -187,4 +188,19 @@ public class KubernetesOperatorConfigOptions { .withDescription( "Interval at which periodic savepoints will be triggered. " + "The triggering schedule is not guaranteed, savepoints will be triggered as part of the regular reconcile loop."); + + public static final ConfigOption<String> OPERATOR_WATCHED_NAMESPACES = + ConfigOptions.key("kubernetes.operator.watched.namespaces") + .stringType() + .defaultValue(Constants.WATCH_ALL_NAMESPACES) + .withDescription( + "Comma separated list of namespaces the operator monitors for custom resources. Defaults to " + + Constants.WATCH_ALL_NAMESPACES); + + public static final ConfigOption<Boolean> OPERATOR_DYNAMIC_NAMESPACES_ENABLED = + ConfigOptions.key("kubernetes.operator.dynamic.namespaces.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Enables dynamic change of watched/monitored namespaces. Defaults to false"); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/informer/InformerManagerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/informer/InformerManagerTest.java deleted file mode 100644 index 08fcddd..0000000 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/informer/InformerManagerTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.kubernetes.operator.informer; - -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; -import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.Set; - -import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET; - -/** Test for {@link InformerManager}. */ -@EnableKubernetesMockClient(crud = true) -public class InformerManagerTest { - - private KubernetesMockServer mockServer; - private KubernetesClient kubernetesClient; - - @Test - public void testNamespacedInformerCreated() { - var informerManager = new InformerManager(DEFAULT_NAMESPACES_SET, kubernetesClient); - Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1")); - - informerManager = new InformerManager(Set.of("ns1", "ns2"), kubernetesClient); - Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1")); - Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns2")); - } -} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java index 7e1f21e..3b3f4b2 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java @@ -32,8 +32,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import org.junit.jupiter.api.Test; -import java.util.Collections; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -42,8 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; public class ReconciliationUtilsTest { FlinkOperatorConfiguration operatorConfiguration = - FlinkOperatorConfiguration.fromConfiguration( - new Configuration(), Collections.emptySet()); + FlinkOperatorConfiguration.fromConfiguration(new Configuration()); @Test public void testRescheduleUpgradeImmediately() { diff --git a/flink-kubernetes-webhook/pom.xml b/flink-kubernetes-webhook/pom.xml index 0daa0f9..853092d 100644 --- a/flink-kubernetes-webhook/pom.xml +++ b/flink-kubernetes-webhook/pom.xml @@ -52,6 +52,14 @@ under the License. </dependency> <!-- Test --> + + <dependency> + <groupId>io.fabric8</groupId> + <artifactId>kubernetes-server-mock</artifactId> + <version>${fabric8.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java index a6b9960..890d6f7 100644 --- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java +++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator.admission; +import org.apache.flink.kubernetes.operator.admission.informer.InformerManager; import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.utils.EnvUtils; @@ -37,6 +38,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder; import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SupportedCipherSuiteFilter; import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,11 +61,18 @@ public class FlinkOperatorWebhook { public static void main(String[] args) throws Exception { EnvUtils.logEnvironmentInfo(LOG, "Flink Kubernetes Webhook", args); - FlinkConfigManager configManager = new FlinkConfigManager(); + var informerManager = new InformerManager(new DefaultKubernetesClient()); + var configManager = new FlinkConfigManager(informerManager::setNamespaces); + if (!configManager.getOperatorConfiguration().getDynamicNamespacesEnabled()) { + informerManager.setNamespaces( + configManager.getOperatorConfiguration().getWatchedNamespaces()); + } Set<FlinkResourceValidator> validators = ValidatorUtils.discoverValidators(configManager); + AdmissionHandler endpoint = new AdmissionHandler( - new FlinkValidator(validators, configManager), new FlinkMutator()); + new FlinkValidator(validators, informerManager), new FlinkMutator()); + ChannelInitializer<SocketChannel> initializer = createChannelInitializer(endpoint); NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java index a1cb2bb..8a4c58f 100644 --- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java +++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java @@ -17,17 +17,15 @@ package org.apache.flink.kubernetes.operator.admission; -import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.admission.informer.InformerManager; import org.apache.flink.kubernetes.operator.crd.CrdConstants; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; -import org.apache.flink.kubernetes.operator.informer.InformerManager; import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator; import com.fasterxml.jackson.databind.ObjectMapper; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResource; -import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.informers.cache.Cache; import io.javaoperatorsdk.admissioncontroller.NotAllowedException; import io.javaoperatorsdk.admissioncontroller.Operation; @@ -46,13 +44,9 @@ public class FlinkValidator implements Validator<HasMetadata> { private final Set<FlinkResourceValidator> validators; private final InformerManager informerManager; - public FlinkValidator( - Set<FlinkResourceValidator> validators, FlinkConfigManager configManager) { + public FlinkValidator(Set<FlinkResourceValidator> validators, InformerManager informerManager) { this.validators = validators; - this.informerManager = - new InformerManager( - configManager.getOperatorConfiguration().getWatchedNamespaces(), - new DefaultKubernetesClient()); + this.informerManager = informerManager; } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/informer/InformerManager.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManager.java similarity index 78% rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/informer/InformerManager.java rename to flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManager.java index 453c41e..2915414 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/informer/InformerManager.java +++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManager.java @@ -15,10 +15,9 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.informer; +package org.apache.flink.kubernetes.operator.admission.informer; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; -import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; import org.apache.flink.util.Preconditions; import io.fabric8.kubernetes.api.model.HasMetadata; @@ -31,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET; @@ -38,16 +38,12 @@ import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMES public class InformerManager { private static final Logger LOG = LoggerFactory.getLogger(InformerManager.class); - public static final String CLUSTER_ID_INDEX = "clusterId_index"; - private final Set<String> watchedNamespaces; + private final Set<String> watchedNamespaces = ConcurrentHashMap.newKeySet(); private final KubernetesClient kubernetesClient; - private volatile Map<String, SharedIndexInformer<FlinkSessionJob>> sessionJobInformers; private volatile Map<String, SharedIndexInformer<FlinkDeployment>> flinkDepInformers; - public InformerManager(Set<String> watchedNamespaces, KubernetesClient kubernetesClient) { - this.watchedNamespaces = watchedNamespaces; + public InformerManager(KubernetesClient kubernetesClient) { this.kubernetesClient = kubernetesClient; - LOG.info("Created informer manager with watchedNamespaces: {}", watchedNamespaces); } public SharedIndexInformer<FlinkDeployment> getFlinkDepInformer(String namespace) { @@ -67,7 +63,7 @@ public class InformerManager { synchronized (this) { if (flinkDepInformers == null) { var runnableInformers = - createRunnableInformer( + createRunnableInformers( FlinkDeployment.class, watchedNamespaces, kubernetesClient); for (Map.Entry<String, SharedIndexInformer<FlinkDeployment>> runnableInformer : runnableInformers.entrySet()) { @@ -83,7 +79,7 @@ public class InformerManager { } private static <CR extends HasMetadata> - Map<String, SharedIndexInformer<CR>> createRunnableInformer( + Map<String, SharedIndexInformer<CR>> createRunnableInformers( Class<CR> resourceClass, Set<String> effectiveNamespaces, KubernetesClient kubernetesClient) { @@ -104,4 +100,24 @@ public class InformerManager { return informers; } } + + public void setNamespaces(Set<String> watchedNamespaces) { + LOG.info("Setting namespaces to {}", watchedNamespaces); + this.watchedNamespaces.clear(); + this.watchedNamespaces.addAll(watchedNamespaces); + if (flinkDepInformers != null) { + synchronized (this) { + if (flinkDepInformers != null) { + flinkDepInformers + .entrySet() + .forEach( + entry -> { + LOG.info("Stopping informer in {})", entry.getKey()); + entry.getValue().stop(); + }); + } + flinkDepInformers = null; + } + } + } } diff --git a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java index 327981a..e8f061b 100644 --- a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java +++ b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator.admission; +import org.apache.flink.kubernetes.operator.admission.informer.InformerManager; import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.crd.CrdConstants; @@ -62,8 +63,8 @@ public class AdmissionHandlerTest { private final AdmissionHandler admissionHandler = new AdmissionHandler( new FlinkValidator( - ValidatorUtils.discoverValidators(new FlinkConfigManager()), - new FlinkConfigManager()), + ValidatorUtils.discoverValidators(new FlinkConfigManager(ns -> {})), + new InformerManager(null)), new FlinkMutator()); @Test diff --git a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java new file mode 100644 index 0000000..d258d6c --- /dev/null +++ b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.admission.informer; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.Set; + +import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET; +import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_NAMESPACES_ENABLED; +import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_WATCHED_NAMESPACES; + +/** Test for {@link InformerManager}. */ +@EnableKubernetesMockClient(crud = true) +public class InformerManagerTest { + + private KubernetesMockServer mockServer; + private KubernetesClient kubernetesClient; + + @Test + public void testNamespacedInformerCreated() { + var informerManager = new InformerManager(kubernetesClient); + informerManager.setNamespaces(DEFAULT_NAMESPACES_SET); + Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1")); + + informerManager.setNamespaces(Set.of("ns1", "ns2")); + Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1")); + Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns2")); + + informerManager.setNamespaces(Set.of("ns1", "ns2", "ns3")); + Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1")); + Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns2")); + Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns3")); + } + + @Test + public void testDynamicNamespaces() { + InformerManager informerManager = new InformerManager(kubernetesClient); + Configuration config = + Configuration.fromMap(Map.of(OPERATOR_WATCHED_NAMESPACES.key(), "ns1")); + FlinkConfigManager configManager = + new FlinkConfigManager(config, ns -> informerManager.setNamespaces(ns)); + informerManager.setNamespaces( + configManager.getOperatorConfiguration().getWatchedNamespaces()); + Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1")); + + // dynamic namespaces disabled + config.set(OPERATOR_WATCHED_NAMESPACES, "ns1,ns2"); + configManager.updateDefaultConfig(config); + Assertions.assertThrows( + NullPointerException.class, () -> informerManager.getFlinkDepInformer("ns2")); + + // dynamic namespaces enabled + config.set(OPERATOR_DYNAMIC_NAMESPACES_ENABLED, true); + config.set(OPERATOR_WATCHED_NAMESPACES, "ns1,ns2,ns3"); + configManager.updateDefaultConfig(config); + Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1")); + Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns2")); + Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns3")); + } +} diff --git a/helm/flink-kubernetes-operator/templates/flink-operator.yaml b/helm/flink-kubernetes-operator/templates/flink-operator.yaml index 4e9df19..fc4e699 100644 --- a/helm/flink-kubernetes-operator/templates/flink-operator.yaml +++ b/helm/flink-kubernetes-operator/templates/flink-operator.yaml @@ -68,8 +68,6 @@ spec: value: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties - name: JVM_ARGS value: {{ .Values.jvmArgs.operator }} - - name: FLINK_OPERATOR_WATCH_NAMESPACES - value: {{ join "," .Values.watchNamespaces }} securityContext: {{- toYaml .Values.operatorSecurityContext | nindent 12 }} volumeMounts: @@ -111,8 +109,6 @@ spec: value: /opt/flink/plugins - name: OPERATOR_NAMESPACE value: {{ .Release.Namespace }} - - name: FLINK_OPERATOR_WATCH_NAMESPACES - value: {{ join "," .Values.watchNamespaces }} securityContext: {{- toYaml .Values.webhookSecurityContext | nindent 12 }} volumeMounts: @@ -160,6 +156,9 @@ data: {{- end }} {{- if index (.Values.defaultConfiguration) "flink-conf.yaml" }} {{- index (.Values.defaultConfiguration) "flink-conf.yaml" | nindent 4 -}} +{{- end }} +{{- if .Values.watchNamespaces }} + kubernetes.operator.watched.namespaces: {{ join "," .Values.watchNamespaces }} {{- end }} log4j-operator.properties: |+ {{- if .Values.defaultConfiguration.append }}