This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new e4c56b2b [FLINK-39688] Share a single PluginManager across operator
and webhook startup (#1112)
e4c56b2b is described below
commit e4c56b2be3d844d4c12176f281972a8cc3de49b2
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Thu Jun 4 16:43:00 2026 +0300
[FLINK-39688] Share a single PluginManager across operator and webhook
startup (#1112)
---
.../flink/kubernetes/operator/FlinkOperator.java | 11 ++--
.../operator/listener/ListenerUtils.java | 30 ++--------
.../operator/metrics/OperatorMetricUtils.java | 10 +++-
.../kubernetes/operator/utils/MutatorUtils.java | 14 ++---
.../operator/utils/OperatorPluginUtils.java | 67 ++++++++++++++++++++++
.../kubernetes/operator/utils/ValidatorUtils.java | 8 ++-
.../FlinkStateSnapshotControllerTest.java | 6 +-
.../TestingFlinkDeploymentController.java | 6 +-
.../TestingFlinkSessionJobController.java | 6 +-
.../operator/listener/ListenerUtilsTest.java | 8 ++-
.../operator/utils/MutatorUtilsTest.java | 6 +-
.../operator/utils/ValidatorUtilsTest.java | 6 +-
.../operator/admission/FlinkOperatorWebhook.java | 8 ++-
.../operator/admission/AdmissionHandlerTest.java | 17 +++---
14 files changed, 141 insertions(+), 62 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index cc15c5c8..d3ad2d14 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -21,7 +21,6 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginManager;
-import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
@@ -53,6 +52,7 @@ import
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorPluginUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
@@ -105,7 +105,8 @@ public class FlinkOperator {
KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class));
baseConfig = configManager.getDefaultConfig();
- this.metricGroup = OperatorMetricUtils.initOperatorMetrics(baseConfig);
+ PluginManager pluginManager =
OperatorPluginUtils.createPluginManager(baseConfig);
+ this.metricGroup = OperatorMetricUtils.initOperatorMetrics(baseConfig,
pluginManager);
if (client == null) {
this.client =
KubernetesClientUtils.getKubernetesClient(
@@ -114,12 +115,12 @@ public class FlinkOperator {
this.client = client;
}
this.operator = createOperator();
- this.validators = ValidatorUtils.discoverValidators(configManager);
- this.listeners = ListenerUtils.discoverListeners(configManager);
+ this.validators = ValidatorUtils.discoverValidators(configManager,
pluginManager);
+ this.listeners = ListenerUtils.discoverListeners(configManager,
pluginManager);
this.eventRecorder = EventRecorder.create(client, listeners);
this.ctxFactory =
new FlinkResourceContextFactory(configManager, metricGroup,
eventRecorder);
- PluginManager pluginManager =
PluginUtils.createPluginManagerFromRootFolder(baseConfig);
+ LOG.info("Initializing file system factories from plugin directory.");
FileSystem.initialize(baseConfig, pluginManager);
this.operatorHealthService =
OperatorHealthService.fromConfig(configManager);
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
index 8ab3997c..0ee23599 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
@@ -22,9 +22,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DelegatingConfiguration;
-import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
@@ -33,10 +32,8 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
@@ -53,8 +50,6 @@ public class ListenerUtils {
private static final String SUFFIX = ".class";
private static final Pattern PTN =
Pattern.compile(Pattern.quote(PREFIX) + "([\\S&&[^.]]*)" +
Pattern.quote(SUFFIX));
- private static final List<String> EXTRA_PARENT_FIRST_PATTERNS =
- List.of("io.fabric8", "com.fasterxml");
/**
* Load {@link FlinkResourceListener} implementations from the plugin
directory. Only listeners
@@ -64,15 +59,17 @@ public class ListenerUtils {
* kubernetes.operator.plugins.listeners.test.k1: v1
*
* @param configManager {@link FlinkConfigManager} to access plugin
configurations.
+ * @param pluginManager shared {@link PluginManager} used for plugin
discovery.
* @return Enabled listeners.
*/
public static Collection<FlinkResourceListener> discoverListeners(
- FlinkConfigManager configManager) {
+ FlinkConfigManager configManager, PluginManager pluginManager) {
var listeners = new ArrayList<FlinkResourceListener>();
- var conf = getListenerBaseConf(configManager);
+ var conf = configManager.getDefaultConfig();
Map<String, Configuration> listenerConfigs = loadListenerConfigs(conf);
- PluginUtils.createPluginManagerFromRootFolder(conf)
+ LOG.info("Loading FlinkResourceListener implementations from plugin
directory.");
+ pluginManager
.load(FlinkResourceListener.class)
.forEachRemaining(
listener -> {
@@ -96,21 +93,6 @@ public class ListenerUtils {
return listeners;
}
- private static Configuration getListenerBaseConf(FlinkConfigManager
configManager) {
- var conf = new Configuration(configManager.getDefaultConfig());
- List<String> additionalPatterns =
- new ArrayList<>(
- conf.getOptional(
- CoreOptions
-
.PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL)
- .orElse(Collections.emptyList()));
- additionalPatterns.addAll(EXTRA_PARENT_FIRST_PATTERNS);
- conf.set(
-
CoreOptions.PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
- additionalPatterns);
- return conf;
- }
-
@VisibleForTesting
protected static Map<String, Configuration>
loadListenerConfigs(Configuration configuration) {
Map<String, Configuration> listenerConfigs = new HashMap<>();
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
index 58c97e14..a28e3212 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
@@ -20,11 +20,11 @@ package org.apache.flink.kubernetes.operator.metrics;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.plugin.PluginManager;
-import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorPluginUtils;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
@@ -55,10 +55,16 @@ public class OperatorMetricUtils {
private static final String OPERATOR_METRICS_PREFIX = K8S_OP_CONF_PREFIX +
"metrics.";
private static final String METRICS_PREFIX = "metrics.";
+ @VisibleForTesting
public static KubernetesOperatorMetricGroup
initOperatorMetrics(Configuration defaultConfig) {
+ return initOperatorMetrics(
+ defaultConfig,
OperatorPluginUtils.createPluginManager(defaultConfig));
+ }
+
+ public static KubernetesOperatorMetricGroup initOperatorMetrics(
+ Configuration defaultConfig, PluginManager pluginManager) {
Configuration metricConfig = createMetricConfig(defaultConfig);
LOG.info("Initializing operator metrics using conf: {}", metricConfig);
- PluginManager pluginManager =
PluginUtils.createPluginManagerFromRootFolder(metricConfig);
MetricRegistry metricRegistry = createMetricRegistry(metricConfig,
pluginManager);
KubernetesOperatorMetricGroup operatorMetricGroup =
KubernetesOperatorMetricGroup.create(
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/MutatorUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/MutatorUtils.java
index 9f259379..c99f844e 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/MutatorUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/MutatorUtils.java
@@ -18,7 +18,7 @@
package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.mutator.DefaultFlinkMutator;
import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
@@ -34,19 +34,15 @@ public final class MutatorUtils {
private static final Logger LOG =
LoggerFactory.getLogger(MutatorUtils.class);
- /**
- * discovers mutators.
- *
- * @param configManager Flink Config manager
- * @return Set of FlinkResourceMutator
- */
- public static Set<FlinkResourceMutator>
discoverMutators(FlinkConfigManager configManager) {
+ public static Set<FlinkResourceMutator> discoverMutators(
+ FlinkConfigManager configManager, PluginManager pluginManager) {
var conf = configManager.getDefaultConfig();
Set<FlinkResourceMutator> flinkmutator = new HashSet<>();
DefaultFlinkMutator defaultFlinkMutator = new DefaultFlinkMutator();
defaultFlinkMutator.configure(conf);
flinkmutator.add(defaultFlinkMutator);
- PluginUtils.createPluginManagerFromRootFolder(conf)
+ LOG.info("Loading FlinkResourceMutator implementations from plugin
directory.");
+ pluginManager
.load(FlinkResourceMutator.class)
.forEachRemaining(
mutator -> {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorPluginUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorPluginUtils.java
new file mode 100644
index 00000000..b10884e0
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorPluginUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.PluginUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Builds the single, process-wide {@link PluginManager} used by the operator
and webhook for plugin
+ * discovery (metric reporters, file systems, validators, mutators, listeners).
+ *
+ * <p>Plugin classloaders delegate {@code io.fabric8} and {@code
com.fasterxml} to the parent so all
+ * plugins share the operator's fabric8 client and Jackson, avoiding duplicate
informers and version
+ * skew.
+ */
+public final class OperatorPluginUtils {
+
+ private static final List<String> EXTRA_PARENT_FIRST_PATTERNS =
+ List.of("io.fabric8", "com.fasterxml");
+
+ private OperatorPluginUtils() {}
+
+ /** Returns a copy of {@code baseConf} with the operator's parent-first
patterns merged in. */
+ private static Configuration
enrichWithPluginParentFirstPatterns(Configuration baseConf) {
+ var conf = new Configuration(baseConf);
+ var patterns =
+ new ArrayList<>(
+ conf.getOptional(
+ CoreOptions
+
.PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL)
+ .orElse(Collections.emptyList()));
+ for (var p : EXTRA_PARENT_FIRST_PATTERNS) {
+ if (!patterns.contains(p)) {
+ patterns.add(p);
+ }
+ }
+
conf.set(CoreOptions.PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
patterns);
+ return conf;
+ }
+
+ /** Creates the shared {@link PluginManager} from {@code baseConf},
enriching it as needed. */
+ public static PluginManager createPluginManager(Configuration baseConf) {
+ return PluginUtils.createPluginManagerFromRootFolder(
+ enrichWithPluginParentFirstPatterns(baseConf));
+ }
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
index 0f21c81a..2cf97e08 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
@@ -18,7 +18,7 @@
package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
@@ -36,13 +36,15 @@ public final class ValidatorUtils {
private static final Logger LOG =
LoggerFactory.getLogger(ValidatorUtils.class);
- public static Set<FlinkResourceValidator>
discoverValidators(FlinkConfigManager configManager) {
+ public static Set<FlinkResourceValidator> discoverValidators(
+ FlinkConfigManager configManager, PluginManager pluginManager) {
var conf = configManager.getDefaultConfig();
Set<FlinkResourceValidator> resourceValidators = new HashSet<>();
DefaultValidator defaultValidator = new
DefaultValidator(configManager);
defaultValidator.configure(conf);
resourceValidators.add(defaultValidator);
- PluginUtils.createPluginManagerFromRootFolder(conf)
+ LOG.info("Loading FlinkResourceValidator implementations from plugin
directory.");
+ pluginManager
.load(FlinkResourceValidator.class)
.forEachRemaining(
validator -> {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
index cfe69473..ce36f426 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
@@ -44,6 +44,7 @@ import
org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotRec
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkResourceEventCollector;
import
org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotEventCollector;
+import org.apache.flink.kubernetes.operator.utils.OperatorPluginUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -122,7 +123,10 @@ public class FlinkStateSnapshotControllerTest {
statusRecorder = new StatusRecorder<>(metricManager,
statusUpdateCounter);
controller =
new FlinkStateSnapshotController(
- ValidatorUtils.discoverValidators(configManager),
+ ValidatorUtils.discoverValidators(
+ configManager,
+ OperatorPluginUtils.createPluginManager(
+ configManager.getDefaultConfig())),
ctxFactory,
new StateSnapshotReconciler(ctxFactory, eventRecorder),
new StateSnapshotObserver(ctxFactory, eventRecorder),
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index af04e069..7c380727 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -36,6 +36,7 @@ import
org.apache.flink.kubernetes.operator.resources.ClusterResourceManager;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkResourceEventCollector;
import
org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotEventCollector;
+import org.apache.flink.kubernetes.operator.utils.OperatorPluginUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -104,7 +105,10 @@ public class TestingFlinkDeploymentController
canaryResourceManager = new CanaryResourceManager<>(configManager);
flinkDeploymentController =
new FlinkDeploymentController(
- ValidatorUtils.discoverValidators(configManager),
+ ValidatorUtils.discoverValidators(
+ configManager,
+ OperatorPluginUtils.createPluginManager(
+ configManager.getDefaultConfig())),
contextFactory,
reconcilerFactory,
new FlinkDeploymentObserverFactory(eventRecorder),
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
index b135fba0..e2a42710 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
@@ -35,6 +35,7 @@ import
org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReco
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkResourceEventCollector;
import
org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotEventCollector;
+import org.apache.flink.kubernetes.operator.utils.OperatorPluginUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -90,7 +91,10 @@ public class TestingFlinkSessionJobController
flinkSessionJobController =
new FlinkSessionJobController(
- ValidatorUtils.discoverValidators(configManager),
+ ValidatorUtils.discoverValidators(
+ configManager,
+ OperatorPluginUtils.createPluginManager(
+ configManager.getDefaultConfig())),
ctxFactory,
new SessionJobReconciler(
eventRecorder, statusRecorder, new
NoopJobAutoscaler<>()),
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/ListenerUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/ListenerUtilsTest.java
index ba5ddc33..4325dacd 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/ListenerUtilsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/ListenerUtilsTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.utils.OperatorPluginUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -56,10 +57,11 @@ public class ListenerUtilsTest {
ConfigConstants.ENV_FLINK_PLUGINS_DIR,
TestUtils.getTestPluginsRootDir(temporaryFolder));
TestUtils.setEnv(systemEnv);
+ var configManager = new
FlinkConfigManager(Configuration.fromMap(testConfig));
+ var pluginManager =
+
OperatorPluginUtils.createPluginManager(configManager.getDefaultConfig());
var listeners =
- new ArrayList<>(
- ListenerUtils.discoverListeners(
- new
FlinkConfigManager(Configuration.fromMap(testConfig))));
+ new
ArrayList<>(ListenerUtils.discoverListeners(configManager, pluginManager));
assertEquals(1, listeners.size());
var testingListener = (TestingListener) listeners.get(0);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/MutatorUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/MutatorUtilsTest.java
index 4b840738..ce721bbb 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/MutatorUtilsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/MutatorUtilsTest.java
@@ -52,13 +52,15 @@ public class MutatorUtilsTest {
ConfigConstants.ENV_FLINK_PLUGINS_DIR,
TestUtils.getTestPluginsRootDir(temporaryFolder));
TestUtils.setEnv(systemEnv);
+ var configManager = new FlinkConfigManager(new Configuration());
+ var pluginManager =
+
OperatorPluginUtils.createPluginManager(configManager.getDefaultConfig());
assertEquals(
new HashSet<>(
Arrays.asList(
DefaultFlinkMutator.class.getName(),
TestMutator.class.getName())),
- MutatorUtils.discoverMutators(new FlinkConfigManager(new
Configuration()))
- .stream()
+ MutatorUtils.discoverMutators(configManager,
pluginManager).stream()
.map(v -> v.getClass().getName())
.collect(Collectors.toSet()));
} finally {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtilsTest.java
index 70d8c9fd..b5959f64 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtilsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtilsTest.java
@@ -52,13 +52,15 @@ public class ValidatorUtilsTest {
ConfigConstants.ENV_FLINK_PLUGINS_DIR,
TestUtils.getTestPluginsRootDir(temporaryFolder));
TestUtils.setEnv(systemEnv);
+ var configManager = new FlinkConfigManager(new Configuration());
+ var pluginManager =
+
OperatorPluginUtils.createPluginManager(configManager.getDefaultConfig());
assertEquals(
new HashSet<>(
Arrays.asList(
DefaultValidator.class.getName(),
TestValidator.class.getName())),
- ValidatorUtils.discoverValidators(new
FlinkConfigManager(new Configuration()))
- .stream()
+ ValidatorUtils.discoverValidators(configManager,
pluginManager).stream()
.map(v -> v.getClass().getName())
.collect(Collectors.toSet()));
} finally {
diff --git
a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
index c616372f..92f0760f 100644
---
a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
+++
b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.admission;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
@@ -28,6 +29,7 @@ import
org.apache.flink.kubernetes.operator.ssl.ReloadableSslContext;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.kubernetes.operator.utils.MutatorUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorPluginUtils;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
@@ -83,8 +85,10 @@ public class FlinkOperatorWebhook {
informerManager.setNamespaces(operatorConfig.getWatchedNamespaces());
}
- this.validators = ValidatorUtils.discoverValidators(configManager);
- this.mutators = MutatorUtils.discoverMutators(configManager);
+ PluginManager pluginManager =
+
OperatorPluginUtils.createPluginManager(configManager.getDefaultConfig());
+ this.validators = ValidatorUtils.discoverValidators(configManager,
pluginManager);
+ this.mutators = MutatorUtils.discoverMutators(configManager,
pluginManager);
this.admissionHandler =
new AdmissionHandler(
new FlinkValidator(validators, informerManager),
diff --git
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
index e912baf1..a51ed7c3 100644
---
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
+++
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.admission;
+import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
import org.apache.flink.kubernetes.operator.api.CrdConstants;
@@ -27,6 +28,7 @@ import
org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.utils.MutatorUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorPluginUtils;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -67,15 +69,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@EnableKubernetesMockClient(crud = true)
public class AdmissionHandlerTest {
+ private static final FlinkConfigManager CONFIG_MANAGER = new
FlinkConfigManager(ns -> {}, true);
+ private static final PluginManager PLUGIN_MANAGER =
+
OperatorPluginUtils.createPluginManager(CONFIG_MANAGER.getDefaultConfig());
+
private KubernetesClient kubernetesClient;
private AdmissionHandler admissionHandler =
new AdmissionHandler(
new FlinkValidator(
- ValidatorUtils.discoverValidators(
- new FlinkConfigManager(ns -> {}, true)),
+ ValidatorUtils.discoverValidators(CONFIG_MANAGER,
PLUGIN_MANAGER),
new InformerManager(null)),
new FlinkMutator(
- MutatorUtils.discoverMutators(new
FlinkConfigManager(ns -> {}, true)),
+ MutatorUtils.discoverMutators(CONFIG_MANAGER,
PLUGIN_MANAGER),
new InformerManager(kubernetesClient)));
@Test
@@ -149,12 +154,10 @@ public class AdmissionHandlerTest {
admissionHandler =
new AdmissionHandler(
new FlinkValidator(
- ValidatorUtils.discoverValidators(
- new FlinkConfigManager(ns -> {},
true)),
+
ValidatorUtils.discoverValidators(CONFIG_MANAGER, PLUGIN_MANAGER),
new InformerManager(null)),
new FlinkMutator(
- MutatorUtils.discoverMutators(
- new FlinkConfigManager(ns -> {},
true)),
+ MutatorUtils.discoverMutators(CONFIG_MANAGER,
PLUGIN_MANAGER),
informerManager));
final EmbeddedChannel embeddedChannel = new
EmbeddedChannel(admissionHandler);