This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new c6769c7a [FLINK-39508][flink-kubernetes-webhook] Fix
flink-kubernetes-webhook module tests
c6769c7a is described below
commit c6769c7a4243d7cfb8e116d8d476e7321aecb9b5
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Mon Apr 27 08:55:26 2026 +0300
[FLINK-39508][flink-kubernetes-webhook] Fix flink-kubernetes-webhook module
tests
---
flink-kubernetes-webhook/pom.xml | 38 +-
.../operator/admission/FlinkOperatorWebhook.java | 50 +-
.../admission/mutator/DefaultRequestMutator.java | 2 +-
.../src/test/assembly/test-plugins-assembly.xml | 3 +-
.../admission/FlinkOperatorWebhookTest.java | 501 +++++++++++++++++++++
.../operator/admission/FlinkValidatorTest.java | 352 +++++++++++++++
.../admission/informer/InformerManagerTest.java | 55 +++
.../mutator/DefaultRequestMutatorTest.java | 141 ++++++
.../admission/mutator/FlinkMutatorTest.java | 446 ++++++++++++++++++
.../kubernetes/operator/mutator/TestMutator.java | 44 ++
.../operator/validation/TestValidator.java | 46 ++
...ubernetes.operator.mutator.FlinkResourceMutator | 17 +
...etes.operator.validation.FlinkResourceValidator | 17 +
13 files changed, 1689 insertions(+), 23 deletions(-)
diff --git a/flink-kubernetes-webhook/pom.xml b/flink-kubernetes-webhook/pom.xml
index ed1f60d1..e5cc0e4d 100644
--- a/flink-kubernetes-webhook/pom.xml
+++ b/flink-kubernetes-webhook/pom.xml
@@ -32,10 +32,13 @@ under the License.
<packaging>jar</packaging>
<properties>
- <surefire-plugin.version>2.22.2</surefire-plugin.version>
+ <plugins.tmp.dir>${project.build.directory}/plugins</plugins.tmp.dir>
+ <surefire.module.config>
+ <!-- required by FlinkOperatorWebhookTest -->
+ --add-opens=java.base/java.util=ALL-UNNAMED
+ </surefire.module.config>
</properties>
-
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -84,11 +87,37 @@ under the License.
<version>${okhttp.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>create-test-plugin-jar</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <finalName>test-plugins</finalName>
+ <attach>false</attach>
+ <descriptors>
+
<descriptor>src/test/assembly/test-plugins-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
@@ -116,11 +145,6 @@ under the License.
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>${surefire-plugin.version}</version>
- </plugin>
</plugins>
</build>
</project>
diff --git
a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
index 196f6b40..c616372f 100644
---
a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
+++
b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.admission;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
@@ -45,6 +46,8 @@ import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.file.Path;
@@ -58,26 +61,38 @@ public class FlinkOperatorWebhook {
private static FileSystemWatchService fileSystemWatchService;
- public static void main(String[] args) throws Exception {
- EnvUtils.logEnvironmentInfo(LOG, "Flink Kubernetes Webhook", args);
- var informerManager = new InformerManager(new
KubernetesClientBuilder().build());
- var configManager =
- new FlinkConfigManager(
- informerManager::setNamespaces,
-
KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class));
+ @VisibleForTesting final Set<FlinkResourceValidator> validators;
+ @VisibleForTesting final Set<FlinkResourceMutator> mutators;
+ @VisibleForTesting final AdmissionHandler admissionHandler;
+
+ @VisibleForTesting
+ FlinkOperatorWebhook(
+ @Nullable InformerManager informerManager, @Nullable
FlinkConfigManager configManager) {
+ if (informerManager == null) {
+ informerManager = new InformerManager(new
KubernetesClientBuilder().build());
+ }
+ if (configManager == null) {
+ configManager =
+ new FlinkConfigManager(
+ informerManager::setNamespaces,
+
KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class));
+ }
+
var operatorConfig = configManager.getOperatorConfiguration();
if (!operatorConfig.isDynamicNamespacesEnabled()) {
informerManager.setNamespaces(operatorConfig.getWatchedNamespaces());
}
- Set<FlinkResourceValidator> validators =
ValidatorUtils.discoverValidators(configManager);
- Set<FlinkResourceMutator> mutators =
MutatorUtils.discoverMutators(configManager);
- AdmissionHandler endpoint =
+ this.validators = ValidatorUtils.discoverValidators(configManager);
+ this.mutators = MutatorUtils.discoverMutators(configManager);
+ this.admissionHandler =
new AdmissionHandler(
new FlinkValidator(validators, informerManager),
new FlinkMutator(mutators, informerManager));
+ }
- ChannelInitializer<SocketChannel> initializer =
createChannelInitializer(endpoint);
+ public void run() throws Exception {
+ ChannelInitializer<SocketChannel> initializer =
createChannelInitializer(admissionHandler);
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
@@ -103,12 +118,18 @@ public class FlinkOperatorWebhook {
}
}
+ public static void main(String[] args) throws Exception {
+ EnvUtils.logEnvironmentInfo(LOG, "Flink Kubernetes Webhook", args);
+ new FlinkOperatorWebhook(null, null).run();
+ }
+
private static int getPort() {
String portString =
EnvUtils.getRequired(EnvUtils.ENV_WEBHOOK_SERVER_PORT);
return Integer.parseInt(portString);
}
- private static ChannelInitializer<SocketChannel> createChannelInitializer(
+ @VisibleForTesting
+ static ChannelInitializer<SocketChannel> createChannelInitializer(
AdmissionHandler admissionHandler) throws Exception {
SslContext sslContext = createSslContext();
@@ -150,7 +171,7 @@ public class FlinkOperatorWebhook {
stopFileSystemWatchService();
final String realKeystoreFileName =
Path.of(keystorePathOpt.get()).toRealPath().getFileName().toString();
- LOG.info("Keystore path is resolved to real filename: " +
realKeystoreFileName);
+ LOG.info("Keystore path is resolved to real filename: {}",
realKeystoreFileName);
fileSystemWatchService =
new
FileSystemWatchService(Path.of(keystorePathOpt.get()).getParent().toString()) {
@Override
@@ -160,7 +181,8 @@ public class FlinkOperatorWebhook {
reloadableSslContext.reload();
LOG.info("SSL context reloaded successfully");
} catch (Exception e) {
- LOG.error("SSL context reload received exception:
" + e);
+ LOG.error(
+ "SSL context reload received exception:
{}", String.valueOf(e));
}
}
};
diff --git
a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java
b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java
index 0e2d05f1..1365dafe 100644
---
a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java
+++
b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java
@@ -74,7 +74,7 @@ public class DefaultRequestMutator<T extends
KubernetesResource>
return admissionResponse;
}
- public static AdmissionResponse admissionResponseFromMutation(
+ private static AdmissionResponse admissionResponseFromMutation(
KubernetesResource originalResource, KubernetesResource
mutatedResource) {
AdmissionResponse admissionResponse = new AdmissionResponse();
admissionResponse.setAllowed(true);
diff --git
a/flink-kubernetes-webhook/src/test/assembly/test-plugins-assembly.xml
b/flink-kubernetes-webhook/src/test/assembly/test-plugins-assembly.xml
index b40333f1..8f974d93 100644
--- a/flink-kubernetes-webhook/src/test/assembly/test-plugins-assembly.xml
+++ b/flink-kubernetes-webhook/src/test/assembly/test-plugins-assembly.xml
@@ -28,7 +28,8 @@ under the License.
<outputDirectory>/</outputDirectory>
<!-- the service impl -->
<includes>
-
<include>org/apache/flink/kubernetes/operator/admission/TestMutator.java</include>
+
<include>org/apache/flink/kubernetes/operator/mutator/TestMutator.class</include>
+
<include>org/apache/flink/kubernetes/operator/validation/TestValidator.class</include>
</includes>
</fileSet>
<fileSet>
diff --git
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhookTest.java
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhookTest.java
new file mode 100644
index 00000000..ba6590a1
--- /dev/null
+++
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhookTest.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.admission;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
+import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
+import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.mutator.DefaultFlinkMutator;
+import org.apache.flink.kubernetes.operator.mutator.TestMutator;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.kubernetes.operator.validation.DefaultValidator;
+import org.apache.flink.kubernetes.operator.validation.TestValidator;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.api.model.GroupVersionKind;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.admission.v1.AdmissionRequest;
+import io.fabric8.kubernetes.api.model.admission.v1.AdmissionReview;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
+import static io.javaoperatorsdk.webhook.admission.Operation.CREATE;
+import static
org.apache.flink.kubernetes.operator.admission.AdmissionHandler.MUTATOR_REQUEST_PATH;
+import static
org.apache.flink.kubernetes.operator.admission.AdmissionHandler.VALIDATE_REQUEST_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link FlinkOperatorWebhook}. */
+@EnableKubernetesMockClient(crud = true)
+class FlinkOperatorWebhookTest {
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+ private static final String TEST_PLUGINS = "test-plugins";
+ private static final String PLUGINS_JAR = TEST_PLUGINS + "-test-jar.jar";
+
+ private KubernetesClient kubernetesClient;
+ private Channel serverChannel;
+ private NioEventLoopGroup bossGroup;
+ private NioEventLoopGroup workerGroup;
+ private int port;
+
+ @BeforeEach
+ void setup() throws Exception {
+ var informerManager = new InformerManager(kubernetesClient);
+ informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+ var validator = new FlinkValidator(Set.of(), informerManager);
+ var mutator = new FlinkMutator(Set.of(new DefaultFlinkMutator()),
informerManager);
+ var admissionHandler = new AdmissionHandler(validator, mutator);
+
+ var initializer =
FlinkOperatorWebhook.createChannelInitializer(admissionHandler);
+ bossGroup = new NioEventLoopGroup(1);
+ workerGroup = new NioEventLoopGroup();
+ serverChannel =
+ new ServerBootstrap()
+ .group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(initializer)
+ .bind(0)
+ .sync()
+ .channel();
+ port = ((InetSocketAddress) serverChannel.localAddress()).getPort();
+ }
+
+ @AfterEach
+ void teardown() {
+ if (serverChannel != null) {
+ serverChannel.close();
+ }
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
+ }
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
+ }
+ }
+
+ @Test
+ void validateEndpointAcceptsFlinkDeployment() throws Exception {
+ var review = createAdmissionReview(createDeployment());
+
+ var response = sendRequest(VALIDATE_REQUEST_PATH, review);
+
+ assertEquals(200, response.statusCode());
+ var responseReview = mapper.readValue(response.body(),
AdmissionReview.class);
+ assertTrue(responseReview.getResponse().getAllowed());
+ }
+
+ @Test
+ void mutateEndpointAddsTargetSessionLabelToSessionJob() throws Exception {
+ var review = createAdmissionReview(createSessionJob());
+
+ var response = sendRequest(MUTATOR_REQUEST_PATH, review);
+
+ assertEquals(200, response.statusCode());
+ var responseReview = mapper.readValue(response.body(),
AdmissionReview.class);
+ assertTrue(responseReview.getResponse().getAllowed());
+ var patch = new
String(Base64.getDecoder().decode(responseReview.getResponse().getPatch()));
+ assertTrue(
+ patch.contains(CrdConstants.LABEL_TARGET_SESSION),
+ "Patch should contain the target-session label");
+ }
+
+ @Test
+ void mutateEndpointProducesEmptyPatchForNoOpDeployment() throws Exception {
+ var review = createAdmissionReview(createDeployment());
+
+ var response = sendRequest(MUTATOR_REQUEST_PATH, review);
+
+ assertEquals(200, response.statusCode());
+ var responseReview = mapper.readValue(response.body(),
AdmissionReview.class);
+ assertTrue(responseReview.getResponse().getAllowed());
+ var patch = new
String(Base64.getDecoder().decode(responseReview.getResponse().getPatch()));
+ assertEquals("[]", patch, "No-op mutation should produce an empty JSON
patch");
+ }
+
+ @Test
+ void illegalPathReturnsInternalServerError() throws Exception {
+ var review = createAdmissionReview(createDeployment());
+
+ var response = sendRequest("/illegal-path", review);
+
+ assertEquals(500, response.statusCode());
+ }
+
+ @Test
+ void malformedBodyReturnsInternalServerError() throws IOException,
InterruptedException {
+ var request =
+ HttpRequest.newBuilder()
+ .uri(URI.create("http://localhost:" + port +
VALIDATE_REQUEST_PATH))
+ .POST(HttpRequest.BodyPublishers.ofString("not-json"))
+ .build();
+
+ var response =
+ HttpClient.newHttpClient().send(request,
HttpResponse.BodyHandlers.ofString());
+
+ assertEquals(500, response.statusCode());
+ }
+
+ @Test
+ void serverDoesNotUseSslWhenNoKeystoreConfigured() throws Exception {
+ var review = createAdmissionReview(createDeployment());
+
+ var response = sendRequest(VALIDATE_REQUEST_PATH, review);
+
+ assertEquals(200, response.statusCode(), "Plain HTTP should work
without SSL configured");
+ }
+
+ @Test
+ void webhookDiscoversCustomValidatorFromPlugins(@TempDir Path
temporaryFolder)
+ throws Exception {
+ Map<String, String> originalEnv = System.getenv();
+ try {
+ Map<String, String> systemEnv = new HashMap<>(originalEnv);
+ systemEnv.put(
+ ConfigConstants.ENV_FLINK_PLUGINS_DIR,
getTestPluginsRootDir(temporaryFolder));
+ setEnv(systemEnv);
+
+ var informerManager = new InformerManager(kubernetesClient);
+ informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+ var configManager = new FlinkConfigManager(new Configuration());
+
+ var webhook = new FlinkOperatorWebhook(informerManager,
configManager);
+
+ assertEquals(
+ new HashSet<>(
+ Arrays.asList(
+ DefaultValidator.class.getName(),
+ TestValidator.class.getName())),
+ webhook.validators.stream()
+ .map(v -> v.getClass().getName())
+ .collect(Collectors.toSet()),
+ "Should discover both DefaultValidator and TestValidator
from plugins");
+ } finally {
+ setEnv(originalEnv);
+ }
+ }
+
+ @Test
+ void webhookDiscoversCustomMutatorFromPlugins(@TempDir Path
temporaryFolder) throws Exception {
+ Map<String, String> originalEnv = System.getenv();
+ try {
+ Map<String, String> systemEnv = new HashMap<>(originalEnv);
+ systemEnv.put(
+ ConfigConstants.ENV_FLINK_PLUGINS_DIR,
getTestPluginsRootDir(temporaryFolder));
+ setEnv(systemEnv);
+
+ var informerManager = new InformerManager(kubernetesClient);
+ informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+ var configManager = new FlinkConfigManager(new Configuration());
+
+ var webhook = new FlinkOperatorWebhook(informerManager,
configManager);
+
+ assertEquals(
+ new HashSet<>(
+ Arrays.asList(
+ DefaultFlinkMutator.class.getName(),
+ TestMutator.class.getName())),
+ webhook.mutators.stream()
+ .map(m -> m.getClass().getName())
+ .collect(Collectors.toSet()),
+ "Should discover both DefaultFlinkMutator and TestMutator
from plugins");
+ } finally {
+ setEnv(originalEnv);
+ }
+ }
+
+ @Test
+ void createChannelInitializerWithSslContext(@TempDir Path temporaryFolder)
throws Exception {
+ Path keystorePath = createTestKeystore(temporaryFolder);
+ Map<String, String> originalEnv = System.getenv();
+ try {
+ Map<String, String> systemEnv = new HashMap<>(originalEnv);
+ systemEnv.put(EnvUtils.ENV_WEBHOOK_KEYSTORE_FILE,
keystorePath.toString());
+ systemEnv.put(EnvUtils.ENV_WEBHOOK_KEYSTORE_TYPE, "PKCS12");
+ systemEnv.put(EnvUtils.ENV_WEBHOOK_KEYSTORE_PASSWORD, "testpass");
+ setEnv(systemEnv);
+
+ var informerManager = new InformerManager(kubernetesClient);
+ informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+ var validator = new FlinkValidator(Set.of(), informerManager);
+ var mutator = new FlinkMutator(Set.of(new DefaultFlinkMutator()),
informerManager);
+ var admissionHandler = new AdmissionHandler(validator, mutator);
+
+ var initializer =
FlinkOperatorWebhook.createChannelInitializer(admissionHandler);
+ assertNotNull(initializer, "Channel initializer with SSL should
not be null");
+
+ // Start a server with SSL and verify HTTPS works
+ NioEventLoopGroup sslBossGroup = new NioEventLoopGroup(1);
+ NioEventLoopGroup sslWorkerGroup = new NioEventLoopGroup();
+ try {
+ Channel sslChannel =
+ new ServerBootstrap()
+ .group(sslBossGroup, sslWorkerGroup)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(initializer)
+ .bind(0)
+ .sync()
+ .channel();
+ int sslPort = ((InetSocketAddress)
sslChannel.localAddress()).getPort();
+
+ var review = createAdmissionReview(createDeployment());
+ var body = mapper.writeValueAsString(review);
+ var request =
+ HttpRequest.newBuilder()
+ .uri(
+ URI.create(
+ "https://localhost:"
+ + sslPort
+ +
VALIDATE_REQUEST_PATH))
+
.POST(HttpRequest.BodyPublishers.ofString(body))
+ .header("Content-Type", "application/json")
+ .build();
+
+ var trustAllContext = createTrustAllSslContext();
+ var response =
+ HttpClient.newBuilder()
+ .sslContext(trustAllContext)
+ .build()
+ .send(request,
HttpResponse.BodyHandlers.ofString());
+
+ assertEquals(
+ 200,
+ response.statusCode(),
+ "HTTPS request should succeed with SSL enabled");
+
+ sslChannel.close();
+ } finally {
+ sslBossGroup.shutdownGracefully();
+ sslWorkerGroup.shutdownGracefully();
+ }
+ } finally {
+ setEnv(originalEnv);
+ }
+ }
+
+ @Test
+ void createChannelInitializerWithSslContextMissingKeystoreType(@TempDir
Path temporaryFolder)
+ throws Exception {
+ Path keystorePath = createTestKeystore(temporaryFolder);
+ Map<String, String> originalEnv = System.getenv();
+ try {
+ Map<String, String> systemEnv = new HashMap<>(originalEnv);
+ systemEnv.put(EnvUtils.ENV_WEBHOOK_KEYSTORE_FILE,
keystorePath.toString());
+ // Missing KEYSTORE_TYPE and KEYSTORE_PASSWORD
+ systemEnv.remove(EnvUtils.ENV_WEBHOOK_KEYSTORE_TYPE);
+ systemEnv.remove(EnvUtils.ENV_WEBHOOK_KEYSTORE_PASSWORD);
+ setEnv(systemEnv);
+
+ var informerManager = new InformerManager(kubernetesClient);
+ informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+ var validator = new FlinkValidator(Set.of(), informerManager);
+ var mutator = new FlinkMutator(Set.of(new DefaultFlinkMutator()),
informerManager);
+ var admissionHandler = new AdmissionHandler(validator, mutator);
+
+ assertThrows(
+ java.util.NoSuchElementException.class,
+ () ->
FlinkOperatorWebhook.createChannelInitializer(admissionHandler),
+ "Should throw when keystore type env var is missing");
+ } finally {
+ setEnv(originalEnv);
+ }
+ }
+
+ private static Path createTestKeystore(Path dir) throws Exception {
+ Path keystorePath = dir.resolve("test-keystore.p12");
+ ProcessBuilder pb =
+ new ProcessBuilder(
+ "keytool",
+ "-genkeypair",
+ "-alias",
+ "test",
+ "-keyalg",
+ "RSA",
+ "-keysize",
+ "2048",
+ "-storetype",
+ "PKCS12",
+ "-keystore",
+ keystorePath.toString(),
+ "-storepass",
+ "testpass",
+ "-dname",
+ "CN=localhost",
+ "-validity",
+ "365");
+ pb.inheritIO();
+ Process process = pb.start();
+ int exitCode = process.waitFor();
+ assertEquals(0, exitCode, "keytool should succeed");
+ assertTrue(keystorePath.toFile().exists(), "Keystore file should be
created");
+ return keystorePath;
+ }
+
+ private static SSLContext createTrustAllSslContext() throws Exception {
+ TrustManager[] trustAll =
+ new TrustManager[] {
+ new X509TrustManager() {
+ public X509Certificate[] getAcceptedIssuers() {
+ return new X509Certificate[0];
+ }
+
+ public void checkClientTrusted(X509Certificate[]
certs, String authType) {}
+
+ public void checkServerTrusted(X509Certificate[]
certs, String authType) {}
+ }
+ };
+ SSLContext sc = SSLContext.getInstance("TLS");
+ sc.init(null, trustAll, new java.security.SecureRandom());
+ return sc;
+ }
+
+ private static String getTestPluginsRootDir(Path temporaryFolder) throws
IOException {
+ File testPluginFolder = new File(temporaryFolder.toFile(),
TEST_PLUGINS);
+ assertTrue(testPluginFolder.mkdirs());
+ File testPluginJar = new File("target", PLUGINS_JAR);
+ assertTrue(
+ testPluginJar.exists(),
+ "Test plugin jar not found at "
+ + testPluginJar.getAbsolutePath()
+ + ". Run 'mvn process-test-classes' first.");
+ Files.copy(testPluginJar.toPath(),
Paths.get(testPluginFolder.toString(), PLUGINS_JAR));
+ return temporaryFolder.toAbsolutePath().toString();
+ }
+
+ @SuppressWarnings({"unchecked", "JavaReflectionMemberAccess"})
+ private static void setEnv(Map<String, String> newEnv) {
+ try {
+ Map<String, String> env = System.getenv();
+ Class<?> clazz = env.getClass();
+ Field field = clazz.getDeclaredField("m");
+ field.setAccessible(true);
+ Map<String, String> map = (Map<String, String>) field.get(env);
+ map.clear();
+ map.putAll(newEnv);
+ Class<?> processEnvironmentClass =
Class.forName("java.lang.ProcessEnvironment");
+ try {
+ Field theCaseInsensitiveEnvironmentField =
+
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
+ theCaseInsensitiveEnvironmentField.setAccessible(true);
+ Map<String, String> ciEnv =
+ (Map<String, String>)
theCaseInsensitiveEnvironmentField.get(null);
+ ciEnv.clear();
+ ciEnv.putAll(newEnv);
+ } catch (NoSuchFieldException ignored) {
+ }
+ } catch (Exception e1) {
+ throw new RuntimeException(e1);
+ }
+ }
+
+ private HttpResponse<String> sendRequest(String path, AdmissionReview
review)
+ throws IOException, InterruptedException {
+ var body = mapper.writeValueAsString(review);
+ var request =
+ HttpRequest.newBuilder()
+ .uri(URI.create("http://localhost:" + port + path))
+ .POST(HttpRequest.BodyPublishers.ofString(body))
+ .header("Content-Type", "application/json")
+ .build();
+ return HttpClient.newHttpClient().send(request,
HttpResponse.BodyHandlers.ofString());
+ }
+
+ private AdmissionReview createAdmissionReview(Object resource) {
+ var admissionRequest = new AdmissionRequest();
+ admissionRequest.setOperation(CREATE.name());
+ admissionRequest.setObject(resource);
+ if (resource instanceof FlinkDeployment fd) {
+ admissionRequest.setKind(
+ new GroupVersionKind(fd.getGroup(), fd.getVersion(),
fd.getKind()));
+ } else if (resource instanceof FlinkSessionJob sj) {
+ admissionRequest.setKind(
+ new GroupVersionKind(sj.getGroup(), sj.getVersion(),
sj.getKind()));
+ }
+ var review = new AdmissionReview();
+ review.setRequest(admissionRequest);
+ return review;
+ }
+
+ private FlinkDeployment createDeployment() {
+ var deployment = new FlinkDeployment();
+ var meta = new ObjectMeta();
+ meta.setName("test-deployment");
+ meta.setNamespace("default");
+ deployment.setMetadata(meta);
+ deployment.setSpec(new FlinkDeploymentSpec());
+ return deployment;
+ }
+
+ private FlinkSessionJob createSessionJob() {
+ var sessionJob = new FlinkSessionJob();
+ var meta = new ObjectMeta();
+ meta.setName("test-job");
+ meta.setNamespace("default");
+ sessionJob.setMetadata(meta);
+ sessionJob.setSpec(
+ FlinkSessionJobSpec.builder()
+
.job(JobSpec.builder().jarURI("http://test-job.jar").build())
+ .deploymentName("test-deployment")
+ .build());
+ return sessionJob;
+ }
+}
diff --git
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/FlinkValidatorTest.java
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/FlinkValidatorTest.java
new file mode 100644
index 00000000..d8311227
--- /dev/null
+++
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/FlinkValidatorTest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.admission;
+
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobKind;
+import org.apache.flink.kubernetes.operator.api.spec.JobReference;
+import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.javaoperatorsdk.webhook.admission.NotAllowedException;
+import io.javaoperatorsdk.webhook.admission.Operation;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
+
+import static
io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link FlinkValidator}. */
+@EnableKubernetesMockClient(crud = true)
+class FlinkValidatorTest {
+
+ private KubernetesClient kubernetesClient;
+ private InformerManager informerManager;
+
+ @BeforeEach
+ void setup() {
+ informerManager = new InformerManager(kubernetesClient);
+ informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+ void canaryResourceSkipsValidation(Operation operation) {
+ var validator = createValidator(failingValidator("should not be
called"));
+ var deployment = createDeployment();
+ var labels = new HashMap<String, String>();
+ labels.put("flink.apache.org/canary", "true");
+ deployment.getMetadata().setLabels(labels);
+
+ assertDoesNotThrow(() -> validator.validate(deployment, null,
operation));
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+ void deploymentPassesValidation(Operation operation) {
+ var validator = createValidator(passingValidator());
+ var deployment = createDeployment();
+
+ assertDoesNotThrow(() -> validator.validate(deployment, null,
operation));
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+ void deploymentFailsValidation(Operation operation) {
+ var validator = createValidator(failingValidator("deployment is
invalid"));
+ var deployment = createDeployment();
+
+ var exception =
+ assertThrows(
+ NotAllowedException.class,
+ () -> validator.validate(deployment, null, operation));
+ assertEquals("deployment is invalid", exception.getMessage());
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+ void sessionJobPassesValidation(Operation operation) {
+ var validator = createValidator(passingValidator());
+ var sessionJob = createSessionJob();
+
+ assertDoesNotThrow(() -> validator.validate(sessionJob, null,
operation));
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+ void sessionJobFailsValidation(Operation operation) {
+ var validator = createValidator(failingValidator("session job is
invalid"));
+ var sessionJob = createSessionJob();
+
+ var exception =
+ assertThrows(
+ NotAllowedException.class,
+ () -> validator.validate(sessionJob, null, operation));
+ assertEquals("session job is invalid", exception.getMessage());
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+ void stateSnapshotPassesValidation(Operation operation) {
+ var validator = createValidator(passingValidator());
+ var snapshot = createStateSnapshot();
+
+ assertDoesNotThrow(() -> validator.validate(snapshot, null,
operation));
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+ void stateSnapshotFailsValidation(Operation operation) {
+ var validator = createValidator(failingValidator("snapshot is
invalid"));
+ var snapshot = createStateSnapshot();
+
+ var exception =
+ assertThrows(
+ NotAllowedException.class,
+ () -> validator.validate(snapshot, null, operation));
+ assertEquals("snapshot is invalid", exception.getMessage());
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+ void unexpectedResourceKindThrowsNotAllowedException(Operation operation) {
+ var validator = createValidator(passingValidator());
+ var pod = new io.fabric8.kubernetes.api.model.Pod();
+ pod.setMetadata(new ObjectMeta());
+ pod.getMetadata().setName("test-pod");
+ pod.getMetadata().setNamespace("default");
+
+ var exception =
+ assertThrows(
+ NotAllowedException.class, () ->
validator.validate(pod, null, operation));
+ assertEquals("Unexpected resource: Pod", exception.getMessage());
+ }
+
+ @Test
+ void firstValidatorErrorStopsValidation() {
+ FlinkResourceValidator first = failingValidator("first error");
+ FlinkResourceValidator second = failingValidator("second error");
+ var validator = new FlinkValidator(Set.of(first, second),
informerManager);
+ var deployment = createDeployment();
+
+ var exception =
+ assertThrows(
+ NotAllowedException.class,
+ () -> validator.validate(deployment, null,
Operation.CREATE));
+ // With Set ordering, we can't guarantee which fires first, but one of
them must
+ assertTrue(
+ ("first error".equals(exception.getMessage())
+ && !"second
error".equals(exception.getMessage()))
+ || (!"first error".equals(exception.getMessage())
+ && "second
error".equals(exception.getMessage())));
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+ void stateSnapshotWithFlinkDeploymentJobRefPassesValidation(Operation
operation) {
+ var validator = createValidator(passingValidator());
+ var snapshot = createStateSnapshotWithJobRef(JobKind.FLINK_DEPLOYMENT,
"test-deployment");
+
+ assertDoesNotThrow(() -> validator.validate(snapshot, null,
operation));
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+ void stateSnapshotWithFlinkSessionJobJobRefPassesValidation(Operation
operation) {
+ var validator = createValidator(passingValidator());
+ var snapshot =
createStateSnapshotWithJobRef(JobKind.FLINK_SESSION_JOB, "test-session-job");
+
+ assertDoesNotThrow(() -> validator.validate(snapshot, null,
operation));
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+ void
stateSnapshotWithFlinkDeploymentJobRefThrowsWhenNamespaceMissing(Operation
operation) {
+ var validator = createValidator(passingValidator());
+ var snapshot =
+ createStateSnapshotWithJobRefNoNamespace(
+ JobKind.FLINK_DEPLOYMENT, "test-deployment");
+
+ var exception =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> validator.validate(snapshot, null, operation));
+ assertEquals("Cannot determine namespace for snapshot",
exception.getMessage());
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+ void
stateSnapshotWithFlinkSessionJobJobRefThrowsWhenNamespaceMissing(Operation
operation) {
+ var validator = createValidator(passingValidator());
+ var snapshot =
+ createStateSnapshotWithJobRefNoNamespace(
+ JobKind.FLINK_SESSION_JOB, "test-session-job");
+
+ var exception =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> validator.validate(snapshot, null, operation));
+ assertEquals("Cannot determine namespace for snapshot",
exception.getMessage());
+ }
+
+ private FlinkValidator createValidator(FlinkResourceValidator
resourceValidator) {
+ return new FlinkValidator(Set.of(resourceValidator), informerManager);
+ }
+
+ private FlinkResourceValidator passingValidator() {
+ return new FlinkResourceValidator() {
+ @Override
+ public Optional<String> validateDeployment(FlinkDeployment
deployment) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<String> validateSessionJob(
+ FlinkSessionJob sessionJob, Optional<FlinkDeployment>
session) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<String> validateStateSnapshot(
+ FlinkStateSnapshot savepoint,
Optional<AbstractFlinkResource<?, ?>> target) {
+ return Optional.empty();
+ }
+ };
+ }
+
+ private FlinkResourceValidator failingValidator(String errorMessage) {
+ return new FlinkResourceValidator() {
+ @Override
+ public Optional<String> validateDeployment(FlinkDeployment
deployment) {
+ return Optional.of(errorMessage);
+ }
+
+ @Override
+ public Optional<String> validateSessionJob(
+ FlinkSessionJob sessionJob, Optional<FlinkDeployment>
session) {
+ return Optional.of(errorMessage);
+ }
+
+ @Override
+ public Optional<String> validateStateSnapshot(
+ FlinkStateSnapshot savepoint,
Optional<AbstractFlinkResource<?, ?>> target) {
+ return Optional.of(errorMessage);
+ }
+ };
+ }
+
+ private FlinkDeployment createDeployment() {
+ var deployment = new FlinkDeployment();
+ var meta = new ObjectMeta();
+ meta.setName("test-deployment");
+ meta.setNamespace("default");
+ deployment.setMetadata(meta);
+ deployment.setSpec(new FlinkDeploymentSpec());
+ return deployment;
+ }
+
+ private FlinkSessionJob createSessionJob() {
+ var sessionJob = new FlinkSessionJob();
+ var meta = new ObjectMeta();
+ meta.setName("test-job");
+ meta.setNamespace("default");
+ sessionJob.setMetadata(meta);
+ sessionJob.setSpec(
+ FlinkSessionJobSpec.builder()
+
.job(JobSpec.builder().jarURI("http://test-job.jar").build())
+ .deploymentName("test-deployment")
+ .build());
+ return sessionJob;
+ }
+
+ private FlinkStateSnapshot createStateSnapshot() {
+ var snapshot = new FlinkStateSnapshot();
+ var meta = new ObjectMeta();
+ meta.setName("test-snapshot");
+ meta.setNamespace("default");
+ snapshot.setMetadata(meta);
+ snapshot.setSpec(new FlinkStateSnapshotSpec());
+ return snapshot;
+ }
+
+ private FlinkStateSnapshot createStateSnapshotWithJobRef(JobKind kind,
String name) {
+ var snapshot = new FlinkStateSnapshot();
+ var meta = new ObjectMeta();
+ meta.setName("test-snapshot");
+ meta.setNamespace("default");
+ snapshot.setMetadata(meta);
+ var spec = new FlinkStateSnapshotSpec();
+
spec.setJobReference(JobReference.builder().kind(kind).name(name).build());
+ snapshot.setSpec(spec);
+ return snapshot;
+ }
+
+ private FlinkStateSnapshot
createStateSnapshotWithJobRefNoNamespace(JobKind kind, String name) {
+ var snapshot = new FlinkStateSnapshot();
+ var meta = new ObjectMeta();
+ meta.setName("test-snapshot");
+ snapshot.setMetadata(meta);
+ var spec = new FlinkStateSnapshotSpec();
+
spec.setJobReference(JobReference.builder().kind(kind).name(name).build());
+ snapshot.setSpec(spec);
+ return snapshot;
+ }
+}
diff --git
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java
index 924ed406..1f167fe2 100644
---
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java
+++
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java
@@ -56,6 +56,61 @@ public class InformerManagerTest {
Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns3"));
}
+ @Test
+ public void testSessionJobNamespacedInformerCreated() {
+ var informerManager = new InformerManager(kubernetesClient);
+ informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns1"));
+
+ informerManager.setNamespaces(Set.of("ns1", "ns2"));
+
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns1"));
+
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns2"));
+
+ informerManager.setNamespaces(Set.of("ns1", "ns2", "ns3"));
+
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns1"));
+
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns2"));
+
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns3"));
+ }
+
+ @Test
+ public void testSetNamespacesStopsSessionJobInformers() {
+ var informerManager = new InformerManager(kubernetesClient);
+ informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+
+ // Initialize session job informers by calling
getFlinkSessionJobInformer
+
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns1"));
+
+ // Change namespaces — this should stop existing session job informers
and reset them
+ informerManager.setNamespaces(Set.of("ns1", "ns2"));
+
+ // Verify new informers are created for the updated namespaces
+
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns1"));
+
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns2"));
+ }
+
+ @Test
+ public void testSetNamespacesStopsBothDepAndSessionJobInformers() {
+ var informerManager = new InformerManager(kubernetesClient);
+ informerManager.setNamespaces(Set.of("ns1"));
+
+ // Initialize both types of informers
+ Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1"));
+
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns1"));
+
+ // Change namespaces — should stop and recreate both
+ informerManager.setNamespaces(Set.of("ns2"));
+
+ Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns2"));
+
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns2"));
+
+ // Old namespace should no longer have informers
+ Assertions.assertThrows(
+ NullPointerException.class, () ->
informerManager.getFlinkDepInformer("ns1"));
+ Assertions.assertThrows(
+ NullPointerException.class,
+ () -> informerManager.getFlinkSessionJobInformer("ns1"));
+ }
+
@Test
public void testDynamicNamespaces() {
InformerManager informerManager = new
InformerManager(kubernetesClient);
diff --git
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutatorTest.java
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutatorTest.java
new file mode 100644
index 00000000..16caa339
--- /dev/null
+++
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutatorTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.admission.mutator;
+
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.admission.v1.AdmissionRequest;
+import io.javaoperatorsdk.webhook.admission.NotAllowedException;
+import io.javaoperatorsdk.webhook.admission.Operation;
+import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link DefaultRequestMutator}. */
+class DefaultRequestMutatorTest {
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ @Test
+ void handleWithNoMutationProducesEmptyPatch() {
+ var requestMutator = createRequestMutator((resource, operation) ->
resource);
+ var request = createAdmissionRequest(createDeployment());
+
+ var response = requestMutator.handle(request);
+
+ assertTrue(response.getAllowed());
+ assertEquals("JSONPatch", response.getPatchType());
+ var patch = decodePatch(response.getPatch());
+ assertEquals("[]", patch, "No-op mutation should produce an empty JSON
patch array");
+ }
+
+ @Test
+ void handleWithLabelMutationProducesPatchWithAddOperation() throws
Exception {
+ var requestMutator =
+ createRequestMutator(
+ (resource, operation) -> {
+ var labels = resource.getMetadata().getLabels();
+ if (labels == null) {
+ labels = new HashMap<>();
+ }
+ labels.put("injected", "true");
+ resource.getMetadata().setLabels(labels);
+ return resource;
+ });
+ var request = createAdmissionRequest(createDeployment());
+
+ var response = requestMutator.handle(request);
+
+ assertTrue(response.getAllowed());
+ var patch = decodePatch(response.getPatch());
+ var patchNodes = mapper.readTree(patch);
+ assertFalse(patchNodes.isEmpty(), "Mutation should produce at least
one patch operation");
+ assertTrue(patch.contains("injected"));
+ }
+
+ @Test
+ void handleWithSpecMutationProducesPatchWithReplaceOperation() {
+ var requestMutator =
+ createRequestMutator(
+ (resource, operation) -> {
+ ((FlinkDeployment)
resource).getSpec().setImage("mutated:latest");
+ return resource;
+ });
+ var deployment = createDeployment();
+ deployment.getSpec().setImage("original:1.0");
+ var request = createAdmissionRequest(deployment);
+
+ var response = requestMutator.handle(request);
+
+ assertTrue(response.getAllowed());
+
assertTrue(decodePatch(response.getPatch()).contains("mutated:latest"));
+ }
+
+ @Test
+ void handleWithNotAllowedExceptionReturnsNotAllowedResponse() {
+ var requestMutator =
+ createRequestMutator(
+ (resource, operation) -> {
+ throw new NotAllowedException("rejected for
testing");
+ });
+ var request = createAdmissionRequest(createDeployment());
+
+ var response = requestMutator.handle(request);
+
+ assertFalse(response.getAllowed());
+ assertNotNull(response.getStatus());
+ assertTrue(response.getStatus().getMessage().contains("rejected for
testing"));
+ }
+
+ private FlinkDeployment createDeployment() {
+ var deployment = new FlinkDeployment();
+ var meta = new ObjectMeta();
+ meta.setName("test-deployment");
+ meta.setNamespace("default");
+ deployment.setMetadata(meta);
+ deployment.setSpec(new FlinkDeploymentSpec());
+ return deployment;
+ }
+
+ private AdmissionRequest createAdmissionRequest(FlinkDeployment resource) {
+ var request = new AdmissionRequest();
+ request.setOperation(Operation.CREATE.name());
+ request.setObject(resource);
+ return request;
+ }
+
+ private String decodePatch(String base64Patch) {
+ return new String(Base64.getDecoder().decode(base64Patch),
StandardCharsets.UTF_8);
+ }
+
+ private DefaultRequestMutator<HasMetadata>
createRequestMutator(Mutator<HasMetadata> mutator) {
+ return new DefaultRequestMutator<>(mutator);
+ }
+}
diff --git
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutatorTest.java
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutatorTest.java
new file mode 100644
index 00000000..84c65b82
--- /dev/null
+++
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutatorTest.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.admission.mutator;
+
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
+import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.mutator.DefaultFlinkMutator;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.javaoperatorsdk.webhook.admission.Operation;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
+
+import static
io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Tests for {@link FlinkMutator}. */
+@EnableKubernetesMockClient(crud = true)
+class FlinkMutatorTest {
+
+ private KubernetesClient kubernetesClient;
+ private InformerManager informerManager;
+ private FlinkMutator mutator;
+
+ @BeforeEach
+ void setup() {
+ informerManager = new InformerManager(kubernetesClient);
+ informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+ mutator = new FlinkMutator(Set.of(new DefaultFlinkMutator()),
informerManager);
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE"})
+ void deploymentNoMutationReturnsSameInstance(Operation operation) {
+ var deployment = createDeployment();
+
+ var result = mutator.mutate(deployment, operation);
+
+ assertSame(
+ deployment,
+ result,
+ "Should return the original resource when no mutation is
applied");
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE"})
+ void sessionJobWithoutLabelReturnsMutatedInstance(Operation operation) {
+ var sessionJob = createSessionJob();
+
+ var result = mutator.mutate(sessionJob, operation);
+
+ assertNotSame(sessionJob, result, "Should return a mutated instance
when label is added");
+ assertInstanceOf(FlinkSessionJob.class, result);
+ assertEquals(
+ "test-deployment",
+
result.getMetadata().getLabels().get(CrdConstants.LABEL_TARGET_SESSION));
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE"})
+ void sessionJobWithCorrectLabelReturnsSameInstance(Operation operation) {
+ var sessionJob = createSessionJob();
+ var labels = new HashMap<String, String>();
+ labels.put(CrdConstants.LABEL_TARGET_SESSION, "test-deployment");
+ sessionJob.getMetadata().setLabels(labels);
+
+ var result = mutator.mutate(sessionJob, operation);
+
+ assertSame(
+ sessionJob,
+ result,
+ "Should return the original resource when the label is already
correct");
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE"})
+ void sessionJobWithWrongLabelReturnsMutatedInstance(Operation operation) {
+ var sessionJob = createSessionJob();
+ var labels = new HashMap<String, String>();
+ labels.put(CrdConstants.LABEL_TARGET_SESSION, "wrong-session");
+ sessionJob.getMetadata().setLabels(labels);
+
+ var result = mutator.mutate(sessionJob, operation);
+
+ assertNotSame(sessionJob, result);
+ assertInstanceOf(FlinkSessionJob.class, result);
+ assertEquals(
+ "test-deployment",
+
result.getMetadata().getLabels().get(CrdConstants.LABEL_TARGET_SESSION));
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE"})
+ void stateSnapshotNoMutationReturnsSameInstance(Operation operation) {
+ var snapshot = createStateSnapshot();
+
+ var result = mutator.mutate(snapshot, operation);
+
+ assertSame(
+ snapshot,
+ result,
+ "Should return the original resource when no mutation is
applied");
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE"},
+ mode = EnumSource.Mode.EXCLUDE)
+ void nonMutatingOperationsReturnOriginalResource(Operation operation) {
+ var deployment = createDeployment();
+ var sessionJob = createSessionJob();
+ var snapshot = createStateSnapshot();
+
+ assertSame(deployment, mutator.mutate(deployment, operation));
+ assertSame(sessionJob, mutator.mutate(sessionJob, operation));
+ assertSame(snapshot, mutator.mutate(snapshot, operation));
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE"})
+ void deploymentMutatorExceptionIsWrappedInRuntimeException(Operation
operation) {
+ var failingMutator =
+ new FlinkMutator(
+ Set.of(
+ new DefaultFlinkMutator() {
+ @Override
+ public FlinkDeployment mutateDeployment(
+ FlinkDeployment deployment) {
+ throw new
IllegalStateException("deployment error");
+ }
+ }),
+ informerManager);
+ var deployment = createDeployment();
+
+ var exception =
+ assertThrows(
+ RuntimeException.class, () ->
failingMutator.mutate(deployment, operation));
+ assertInstanceOf(IllegalStateException.class, exception.getCause());
+ assertEquals("deployment error", exception.getCause().getMessage());
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE"})
+ void sessionJobMutatorExceptionIsWrappedInRuntimeException(Operation
operation) {
+ var failingMutator =
+ new FlinkMutator(
+ Set.of(
+ new DefaultFlinkMutator() {
+ @Override
+ public FlinkSessionJob mutateSessionJob(
+ FlinkSessionJob sessionJob,
+ Optional<FlinkDeployment> session)
{
+ throw new
IllegalStateException("session job error");
+ }
+ }),
+ informerManager);
+ var sessionJob = createSessionJob();
+
+ var exception =
+ assertThrows(
+ RuntimeException.class, () ->
failingMutator.mutate(sessionJob, operation));
+ assertInstanceOf(IllegalStateException.class, exception.getCause());
+ assertEquals("session job error", exception.getCause().getMessage());
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE"})
+ void stateSnapshotMutatorExceptionIsWrappedInRuntimeException(Operation
operation) {
+ var failingMutator =
+ new FlinkMutator(
+ Set.of(
+ new DefaultFlinkMutator() {
+ @Override
+ public FlinkStateSnapshot
mutateStateSnapshot(
+ FlinkStateSnapshot stateSnapshot) {
+ throw new
IllegalStateException("snapshot error");
+ }
+ }),
+ informerManager);
+ var snapshot = createStateSnapshot();
+
+ var exception =
+ assertThrows(
+ RuntimeException.class, () ->
failingMutator.mutate(snapshot, operation));
+ assertInstanceOf(IllegalStateException.class, exception.getCause());
+ assertEquals("snapshot error", exception.getCause().getMessage());
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE"})
+ void customLabelMutatorOnDeploymentReturnsMutatedInstance(Operation
operation) {
+ var labelMutator = createMutatorWith(new LabelInjectingMutator());
+ var deployment = createDeployment();
+
+ var result = labelMutator.mutate(deployment, operation);
+
+ assertNotSame(deployment, result);
+ assertInstanceOf(FlinkDeployment.class, result);
+ assertEquals("injected",
result.getMetadata().getLabels().get("custom-env"));
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE"})
+ void
customLabelMutatorOnDeploymentWithMatchingLabelReturnsSameInstance(Operation
operation) {
+ var labelMutator = createMutatorWith(new LabelInjectingMutator());
+ var deployment = createDeployment();
+ var labels = new HashMap<String, String>();
+ labels.put("custom-env", "injected");
+ deployment.getMetadata().setLabels(labels);
+
+ var result = labelMutator.mutate(deployment, operation);
+
+ assertSame(deployment, result);
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE"})
+ void customLabelMutatorOnSnapshotReturnsMutatedInstance(Operation
operation) {
+ var labelMutator = createMutatorWith(new LabelInjectingMutator());
+ var snapshot = createStateSnapshot();
+
+ var result = labelMutator.mutate(snapshot, operation);
+
+ assertNotSame(snapshot, result);
+ assertInstanceOf(FlinkStateSnapshot.class, result);
+ assertEquals("injected",
result.getMetadata().getLabels().get("custom-env"));
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE"})
+ void
customLabelMutatorOnSnapshotWithMatchingLabelReturnsSameInstance(Operation
operation) {
+ var labelMutator = createMutatorWith(new LabelInjectingMutator());
+ var snapshot = createStateSnapshot();
+ var labels = new HashMap<String, String>();
+ labels.put("custom-env", "injected");
+ snapshot.getMetadata().setLabels(labels);
+
+ var result = labelMutator.mutate(snapshot, operation);
+
+ assertSame(snapshot, result);
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE"})
+ void customSpecMutatorOnDeploymentReturnsMutatedInstance(Operation
operation) {
+ var specMutator = createMutatorWith(new SpecModifyingMutator());
+ var deployment = createDeployment();
+
+ var result = specMutator.mutate(deployment, operation);
+
+ assertNotSame(deployment, result);
+ assertInstanceOf(FlinkDeployment.class, result);
+ assertEquals("mutated:latest", ((FlinkDeployment)
result).getSpec().getImage());
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE"})
+ void customSpecMutatorOnSessionJobReturnsMutatedInstance(Operation
operation) {
+ var specMutator = createMutatorWith(new SpecModifyingMutator());
+ var sessionJob = createSessionJob();
+
+ var result = specMutator.mutate(sessionJob, operation);
+
+ assertNotSame(sessionJob, result);
+ assertInstanceOf(FlinkSessionJob.class, result);
+ assertEquals(
+ "mutated-deployment", ((FlinkSessionJob)
result).getSpec().getDeploymentName());
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = Operation.class,
+ names = {"CREATE", "UPDATE"})
+ void customSpecMutatorOnSnapshotReturnsMutatedInstance(Operation
operation) {
+ var specMutator = createMutatorWith(new SpecModifyingMutator());
+ var snapshot = createStateSnapshot();
+
+ var result = specMutator.mutate(snapshot, operation);
+
+ assertNotSame(snapshot, result);
+ assertInstanceOf(FlinkStateSnapshot.class, result);
+ assertEquals(5, ((FlinkStateSnapshot)
result).getSpec().getBackoffLimit());
+ }
+
+ private FlinkMutator createMutatorWith(FlinkResourceMutator
resourceMutator) {
+ return new FlinkMutator(Set.of(resourceMutator), informerManager);
+ }
+
+ private FlinkDeployment createDeployment() {
+ var deployment = new FlinkDeployment();
+ var meta = new ObjectMeta();
+ meta.setName("test-deployment");
+ meta.setNamespace("default");
+ deployment.setMetadata(meta);
+ deployment.setSpec(new FlinkDeploymentSpec());
+ return deployment;
+ }
+
+ private FlinkSessionJob createSessionJob() {
+ var sessionJob = new FlinkSessionJob();
+ var meta = new ObjectMeta();
+ meta.setName("test-job");
+ meta.setNamespace("default");
+ sessionJob.setMetadata(meta);
+ sessionJob.setSpec(
+ FlinkSessionJobSpec.builder()
+
.job(JobSpec.builder().jarURI("http://test-job.jar").build())
+ .deploymentName("test-deployment")
+ .build());
+ return sessionJob;
+ }
+
+ private FlinkStateSnapshot createStateSnapshot() {
+ var snapshot = new FlinkStateSnapshot();
+ var meta = new ObjectMeta();
+ meta.setName("test-snapshot");
+ meta.setNamespace("default");
+ snapshot.setMetadata(meta);
+ snapshot.setSpec(new FlinkStateSnapshotSpec());
+ return snapshot;
+ }
+
+ /**
+ * A custom mutator that injects a {@code custom-env=injected} label on
FlinkDeployment and
+ * FlinkStateSnapshot if not already present. Mirrors the pattern of
DefaultFlinkMutator's
+ * target-session label on FlinkSessionJob.
+ */
+ private static class LabelInjectingMutator implements FlinkResourceMutator
{
+ @Override
+ public FlinkDeployment mutateDeployment(FlinkDeployment deployment) {
+ addLabelIfMissing(deployment.getMetadata());
+ return deployment;
+ }
+
+ @Override
+ public FlinkSessionJob mutateSessionJob(
+ FlinkSessionJob sessionJob, Optional<FlinkDeployment> session)
{
+ return sessionJob;
+ }
+
+ @Override
+ public FlinkStateSnapshot mutateStateSnapshot(FlinkStateSnapshot
stateSnapshot) {
+ addLabelIfMissing(stateSnapshot.getMetadata());
+ return stateSnapshot;
+ }
+
+ private void addLabelIfMissing(ObjectMeta meta) {
+ var labels = meta.getLabels();
+ if (labels == null) {
+ labels = new HashMap<>();
+ }
+ if (!"injected".equals(labels.get("custom-env"))) {
+ labels.put("custom-env", "injected");
+ meta.setLabels(labels);
+ }
+ }
+ }
+
+ /**
+ * A custom mutator that modifies the spec of every CRD to verify that
spec-level changes are
+ * always detected by the before/after tree comparison.
+ */
+ private static class SpecModifyingMutator implements FlinkResourceMutator {
+ @Override
+ public FlinkDeployment mutateDeployment(FlinkDeployment deployment) {
+ deployment.getSpec().setImage("mutated:latest");
+ return deployment;
+ }
+
+ @Override
+ public FlinkSessionJob mutateSessionJob(
+ FlinkSessionJob sessionJob, Optional<FlinkDeployment> session)
{
+ sessionJob.getSpec().setDeploymentName("mutated-deployment");
+ return sessionJob;
+ }
+
+ @Override
+ public FlinkStateSnapshot mutateStateSnapshot(FlinkStateSnapshot
stateSnapshot) {
+ stateSnapshot.getSpec().setBackoffLimit(5);
+ return stateSnapshot;
+ }
+ }
+}
diff --git
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/mutator/TestMutator.java
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/mutator/TestMutator.java
new file mode 100644
index 00000000..88959ee3
--- /dev/null
+++
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/mutator/TestMutator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.mutator;
+
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+
+import java.util.Optional;
+
+/** Test mutator implementation of {@link FlinkResourceMutator}. */
+public class TestMutator implements FlinkResourceMutator {
+
+ @Override
+ public FlinkDeployment mutateDeployment(FlinkDeployment deployment) {
+ return deployment;
+ }
+
+ @Override
+ public FlinkSessionJob mutateSessionJob(
+ FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
+ return sessionJob;
+ }
+
+ @Override
+ public FlinkStateSnapshot mutateStateSnapshot(FlinkStateSnapshot
stateSnapshot) {
+ return stateSnapshot;
+ }
+}
diff --git
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/validation/TestValidator.java
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/validation/TestValidator.java
new file mode 100644
index 00000000..2a461b0c
--- /dev/null
+++
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/validation/TestValidator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.validation;
+
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+
+import java.util.Optional;
+
+/** Test validator implementation of {@link FlinkResourceValidator}. */
+public class TestValidator implements FlinkResourceValidator {
+
+ @Override
+ public Optional<String> validateDeployment(FlinkDeployment deployment) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<String> validateSessionJob(
+ FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<String> validateStateSnapshot(
+ FlinkStateSnapshot savepoint, Optional<AbstractFlinkResource<?,
?>> target) {
+ return Optional.empty();
+ }
+}
diff --git
a/flink-kubernetes-webhook/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator
b/flink-kubernetes-webhook/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator
new file mode 100644
index 00000000..467475ab
--- /dev/null
+++
b/flink-kubernetes-webhook/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.kubernetes.operator.mutator.TestMutator
+
diff --git
a/flink-kubernetes-webhook/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator
b/flink-kubernetes-webhook/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator
new file mode 100644
index 00000000..2dec4639
--- /dev/null
+++
b/flink-kubernetes-webhook/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.kubernetes.operator.validation.TestValidator
+