This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new c6769c7a [FLINK-39508][flink-kubernetes-webhook] Fix 
flink-kubernetes-webhook module tests
c6769c7a is described below

commit c6769c7a4243d7cfb8e116d8d476e7321aecb9b5
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Mon Apr 27 08:55:26 2026 +0300

    [FLINK-39508][flink-kubernetes-webhook] Fix flink-kubernetes-webhook module 
tests
---
 flink-kubernetes-webhook/pom.xml                   |  38 +-
 .../operator/admission/FlinkOperatorWebhook.java   |  50 +-
 .../admission/mutator/DefaultRequestMutator.java   |   2 +-
 .../src/test/assembly/test-plugins-assembly.xml    |   3 +-
 .../admission/FlinkOperatorWebhookTest.java        | 501 +++++++++++++++++++++
 .../operator/admission/FlinkValidatorTest.java     | 352 +++++++++++++++
 .../admission/informer/InformerManagerTest.java    |  55 +++
 .../mutator/DefaultRequestMutatorTest.java         | 141 ++++++
 .../admission/mutator/FlinkMutatorTest.java        | 446 ++++++++++++++++++
 .../kubernetes/operator/mutator/TestMutator.java   |  44 ++
 .../operator/validation/TestValidator.java         |  46 ++
 ...ubernetes.operator.mutator.FlinkResourceMutator |  17 +
 ...etes.operator.validation.FlinkResourceValidator |  17 +
 13 files changed, 1689 insertions(+), 23 deletions(-)

diff --git a/flink-kubernetes-webhook/pom.xml b/flink-kubernetes-webhook/pom.xml
index ed1f60d1..e5cc0e4d 100644
--- a/flink-kubernetes-webhook/pom.xml
+++ b/flink-kubernetes-webhook/pom.xml
@@ -32,10 +32,13 @@ under the License.
     <packaging>jar</packaging>
 
     <properties>
-        <surefire-plugin.version>2.22.2</surefire-plugin.version>
+        <plugins.tmp.dir>${project.build.directory}/plugins</plugins.tmp.dir>
+        <surefire.module.config>
+            <!-- required by FlinkOperatorWebhookTest -->
+            --add-opens=java.base/java.util=ALL-UNNAMED
+        </surefire.module.config>
     </properties>
 
-
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -84,11 +87,37 @@ under the License.
             <version>${okhttp.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
 
         <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>create-test-plugin-jar</id>
+                        <phase>process-test-classes</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <finalName>test-plugins</finalName>
+                            <attach>false</attach>
+                            <descriptors>
+                                
<descriptor>src/test/assembly/test-plugins-assembly.xml</descriptor>
+                            </descriptors>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
@@ -116,11 +145,6 @@ under the License.
                     </execution>
                 </executions>
             </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <version>${surefire-plugin.version}</version>
-            </plugin>
         </plugins>
     </build>
 </project>
diff --git 
a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
 
b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
index 196f6b40..c616372f 100644
--- 
a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
+++ 
b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.admission;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
 import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
@@ -45,6 +46,8 @@ import io.fabric8.kubernetes.client.KubernetesClientBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.file.Path;
@@ -58,26 +61,38 @@ public class FlinkOperatorWebhook {
 
     private static FileSystemWatchService fileSystemWatchService;
 
-    public static void main(String[] args) throws Exception {
-        EnvUtils.logEnvironmentInfo(LOG, "Flink Kubernetes Webhook", args);
-        var informerManager = new InformerManager(new 
KubernetesClientBuilder().build());
-        var configManager =
-                new FlinkConfigManager(
-                        informerManager::setNamespaces,
-                        
KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class));
+    @VisibleForTesting final Set<FlinkResourceValidator> validators;
+    @VisibleForTesting final Set<FlinkResourceMutator> mutators;
+    @VisibleForTesting final AdmissionHandler admissionHandler;
+
+    @VisibleForTesting
+    FlinkOperatorWebhook(
+            @Nullable InformerManager informerManager, @Nullable 
FlinkConfigManager configManager) {
+        if (informerManager == null) {
+            informerManager = new InformerManager(new 
KubernetesClientBuilder().build());
+        }
+        if (configManager == null) {
+            configManager =
+                    new FlinkConfigManager(
+                            informerManager::setNamespaces,
+                            
KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class));
+        }
+
         var operatorConfig = configManager.getOperatorConfiguration();
         if (!operatorConfig.isDynamicNamespacesEnabled()) {
             
informerManager.setNamespaces(operatorConfig.getWatchedNamespaces());
         }
-        Set<FlinkResourceValidator> validators = 
ValidatorUtils.discoverValidators(configManager);
-        Set<FlinkResourceMutator> mutators = 
MutatorUtils.discoverMutators(configManager);
 
-        AdmissionHandler endpoint =
+        this.validators = ValidatorUtils.discoverValidators(configManager);
+        this.mutators = MutatorUtils.discoverMutators(configManager);
+        this.admissionHandler =
                 new AdmissionHandler(
                         new FlinkValidator(validators, informerManager),
                         new FlinkMutator(mutators, informerManager));
+    }
 
-        ChannelInitializer<SocketChannel> initializer = 
createChannelInitializer(endpoint);
+    public void run() throws Exception {
+        ChannelInitializer<SocketChannel> initializer = 
createChannelInitializer(admissionHandler);
         NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
         NioEventLoopGroup workerGroup = new NioEventLoopGroup();
         try {
@@ -103,12 +118,18 @@ public class FlinkOperatorWebhook {
         }
     }
 
+    public static void main(String[] args) throws Exception {
+        EnvUtils.logEnvironmentInfo(LOG, "Flink Kubernetes Webhook", args);
+        new FlinkOperatorWebhook(null, null).run();
+    }
+
     private static int getPort() {
         String portString = 
EnvUtils.getRequired(EnvUtils.ENV_WEBHOOK_SERVER_PORT);
         return Integer.parseInt(portString);
     }
 
-    private static ChannelInitializer<SocketChannel> createChannelInitializer(
+    @VisibleForTesting
+    static ChannelInitializer<SocketChannel> createChannelInitializer(
             AdmissionHandler admissionHandler) throws Exception {
         SslContext sslContext = createSslContext();
 
@@ -150,7 +171,7 @@ public class FlinkOperatorWebhook {
         stopFileSystemWatchService();
         final String realKeystoreFileName =
                 
Path.of(keystorePathOpt.get()).toRealPath().getFileName().toString();
-        LOG.info("Keystore path is resolved to real filename: " + 
realKeystoreFileName);
+        LOG.info("Keystore path is resolved to real filename: {}", 
realKeystoreFileName);
         fileSystemWatchService =
                 new 
FileSystemWatchService(Path.of(keystorePathOpt.get()).getParent().toString()) {
                     @Override
@@ -160,7 +181,8 @@ public class FlinkOperatorWebhook {
                             reloadableSslContext.reload();
                             LOG.info("SSL context reloaded successfully");
                         } catch (Exception e) {
-                            LOG.error("SSL context reload received exception: 
" + e);
+                            LOG.error(
+                                    "SSL context reload received exception: 
{}", String.valueOf(e));
                         }
                     }
                 };
diff --git 
a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java
 
b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java
index 0e2d05f1..1365dafe 100644
--- 
a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java
+++ 
b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java
@@ -74,7 +74,7 @@ public class DefaultRequestMutator<T extends 
KubernetesResource>
         return admissionResponse;
     }
 
-    public static AdmissionResponse admissionResponseFromMutation(
+    private static AdmissionResponse admissionResponseFromMutation(
             KubernetesResource originalResource, KubernetesResource 
mutatedResource) {
         AdmissionResponse admissionResponse = new AdmissionResponse();
         admissionResponse.setAllowed(true);
diff --git 
a/flink-kubernetes-webhook/src/test/assembly/test-plugins-assembly.xml 
b/flink-kubernetes-webhook/src/test/assembly/test-plugins-assembly.xml
index b40333f1..8f974d93 100644
--- a/flink-kubernetes-webhook/src/test/assembly/test-plugins-assembly.xml
+++ b/flink-kubernetes-webhook/src/test/assembly/test-plugins-assembly.xml
@@ -28,7 +28,8 @@ under the License.
             <outputDirectory>/</outputDirectory>
             <!-- the service impl -->
             <includes>
-                
<include>org/apache/flink/kubernetes/operator/admission/TestMutator.java</include>
+                
<include>org/apache/flink/kubernetes/operator/mutator/TestMutator.class</include>
+                
<include>org/apache/flink/kubernetes/operator/validation/TestValidator.class</include>
             </includes>
         </fileSet>
         <fileSet>
diff --git 
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhookTest.java
 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhookTest.java
new file mode 100644
index 00000000..ba6590a1
--- /dev/null
+++ 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhookTest.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.admission;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
+import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
+import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.mutator.DefaultFlinkMutator;
+import org.apache.flink.kubernetes.operator.mutator.TestMutator;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.kubernetes.operator.validation.DefaultValidator;
+import org.apache.flink.kubernetes.operator.validation.TestValidator;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.api.model.GroupVersionKind;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.admission.v1.AdmissionRequest;
+import io.fabric8.kubernetes.api.model.admission.v1.AdmissionReview;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
+import static io.javaoperatorsdk.webhook.admission.Operation.CREATE;
+import static 
org.apache.flink.kubernetes.operator.admission.AdmissionHandler.MUTATOR_REQUEST_PATH;
+import static 
org.apache.flink.kubernetes.operator.admission.AdmissionHandler.VALIDATE_REQUEST_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link FlinkOperatorWebhook}. */
+@EnableKubernetesMockClient(crud = true)
+class FlinkOperatorWebhookTest {
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+    private static final String TEST_PLUGINS = "test-plugins";
+    private static final String PLUGINS_JAR = TEST_PLUGINS + "-test-jar.jar";
+
+    private KubernetesClient kubernetesClient;
+    private Channel serverChannel;
+    private NioEventLoopGroup bossGroup;
+    private NioEventLoopGroup workerGroup;
+    private int port;
+
+    @BeforeEach
+    void setup() throws Exception {
+        var informerManager = new InformerManager(kubernetesClient);
+        informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+        var validator = new FlinkValidator(Set.of(), informerManager);
+        var mutator = new FlinkMutator(Set.of(new DefaultFlinkMutator()), 
informerManager);
+        var admissionHandler = new AdmissionHandler(validator, mutator);
+
+        var initializer = 
FlinkOperatorWebhook.createChannelInitializer(admissionHandler);
+        bossGroup = new NioEventLoopGroup(1);
+        workerGroup = new NioEventLoopGroup();
+        serverChannel =
+                new ServerBootstrap()
+                        .group(bossGroup, workerGroup)
+                        .channel(NioServerSocketChannel.class)
+                        .childHandler(initializer)
+                        .bind(0)
+                        .sync()
+                        .channel();
+        port = ((InetSocketAddress) serverChannel.localAddress()).getPort();
+    }
+
+    @AfterEach
+    void teardown() {
+        if (serverChannel != null) {
+            serverChannel.close();
+        }
+        if (bossGroup != null) {
+            bossGroup.shutdownGracefully();
+        }
+        if (workerGroup != null) {
+            workerGroup.shutdownGracefully();
+        }
+    }
+
+    @Test
+    void validateEndpointAcceptsFlinkDeployment() throws Exception {
+        var review = createAdmissionReview(createDeployment());
+
+        var response = sendRequest(VALIDATE_REQUEST_PATH, review);
+
+        assertEquals(200, response.statusCode());
+        var responseReview = mapper.readValue(response.body(), 
AdmissionReview.class);
+        assertTrue(responseReview.getResponse().getAllowed());
+    }
+
+    @Test
+    void mutateEndpointAddsTargetSessionLabelToSessionJob() throws Exception {
+        var review = createAdmissionReview(createSessionJob());
+
+        var response = sendRequest(MUTATOR_REQUEST_PATH, review);
+
+        assertEquals(200, response.statusCode());
+        var responseReview = mapper.readValue(response.body(), 
AdmissionReview.class);
+        assertTrue(responseReview.getResponse().getAllowed());
+        var patch = new 
String(Base64.getDecoder().decode(responseReview.getResponse().getPatch()));
+        assertTrue(
+                patch.contains(CrdConstants.LABEL_TARGET_SESSION),
+                "Patch should contain the target-session label");
+    }
+
+    @Test
+    void mutateEndpointProducesEmptyPatchForNoOpDeployment() throws Exception {
+        var review = createAdmissionReview(createDeployment());
+
+        var response = sendRequest(MUTATOR_REQUEST_PATH, review);
+
+        assertEquals(200, response.statusCode());
+        var responseReview = mapper.readValue(response.body(), 
AdmissionReview.class);
+        assertTrue(responseReview.getResponse().getAllowed());
+        var patch = new 
String(Base64.getDecoder().decode(responseReview.getResponse().getPatch()));
+        assertEquals("[]", patch, "No-op mutation should produce an empty JSON 
patch");
+    }
+
+    @Test
+    void illegalPathReturnsInternalServerError() throws Exception {
+        var review = createAdmissionReview(createDeployment());
+
+        var response = sendRequest("/illegal-path", review);
+
+        assertEquals(500, response.statusCode());
+    }
+
+    @Test
+    void malformedBodyReturnsInternalServerError() throws IOException, 
InterruptedException {
+        var request =
+                HttpRequest.newBuilder()
+                        .uri(URI.create("http://localhost:"; + port + 
VALIDATE_REQUEST_PATH))
+                        .POST(HttpRequest.BodyPublishers.ofString("not-json"))
+                        .build();
+
+        var response =
+                HttpClient.newHttpClient().send(request, 
HttpResponse.BodyHandlers.ofString());
+
+        assertEquals(500, response.statusCode());
+    }
+
+    @Test
+    void serverDoesNotUseSslWhenNoKeystoreConfigured() throws Exception {
+        var review = createAdmissionReview(createDeployment());
+
+        var response = sendRequest(VALIDATE_REQUEST_PATH, review);
+
+        assertEquals(200, response.statusCode(), "Plain HTTP should work 
without SSL configured");
+    }
+
+    @Test
+    void webhookDiscoversCustomValidatorFromPlugins(@TempDir Path 
temporaryFolder)
+            throws Exception {
+        Map<String, String> originalEnv = System.getenv();
+        try {
+            Map<String, String> systemEnv = new HashMap<>(originalEnv);
+            systemEnv.put(
+                    ConfigConstants.ENV_FLINK_PLUGINS_DIR, 
getTestPluginsRootDir(temporaryFolder));
+            setEnv(systemEnv);
+
+            var informerManager = new InformerManager(kubernetesClient);
+            informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+            var configManager = new FlinkConfigManager(new Configuration());
+
+            var webhook = new FlinkOperatorWebhook(informerManager, 
configManager);
+
+            assertEquals(
+                    new HashSet<>(
+                            Arrays.asList(
+                                    DefaultValidator.class.getName(),
+                                    TestValidator.class.getName())),
+                    webhook.validators.stream()
+                            .map(v -> v.getClass().getName())
+                            .collect(Collectors.toSet()),
+                    "Should discover both DefaultValidator and TestValidator 
from plugins");
+        } finally {
+            setEnv(originalEnv);
+        }
+    }
+
+    @Test
+    void webhookDiscoversCustomMutatorFromPlugins(@TempDir Path 
temporaryFolder) throws Exception {
+        Map<String, String> originalEnv = System.getenv();
+        try {
+            Map<String, String> systemEnv = new HashMap<>(originalEnv);
+            systemEnv.put(
+                    ConfigConstants.ENV_FLINK_PLUGINS_DIR, 
getTestPluginsRootDir(temporaryFolder));
+            setEnv(systemEnv);
+
+            var informerManager = new InformerManager(kubernetesClient);
+            informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+            var configManager = new FlinkConfigManager(new Configuration());
+
+            var webhook = new FlinkOperatorWebhook(informerManager, 
configManager);
+
+            assertEquals(
+                    new HashSet<>(
+                            Arrays.asList(
+                                    DefaultFlinkMutator.class.getName(),
+                                    TestMutator.class.getName())),
+                    webhook.mutators.stream()
+                            .map(m -> m.getClass().getName())
+                            .collect(Collectors.toSet()),
+                    "Should discover both DefaultFlinkMutator and TestMutator 
from plugins");
+        } finally {
+            setEnv(originalEnv);
+        }
+    }
+
+    @Test
+    void createChannelInitializerWithSslContext(@TempDir Path temporaryFolder) 
throws Exception {
+        Path keystorePath = createTestKeystore(temporaryFolder);
+        Map<String, String> originalEnv = System.getenv();
+        try {
+            Map<String, String> systemEnv = new HashMap<>(originalEnv);
+            systemEnv.put(EnvUtils.ENV_WEBHOOK_KEYSTORE_FILE, 
keystorePath.toString());
+            systemEnv.put(EnvUtils.ENV_WEBHOOK_KEYSTORE_TYPE, "PKCS12");
+            systemEnv.put(EnvUtils.ENV_WEBHOOK_KEYSTORE_PASSWORD, "testpass");
+            setEnv(systemEnv);
+
+            var informerManager = new InformerManager(kubernetesClient);
+            informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+            var validator = new FlinkValidator(Set.of(), informerManager);
+            var mutator = new FlinkMutator(Set.of(new DefaultFlinkMutator()), 
informerManager);
+            var admissionHandler = new AdmissionHandler(validator, mutator);
+
+            var initializer = 
FlinkOperatorWebhook.createChannelInitializer(admissionHandler);
+            assertNotNull(initializer, "Channel initializer with SSL should 
not be null");
+
+            // Start a server with SSL and verify HTTPS works
+            NioEventLoopGroup sslBossGroup = new NioEventLoopGroup(1);
+            NioEventLoopGroup sslWorkerGroup = new NioEventLoopGroup();
+            try {
+                Channel sslChannel =
+                        new ServerBootstrap()
+                                .group(sslBossGroup, sslWorkerGroup)
+                                .channel(NioServerSocketChannel.class)
+                                .childHandler(initializer)
+                                .bind(0)
+                                .sync()
+                                .channel();
+                int sslPort = ((InetSocketAddress) 
sslChannel.localAddress()).getPort();
+
+                var review = createAdmissionReview(createDeployment());
+                var body = mapper.writeValueAsString(review);
+                var request =
+                        HttpRequest.newBuilder()
+                                .uri(
+                                        URI.create(
+                                                "https://localhost:";
+                                                        + sslPort
+                                                        + 
VALIDATE_REQUEST_PATH))
+                                
.POST(HttpRequest.BodyPublishers.ofString(body))
+                                .header("Content-Type", "application/json")
+                                .build();
+
+                var trustAllContext = createTrustAllSslContext();
+                var response =
+                        HttpClient.newBuilder()
+                                .sslContext(trustAllContext)
+                                .build()
+                                .send(request, 
HttpResponse.BodyHandlers.ofString());
+
+                assertEquals(
+                        200,
+                        response.statusCode(),
+                        "HTTPS request should succeed with SSL enabled");
+
+                sslChannel.close();
+            } finally {
+                sslBossGroup.shutdownGracefully();
+                sslWorkerGroup.shutdownGracefully();
+            }
+        } finally {
+            setEnv(originalEnv);
+        }
+    }
+
+    @Test
+    void createChannelInitializerWithSslContextMissingKeystoreType(@TempDir 
Path temporaryFolder)
+            throws Exception {
+        Path keystorePath = createTestKeystore(temporaryFolder);
+        Map<String, String> originalEnv = System.getenv();
+        try {
+            Map<String, String> systemEnv = new HashMap<>(originalEnv);
+            systemEnv.put(EnvUtils.ENV_WEBHOOK_KEYSTORE_FILE, 
keystorePath.toString());
+            // Missing KEYSTORE_TYPE and KEYSTORE_PASSWORD
+            systemEnv.remove(EnvUtils.ENV_WEBHOOK_KEYSTORE_TYPE);
+            systemEnv.remove(EnvUtils.ENV_WEBHOOK_KEYSTORE_PASSWORD);
+            setEnv(systemEnv);
+
+            var informerManager = new InformerManager(kubernetesClient);
+            informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+            var validator = new FlinkValidator(Set.of(), informerManager);
+            var mutator = new FlinkMutator(Set.of(new DefaultFlinkMutator()), 
informerManager);
+            var admissionHandler = new AdmissionHandler(validator, mutator);
+
+            assertThrows(
+                    java.util.NoSuchElementException.class,
+                    () -> 
FlinkOperatorWebhook.createChannelInitializer(admissionHandler),
+                    "Should throw when keystore type env var is missing");
+        } finally {
+            setEnv(originalEnv);
+        }
+    }
+
+    private static Path createTestKeystore(Path dir) throws Exception {
+        Path keystorePath = dir.resolve("test-keystore.p12");
+        ProcessBuilder pb =
+                new ProcessBuilder(
+                        "keytool",
+                        "-genkeypair",
+                        "-alias",
+                        "test",
+                        "-keyalg",
+                        "RSA",
+                        "-keysize",
+                        "2048",
+                        "-storetype",
+                        "PKCS12",
+                        "-keystore",
+                        keystorePath.toString(),
+                        "-storepass",
+                        "testpass",
+                        "-dname",
+                        "CN=localhost",
+                        "-validity",
+                        "365");
+        pb.inheritIO();
+        Process process = pb.start();
+        int exitCode = process.waitFor();
+        assertEquals(0, exitCode, "keytool should succeed");
+        assertTrue(keystorePath.toFile().exists(), "Keystore file should be 
created");
+        return keystorePath;
+    }
+
+    private static SSLContext createTrustAllSslContext() throws Exception {
+        TrustManager[] trustAll =
+                new TrustManager[] {
+                    new X509TrustManager() {
+                        public X509Certificate[] getAcceptedIssuers() {
+                            return new X509Certificate[0];
+                        }
+
+                        public void checkClientTrusted(X509Certificate[] 
certs, String authType) {}
+
+                        public void checkServerTrusted(X509Certificate[] 
certs, String authType) {}
+                    }
+                };
+        SSLContext sc = SSLContext.getInstance("TLS");
+        sc.init(null, trustAll, new java.security.SecureRandom());
+        return sc;
+    }
+
+    private static String getTestPluginsRootDir(Path temporaryFolder) throws 
IOException {
+        File testPluginFolder = new File(temporaryFolder.toFile(), 
TEST_PLUGINS);
+        assertTrue(testPluginFolder.mkdirs());
+        File testPluginJar = new File("target", PLUGINS_JAR);
+        assertTrue(
+                testPluginJar.exists(),
+                "Test plugin jar not found at "
+                        + testPluginJar.getAbsolutePath()
+                        + ". Run 'mvn process-test-classes' first.");
+        Files.copy(testPluginJar.toPath(), 
Paths.get(testPluginFolder.toString(), PLUGINS_JAR));
+        return temporaryFolder.toAbsolutePath().toString();
+    }
+
+    @SuppressWarnings({"unchecked", "JavaReflectionMemberAccess"})
+    private static void setEnv(Map<String, String> newEnv) {
+        try {
+            Map<String, String> env = System.getenv();
+            Class<?> clazz = env.getClass();
+            Field field = clazz.getDeclaredField("m");
+            field.setAccessible(true);
+            Map<String, String> map = (Map<String, String>) field.get(env);
+            map.clear();
+            map.putAll(newEnv);
+            Class<?> processEnvironmentClass = 
Class.forName("java.lang.ProcessEnvironment");
+            try {
+                Field theCaseInsensitiveEnvironmentField =
+                        
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
+                theCaseInsensitiveEnvironmentField.setAccessible(true);
+                Map<String, String> ciEnv =
+                        (Map<String, String>) 
theCaseInsensitiveEnvironmentField.get(null);
+                ciEnv.clear();
+                ciEnv.putAll(newEnv);
+            } catch (NoSuchFieldException ignored) {
+            }
+        } catch (Exception e1) {
+            throw new RuntimeException(e1);
+        }
+    }
+
+    private HttpResponse<String> sendRequest(String path, AdmissionReview 
review)
+            throws IOException, InterruptedException {
+        var body = mapper.writeValueAsString(review);
+        var request =
+                HttpRequest.newBuilder()
+                        .uri(URI.create("http://localhost:"; + port + path))
+                        .POST(HttpRequest.BodyPublishers.ofString(body))
+                        .header("Content-Type", "application/json")
+                        .build();
+        return HttpClient.newHttpClient().send(request, 
HttpResponse.BodyHandlers.ofString());
+    }
+
+    private AdmissionReview createAdmissionReview(Object resource) {
+        var admissionRequest = new AdmissionRequest();
+        admissionRequest.setOperation(CREATE.name());
+        admissionRequest.setObject(resource);
+        if (resource instanceof FlinkDeployment fd) {
+            admissionRequest.setKind(
+                    new GroupVersionKind(fd.getGroup(), fd.getVersion(), 
fd.getKind()));
+        } else if (resource instanceof FlinkSessionJob sj) {
+            admissionRequest.setKind(
+                    new GroupVersionKind(sj.getGroup(), sj.getVersion(), 
sj.getKind()));
+        }
+        var review = new AdmissionReview();
+        review.setRequest(admissionRequest);
+        return review;
+    }
+
+    private FlinkDeployment createDeployment() {
+        var deployment = new FlinkDeployment();
+        var meta = new ObjectMeta();
+        meta.setName("test-deployment");
+        meta.setNamespace("default");
+        deployment.setMetadata(meta);
+        deployment.setSpec(new FlinkDeploymentSpec());
+        return deployment;
+    }
+
+    private FlinkSessionJob createSessionJob() {
+        var sessionJob = new FlinkSessionJob();
+        var meta = new ObjectMeta();
+        meta.setName("test-job");
+        meta.setNamespace("default");
+        sessionJob.setMetadata(meta);
+        sessionJob.setSpec(
+                FlinkSessionJobSpec.builder()
+                        
.job(JobSpec.builder().jarURI("http://test-job.jar";).build())
+                        .deploymentName("test-deployment")
+                        .build());
+        return sessionJob;
+    }
+}
diff --git 
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/FlinkValidatorTest.java
 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/FlinkValidatorTest.java
new file mode 100644
index 00000000..d8311227
--- /dev/null
+++ 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/FlinkValidatorTest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.admission;
+
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobKind;
+import org.apache.flink.kubernetes.operator.api.spec.JobReference;
+import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.javaoperatorsdk.webhook.admission.NotAllowedException;
+import io.javaoperatorsdk.webhook.admission.Operation;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link FlinkValidator}. */
+@EnableKubernetesMockClient(crud = true)
+class FlinkValidatorTest {
+
+    private KubernetesClient kubernetesClient;
+    private InformerManager informerManager;
+
+    @BeforeEach
+    void setup() {
+        informerManager = new InformerManager(kubernetesClient);
+        informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+    void canaryResourceSkipsValidation(Operation operation) {
+        var validator = createValidator(failingValidator("should not be 
called"));
+        var deployment = createDeployment();
+        var labels = new HashMap<String, String>();
+        labels.put("flink.apache.org/canary", "true");
+        deployment.getMetadata().setLabels(labels);
+
+        assertDoesNotThrow(() -> validator.validate(deployment, null, 
operation));
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+    void deploymentPassesValidation(Operation operation) {
+        var validator = createValidator(passingValidator());
+        var deployment = createDeployment();
+
+        assertDoesNotThrow(() -> validator.validate(deployment, null, 
operation));
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+    void deploymentFailsValidation(Operation operation) {
+        var validator = createValidator(failingValidator("deployment is 
invalid"));
+        var deployment = createDeployment();
+
+        var exception =
+                assertThrows(
+                        NotAllowedException.class,
+                        () -> validator.validate(deployment, null, operation));
+        assertEquals("deployment is invalid", exception.getMessage());
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+    void sessionJobPassesValidation(Operation operation) {
+        var validator = createValidator(passingValidator());
+        var sessionJob = createSessionJob();
+
+        assertDoesNotThrow(() -> validator.validate(sessionJob, null, 
operation));
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+    void sessionJobFailsValidation(Operation operation) {
+        var validator = createValidator(failingValidator("session job is 
invalid"));
+        var sessionJob = createSessionJob();
+
+        var exception =
+                assertThrows(
+                        NotAllowedException.class,
+                        () -> validator.validate(sessionJob, null, operation));
+        assertEquals("session job is invalid", exception.getMessage());
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+    void stateSnapshotPassesValidation(Operation operation) {
+        var validator = createValidator(passingValidator());
+        var snapshot = createStateSnapshot();
+
+        assertDoesNotThrow(() -> validator.validate(snapshot, null, 
operation));
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+    void stateSnapshotFailsValidation(Operation operation) {
+        var validator = createValidator(failingValidator("snapshot is 
invalid"));
+        var snapshot = createStateSnapshot();
+
+        var exception =
+                assertThrows(
+                        NotAllowedException.class,
+                        () -> validator.validate(snapshot, null, operation));
+        assertEquals("snapshot is invalid", exception.getMessage());
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+    void unexpectedResourceKindThrowsNotAllowedException(Operation operation) {
+        var validator = createValidator(passingValidator());
+        var pod = new io.fabric8.kubernetes.api.model.Pod();
+        pod.setMetadata(new ObjectMeta());
+        pod.getMetadata().setName("test-pod");
+        pod.getMetadata().setNamespace("default");
+
+        var exception =
+                assertThrows(
+                        NotAllowedException.class, () -> 
validator.validate(pod, null, operation));
+        assertEquals("Unexpected resource: Pod", exception.getMessage());
+    }
+
+    @Test
+    void firstValidatorErrorStopsValidation() {
+        FlinkResourceValidator first = failingValidator("first error");
+        FlinkResourceValidator second = failingValidator("second error");
+        var validator = new FlinkValidator(Set.of(first, second), 
informerManager);
+        var deployment = createDeployment();
+
+        var exception =
+                assertThrows(
+                        NotAllowedException.class,
+                        () -> validator.validate(deployment, null, 
Operation.CREATE));
+        // With Set ordering, we can't guarantee which fires first, but one of 
them must
+        assertTrue(
+                ("first error".equals(exception.getMessage())
+                                && !"second 
error".equals(exception.getMessage()))
+                        || (!"first error".equals(exception.getMessage())
+                                && "second 
error".equals(exception.getMessage())));
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+    void stateSnapshotWithFlinkDeploymentJobRefPassesValidation(Operation 
operation) {
+        var validator = createValidator(passingValidator());
+        var snapshot = createStateSnapshotWithJobRef(JobKind.FLINK_DEPLOYMENT, 
"test-deployment");
+
+        assertDoesNotThrow(() -> validator.validate(snapshot, null, 
operation));
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+    void stateSnapshotWithFlinkSessionJobJobRefPassesValidation(Operation 
operation) {
+        var validator = createValidator(passingValidator());
+        var snapshot = 
createStateSnapshotWithJobRef(JobKind.FLINK_SESSION_JOB, "test-session-job");
+
+        assertDoesNotThrow(() -> validator.validate(snapshot, null, 
operation));
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+    void 
stateSnapshotWithFlinkDeploymentJobRefThrowsWhenNamespaceMissing(Operation 
operation) {
+        var validator = createValidator(passingValidator());
+        var snapshot =
+                createStateSnapshotWithJobRefNoNamespace(
+                        JobKind.FLINK_DEPLOYMENT, "test-deployment");
+
+        var exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> validator.validate(snapshot, null, operation));
+        assertEquals("Cannot determine namespace for snapshot", 
exception.getMessage());
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE", "DELETE", "CONNECT"})
+    void 
stateSnapshotWithFlinkSessionJobJobRefThrowsWhenNamespaceMissing(Operation 
operation) {
+        var validator = createValidator(passingValidator());
+        var snapshot =
+                createStateSnapshotWithJobRefNoNamespace(
+                        JobKind.FLINK_SESSION_JOB, "test-session-job");
+
+        var exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> validator.validate(snapshot, null, operation));
+        assertEquals("Cannot determine namespace for snapshot", 
exception.getMessage());
+    }
+
+    private FlinkValidator createValidator(FlinkResourceValidator 
resourceValidator) {
+        return new FlinkValidator(Set.of(resourceValidator), informerManager);
+    }
+
+    private FlinkResourceValidator passingValidator() {
+        return new FlinkResourceValidator() {
+            @Override
+            public Optional<String> validateDeployment(FlinkDeployment 
deployment) {
+                return Optional.empty();
+            }
+
+            @Override
+            public Optional<String> validateSessionJob(
+                    FlinkSessionJob sessionJob, Optional<FlinkDeployment> 
session) {
+                return Optional.empty();
+            }
+
+            @Override
+            public Optional<String> validateStateSnapshot(
+                    FlinkStateSnapshot savepoint, 
Optional<AbstractFlinkResource<?, ?>> target) {
+                return Optional.empty();
+            }
+        };
+    }
+
+    private FlinkResourceValidator failingValidator(String errorMessage) {
+        return new FlinkResourceValidator() {
+            @Override
+            public Optional<String> validateDeployment(FlinkDeployment 
deployment) {
+                return Optional.of(errorMessage);
+            }
+
+            @Override
+            public Optional<String> validateSessionJob(
+                    FlinkSessionJob sessionJob, Optional<FlinkDeployment> 
session) {
+                return Optional.of(errorMessage);
+            }
+
+            @Override
+            public Optional<String> validateStateSnapshot(
+                    FlinkStateSnapshot savepoint, 
Optional<AbstractFlinkResource<?, ?>> target) {
+                return Optional.of(errorMessage);
+            }
+        };
+    }
+
+    private FlinkDeployment createDeployment() {
+        var deployment = new FlinkDeployment();
+        var meta = new ObjectMeta();
+        meta.setName("test-deployment");
+        meta.setNamespace("default");
+        deployment.setMetadata(meta);
+        deployment.setSpec(new FlinkDeploymentSpec());
+        return deployment;
+    }
+
+    private FlinkSessionJob createSessionJob() {
+        var sessionJob = new FlinkSessionJob();
+        var meta = new ObjectMeta();
+        meta.setName("test-job");
+        meta.setNamespace("default");
+        sessionJob.setMetadata(meta);
+        sessionJob.setSpec(
+                FlinkSessionJobSpec.builder()
+                        
.job(JobSpec.builder().jarURI("http://test-job.jar";).build())
+                        .deploymentName("test-deployment")
+                        .build());
+        return sessionJob;
+    }
+
+    private FlinkStateSnapshot createStateSnapshot() {
+        var snapshot = new FlinkStateSnapshot();
+        var meta = new ObjectMeta();
+        meta.setName("test-snapshot");
+        meta.setNamespace("default");
+        snapshot.setMetadata(meta);
+        snapshot.setSpec(new FlinkStateSnapshotSpec());
+        return snapshot;
+    }
+
+    private FlinkStateSnapshot createStateSnapshotWithJobRef(JobKind kind, 
String name) {
+        var snapshot = new FlinkStateSnapshot();
+        var meta = new ObjectMeta();
+        meta.setName("test-snapshot");
+        meta.setNamespace("default");
+        snapshot.setMetadata(meta);
+        var spec = new FlinkStateSnapshotSpec();
+        
spec.setJobReference(JobReference.builder().kind(kind).name(name).build());
+        snapshot.setSpec(spec);
+        return snapshot;
+    }
+
+    private FlinkStateSnapshot 
createStateSnapshotWithJobRefNoNamespace(JobKind kind, String name) {
+        var snapshot = new FlinkStateSnapshot();
+        var meta = new ObjectMeta();
+        meta.setName("test-snapshot");
+        snapshot.setMetadata(meta);
+        var spec = new FlinkStateSnapshotSpec();
+        
spec.setJobReference(JobReference.builder().kind(kind).name(name).build());
+        snapshot.setSpec(spec);
+        return snapshot;
+    }
+}
diff --git 
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java
 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java
index 924ed406..1f167fe2 100644
--- 
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java
+++ 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java
@@ -56,6 +56,61 @@ public class InformerManagerTest {
         Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns3"));
     }
 
+    @Test
+    public void testSessionJobNamespacedInformerCreated() {
+        var informerManager = new InformerManager(kubernetesClient);
+        informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+        
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns1"));
+
+        informerManager.setNamespaces(Set.of("ns1", "ns2"));
+        
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns1"));
+        
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns2"));
+
+        informerManager.setNamespaces(Set.of("ns1", "ns2", "ns3"));
+        
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns1"));
+        
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns2"));
+        
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns3"));
+    }
+
+    @Test
+    public void testSetNamespacesStopsSessionJobInformers() {
+        var informerManager = new InformerManager(kubernetesClient);
+        informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+
+        // Initialize session job informers by calling 
getFlinkSessionJobInformer
+        
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns1"));
+
+        // Change namespaces — this should stop existing session job informers 
and reset them
+        informerManager.setNamespaces(Set.of("ns1", "ns2"));
+
+        // Verify new informers are created for the updated namespaces
+        
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns1"));
+        
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns2"));
+    }
+
+    @Test
+    public void testSetNamespacesStopsBothDepAndSessionJobInformers() {
+        var informerManager = new InformerManager(kubernetesClient);
+        informerManager.setNamespaces(Set.of("ns1"));
+
+        // Initialize both types of informers
+        Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1"));
+        
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns1"));
+
+        // Change namespaces — should stop and recreate both
+        informerManager.setNamespaces(Set.of("ns2"));
+
+        Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns2"));
+        
Assertions.assertNotNull(informerManager.getFlinkSessionJobInformer("ns2"));
+
+        // Old namespace should no longer have informers
+        Assertions.assertThrows(
+                NullPointerException.class, () -> 
informerManager.getFlinkDepInformer("ns1"));
+        Assertions.assertThrows(
+                NullPointerException.class,
+                () -> informerManager.getFlinkSessionJobInformer("ns1"));
+    }
+
     @Test
     public void testDynamicNamespaces() {
         InformerManager informerManager = new 
InformerManager(kubernetesClient);
diff --git 
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutatorTest.java
 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutatorTest.java
new file mode 100644
index 00000000..16caa339
--- /dev/null
+++ 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutatorTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.admission.mutator;
+
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.admission.v1.AdmissionRequest;
+import io.javaoperatorsdk.webhook.admission.NotAllowedException;
+import io.javaoperatorsdk.webhook.admission.Operation;
+import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link DefaultRequestMutator}. */
+class DefaultRequestMutatorTest {
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    @Test
+    void handleWithNoMutationProducesEmptyPatch() {
+        var requestMutator = createRequestMutator((resource, operation) -> 
resource);
+        var request = createAdmissionRequest(createDeployment());
+
+        var response = requestMutator.handle(request);
+
+        assertTrue(response.getAllowed());
+        assertEquals("JSONPatch", response.getPatchType());
+        var patch = decodePatch(response.getPatch());
+        assertEquals("[]", patch, "No-op mutation should produce an empty JSON 
patch array");
+    }
+
+    @Test
+    void handleWithLabelMutationProducesPatchWithAddOperation() throws 
Exception {
+        var requestMutator =
+                createRequestMutator(
+                        (resource, operation) -> {
+                            var labels = resource.getMetadata().getLabels();
+                            if (labels == null) {
+                                labels = new HashMap<>();
+                            }
+                            labels.put("injected", "true");
+                            resource.getMetadata().setLabels(labels);
+                            return resource;
+                        });
+        var request = createAdmissionRequest(createDeployment());
+
+        var response = requestMutator.handle(request);
+
+        assertTrue(response.getAllowed());
+        var patch = decodePatch(response.getPatch());
+        var patchNodes = mapper.readTree(patch);
+        assertFalse(patchNodes.isEmpty(), "Mutation should produce at least 
one patch operation");
+        assertTrue(patch.contains("injected"));
+    }
+
+    @Test
+    void handleWithSpecMutationProducesPatchWithReplaceOperation() {
+        var requestMutator =
+                createRequestMutator(
+                        (resource, operation) -> {
+                            ((FlinkDeployment) 
resource).getSpec().setImage("mutated:latest");
+                            return resource;
+                        });
+        var deployment = createDeployment();
+        deployment.getSpec().setImage("original:1.0");
+        var request = createAdmissionRequest(deployment);
+
+        var response = requestMutator.handle(request);
+
+        assertTrue(response.getAllowed());
+        
assertTrue(decodePatch(response.getPatch()).contains("mutated:latest"));
+    }
+
+    @Test
+    void handleWithNotAllowedExceptionReturnsNotAllowedResponse() {
+        var requestMutator =
+                createRequestMutator(
+                        (resource, operation) -> {
+                            throw new NotAllowedException("rejected for 
testing");
+                        });
+        var request = createAdmissionRequest(createDeployment());
+
+        var response = requestMutator.handle(request);
+
+        assertFalse(response.getAllowed());
+        assertNotNull(response.getStatus());
+        assertTrue(response.getStatus().getMessage().contains("rejected for 
testing"));
+    }
+
+    private FlinkDeployment createDeployment() {
+        var deployment = new FlinkDeployment();
+        var meta = new ObjectMeta();
+        meta.setName("test-deployment");
+        meta.setNamespace("default");
+        deployment.setMetadata(meta);
+        deployment.setSpec(new FlinkDeploymentSpec());
+        return deployment;
+    }
+
+    private AdmissionRequest createAdmissionRequest(FlinkDeployment resource) {
+        var request = new AdmissionRequest();
+        request.setOperation(Operation.CREATE.name());
+        request.setObject(resource);
+        return request;
+    }
+
+    private String decodePatch(String base64Patch) {
+        return new String(Base64.getDecoder().decode(base64Patch), 
StandardCharsets.UTF_8);
+    }
+
+    private DefaultRequestMutator<HasMetadata> 
createRequestMutator(Mutator<HasMetadata> mutator) {
+        return new DefaultRequestMutator<>(mutator);
+    }
+}
diff --git 
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutatorTest.java
 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutatorTest.java
new file mode 100644
index 00000000..84c65b82
--- /dev/null
+++ 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutatorTest.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.admission.mutator;
+
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
+import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.mutator.DefaultFlinkMutator;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.javaoperatorsdk.webhook.admission.Operation;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Tests for {@link FlinkMutator}. */
+@EnableKubernetesMockClient(crud = true)
+class FlinkMutatorTest {
+
+    private KubernetesClient kubernetesClient;
+    private InformerManager informerManager;
+    private FlinkMutator mutator;
+
+    @BeforeEach
+    void setup() {
+        informerManager = new InformerManager(kubernetesClient);
+        informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+        mutator = new FlinkMutator(Set.of(new DefaultFlinkMutator()), 
informerManager);
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE"})
+    void deploymentNoMutationReturnsSameInstance(Operation operation) {
+        var deployment = createDeployment();
+
+        var result = mutator.mutate(deployment, operation);
+
+        assertSame(
+                deployment,
+                result,
+                "Should return the original resource when no mutation is 
applied");
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE"})
+    void sessionJobWithoutLabelReturnsMutatedInstance(Operation operation) {
+        var sessionJob = createSessionJob();
+
+        var result = mutator.mutate(sessionJob, operation);
+
+        assertNotSame(sessionJob, result, "Should return a mutated instance 
when label is added");
+        assertInstanceOf(FlinkSessionJob.class, result);
+        assertEquals(
+                "test-deployment",
+                
result.getMetadata().getLabels().get(CrdConstants.LABEL_TARGET_SESSION));
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE"})
+    void sessionJobWithCorrectLabelReturnsSameInstance(Operation operation) {
+        var sessionJob = createSessionJob();
+        var labels = new HashMap<String, String>();
+        labels.put(CrdConstants.LABEL_TARGET_SESSION, "test-deployment");
+        sessionJob.getMetadata().setLabels(labels);
+
+        var result = mutator.mutate(sessionJob, operation);
+
+        assertSame(
+                sessionJob,
+                result,
+                "Should return the original resource when the label is already 
correct");
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE"})
+    void sessionJobWithWrongLabelReturnsMutatedInstance(Operation operation) {
+        var sessionJob = createSessionJob();
+        var labels = new HashMap<String, String>();
+        labels.put(CrdConstants.LABEL_TARGET_SESSION, "wrong-session");
+        sessionJob.getMetadata().setLabels(labels);
+
+        var result = mutator.mutate(sessionJob, operation);
+
+        assertNotSame(sessionJob, result);
+        assertInstanceOf(FlinkSessionJob.class, result);
+        assertEquals(
+                "test-deployment",
+                
result.getMetadata().getLabels().get(CrdConstants.LABEL_TARGET_SESSION));
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE"})
+    void stateSnapshotNoMutationReturnsSameInstance(Operation operation) {
+        var snapshot = createStateSnapshot();
+
+        var result = mutator.mutate(snapshot, operation);
+
+        assertSame(
+                snapshot,
+                result,
+                "Should return the original resource when no mutation is 
applied");
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE"},
+            mode = EnumSource.Mode.EXCLUDE)
+    void nonMutatingOperationsReturnOriginalResource(Operation operation) {
+        var deployment = createDeployment();
+        var sessionJob = createSessionJob();
+        var snapshot = createStateSnapshot();
+
+        assertSame(deployment, mutator.mutate(deployment, operation));
+        assertSame(sessionJob, mutator.mutate(sessionJob, operation));
+        assertSame(snapshot, mutator.mutate(snapshot, operation));
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE"})
+    void deploymentMutatorExceptionIsWrappedInRuntimeException(Operation 
operation) {
+        var failingMutator =
+                new FlinkMutator(
+                        Set.of(
+                                new DefaultFlinkMutator() {
+                                    @Override
+                                    public FlinkDeployment mutateDeployment(
+                                            FlinkDeployment deployment) {
+                                        throw new 
IllegalStateException("deployment error");
+                                    }
+                                }),
+                        informerManager);
+        var deployment = createDeployment();
+
+        var exception =
+                assertThrows(
+                        RuntimeException.class, () -> 
failingMutator.mutate(deployment, operation));
+        assertInstanceOf(IllegalStateException.class, exception.getCause());
+        assertEquals("deployment error", exception.getCause().getMessage());
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE"})
+    void sessionJobMutatorExceptionIsWrappedInRuntimeException(Operation 
operation) {
+        var failingMutator =
+                new FlinkMutator(
+                        Set.of(
+                                new DefaultFlinkMutator() {
+                                    @Override
+                                    public FlinkSessionJob mutateSessionJob(
+                                            FlinkSessionJob sessionJob,
+                                            Optional<FlinkDeployment> session) 
{
+                                        throw new 
IllegalStateException("session job error");
+                                    }
+                                }),
+                        informerManager);
+        var sessionJob = createSessionJob();
+
+        var exception =
+                assertThrows(
+                        RuntimeException.class, () -> 
failingMutator.mutate(sessionJob, operation));
+        assertInstanceOf(IllegalStateException.class, exception.getCause());
+        assertEquals("session job error", exception.getCause().getMessage());
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE"})
+    void stateSnapshotMutatorExceptionIsWrappedInRuntimeException(Operation 
operation) {
+        var failingMutator =
+                new FlinkMutator(
+                        Set.of(
+                                new DefaultFlinkMutator() {
+                                    @Override
+                                    public FlinkStateSnapshot 
mutateStateSnapshot(
+                                            FlinkStateSnapshot stateSnapshot) {
+                                        throw new 
IllegalStateException("snapshot error");
+                                    }
+                                }),
+                        informerManager);
+        var snapshot = createStateSnapshot();
+
+        var exception =
+                assertThrows(
+                        RuntimeException.class, () -> 
failingMutator.mutate(snapshot, operation));
+        assertInstanceOf(IllegalStateException.class, exception.getCause());
+        assertEquals("snapshot error", exception.getCause().getMessage());
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE"})
+    void customLabelMutatorOnDeploymentReturnsMutatedInstance(Operation 
operation) {
+        var labelMutator = createMutatorWith(new LabelInjectingMutator());
+        var deployment = createDeployment();
+
+        var result = labelMutator.mutate(deployment, operation);
+
+        assertNotSame(deployment, result);
+        assertInstanceOf(FlinkDeployment.class, result);
+        assertEquals("injected", 
result.getMetadata().getLabels().get("custom-env"));
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE"})
+    void 
customLabelMutatorOnDeploymentWithMatchingLabelReturnsSameInstance(Operation 
operation) {
+        var labelMutator = createMutatorWith(new LabelInjectingMutator());
+        var deployment = createDeployment();
+        var labels = new HashMap<String, String>();
+        labels.put("custom-env", "injected");
+        deployment.getMetadata().setLabels(labels);
+
+        var result = labelMutator.mutate(deployment, operation);
+
+        assertSame(deployment, result);
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE"})
+    void customLabelMutatorOnSnapshotReturnsMutatedInstance(Operation 
operation) {
+        var labelMutator = createMutatorWith(new LabelInjectingMutator());
+        var snapshot = createStateSnapshot();
+
+        var result = labelMutator.mutate(snapshot, operation);
+
+        assertNotSame(snapshot, result);
+        assertInstanceOf(FlinkStateSnapshot.class, result);
+        assertEquals("injected", 
result.getMetadata().getLabels().get("custom-env"));
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE"})
+    void 
customLabelMutatorOnSnapshotWithMatchingLabelReturnsSameInstance(Operation 
operation) {
+        var labelMutator = createMutatorWith(new LabelInjectingMutator());
+        var snapshot = createStateSnapshot();
+        var labels = new HashMap<String, String>();
+        labels.put("custom-env", "injected");
+        snapshot.getMetadata().setLabels(labels);
+
+        var result = labelMutator.mutate(snapshot, operation);
+
+        assertSame(snapshot, result);
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE"})
+    void customSpecMutatorOnDeploymentReturnsMutatedInstance(Operation 
operation) {
+        var specMutator = createMutatorWith(new SpecModifyingMutator());
+        var deployment = createDeployment();
+
+        var result = specMutator.mutate(deployment, operation);
+
+        assertNotSame(deployment, result);
+        assertInstanceOf(FlinkDeployment.class, result);
+        assertEquals("mutated:latest", ((FlinkDeployment) 
result).getSpec().getImage());
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE"})
+    void customSpecMutatorOnSessionJobReturnsMutatedInstance(Operation 
operation) {
+        var specMutator = createMutatorWith(new SpecModifyingMutator());
+        var sessionJob = createSessionJob();
+
+        var result = specMutator.mutate(sessionJob, operation);
+
+        assertNotSame(sessionJob, result);
+        assertInstanceOf(FlinkSessionJob.class, result);
+        assertEquals(
+                "mutated-deployment", ((FlinkSessionJob) 
result).getSpec().getDeploymentName());
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = Operation.class,
+            names = {"CREATE", "UPDATE"})
+    void customSpecMutatorOnSnapshotReturnsMutatedInstance(Operation 
operation) {
+        var specMutator = createMutatorWith(new SpecModifyingMutator());
+        var snapshot = createStateSnapshot();
+
+        var result = specMutator.mutate(snapshot, operation);
+
+        assertNotSame(snapshot, result);
+        assertInstanceOf(FlinkStateSnapshot.class, result);
+        assertEquals(5, ((FlinkStateSnapshot) 
result).getSpec().getBackoffLimit());
+    }
+
+    private FlinkMutator createMutatorWith(FlinkResourceMutator 
resourceMutator) {
+        return new FlinkMutator(Set.of(resourceMutator), informerManager);
+    }
+
+    private FlinkDeployment createDeployment() {
+        var deployment = new FlinkDeployment();
+        var meta = new ObjectMeta();
+        meta.setName("test-deployment");
+        meta.setNamespace("default");
+        deployment.setMetadata(meta);
+        deployment.setSpec(new FlinkDeploymentSpec());
+        return deployment;
+    }
+
+    private FlinkSessionJob createSessionJob() {
+        var sessionJob = new FlinkSessionJob();
+        var meta = new ObjectMeta();
+        meta.setName("test-job");
+        meta.setNamespace("default");
+        sessionJob.setMetadata(meta);
+        sessionJob.setSpec(
+                FlinkSessionJobSpec.builder()
+                        
.job(JobSpec.builder().jarURI("http://test-job.jar";).build())
+                        .deploymentName("test-deployment")
+                        .build());
+        return sessionJob;
+    }
+
+    private FlinkStateSnapshot createStateSnapshot() {
+        var snapshot = new FlinkStateSnapshot();
+        var meta = new ObjectMeta();
+        meta.setName("test-snapshot");
+        meta.setNamespace("default");
+        snapshot.setMetadata(meta);
+        snapshot.setSpec(new FlinkStateSnapshotSpec());
+        return snapshot;
+    }
+
+    /**
+     * A custom mutator that injects a {@code custom-env=injected} label on 
FlinkDeployment and
+     * FlinkStateSnapshot if not already present. Mirrors the pattern of 
DefaultFlinkMutator's
+     * target-session label on FlinkSessionJob.
+     */
+    private static class LabelInjectingMutator implements FlinkResourceMutator 
{
+        @Override
+        public FlinkDeployment mutateDeployment(FlinkDeployment deployment) {
+            addLabelIfMissing(deployment.getMetadata());
+            return deployment;
+        }
+
+        @Override
+        public FlinkSessionJob mutateSessionJob(
+                FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) 
{
+            return sessionJob;
+        }
+
+        @Override
+        public FlinkStateSnapshot mutateStateSnapshot(FlinkStateSnapshot 
stateSnapshot) {
+            addLabelIfMissing(stateSnapshot.getMetadata());
+            return stateSnapshot;
+        }
+
+        private void addLabelIfMissing(ObjectMeta meta) {
+            var labels = meta.getLabels();
+            if (labels == null) {
+                labels = new HashMap<>();
+            }
+            if (!"injected".equals(labels.get("custom-env"))) {
+                labels.put("custom-env", "injected");
+                meta.setLabels(labels);
+            }
+        }
+    }
+
+    /**
+     * A custom mutator that modifies the spec of every CRD to verify that 
spec-level changes are
+     * always detected by the before/after tree comparison.
+     */
+    private static class SpecModifyingMutator implements FlinkResourceMutator {
+        @Override
+        public FlinkDeployment mutateDeployment(FlinkDeployment deployment) {
+            deployment.getSpec().setImage("mutated:latest");
+            return deployment;
+        }
+
+        @Override
+        public FlinkSessionJob mutateSessionJob(
+                FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) 
{
+            sessionJob.getSpec().setDeploymentName("mutated-deployment");
+            return sessionJob;
+        }
+
+        @Override
+        public FlinkStateSnapshot mutateStateSnapshot(FlinkStateSnapshot 
stateSnapshot) {
+            stateSnapshot.getSpec().setBackoffLimit(5);
+            return stateSnapshot;
+        }
+    }
+}
diff --git 
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/mutator/TestMutator.java
 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/mutator/TestMutator.java
new file mode 100644
index 00000000..88959ee3
--- /dev/null
+++ 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/mutator/TestMutator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.mutator;
+
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+
+import java.util.Optional;
+
+/** Test mutator implementation of {@link FlinkResourceMutator}. */
+public class TestMutator implements FlinkResourceMutator {
+
+    @Override
+    public FlinkDeployment mutateDeployment(FlinkDeployment deployment) {
+        return deployment;
+    }
+
+    @Override
+    public FlinkSessionJob mutateSessionJob(
+            FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
+        return sessionJob;
+    }
+
+    @Override
+    public FlinkStateSnapshot mutateStateSnapshot(FlinkStateSnapshot 
stateSnapshot) {
+        return stateSnapshot;
+    }
+}
diff --git 
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/validation/TestValidator.java
 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/validation/TestValidator.java
new file mode 100644
index 00000000..2a461b0c
--- /dev/null
+++ 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/validation/TestValidator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.validation;
+
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+
+import java.util.Optional;
+
+/** Test validator implementation of {@link FlinkResourceValidator}. */
+public class TestValidator implements FlinkResourceValidator {
+
+    @Override
+    public Optional<String> validateDeployment(FlinkDeployment deployment) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<String> validateSessionJob(
+            FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<String> validateStateSnapshot(
+            FlinkStateSnapshot savepoint, Optional<AbstractFlinkResource<?, 
?>> target) {
+        return Optional.empty();
+    }
+}
diff --git 
a/flink-kubernetes-webhook/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator
 
b/flink-kubernetes-webhook/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator
new file mode 100644
index 00000000..467475ab
--- /dev/null
+++ 
b/flink-kubernetes-webhook/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.kubernetes.operator.mutator.TestMutator
+
diff --git 
a/flink-kubernetes-webhook/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator
 
b/flink-kubernetes-webhook/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator
new file mode 100644
index 00000000..2dec4639
--- /dev/null
+++ 
b/flink-kubernetes-webhook/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.kubernetes.operator.validation.TestValidator
+

Reply via email to