This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 841346187bb [FLINK-38953] Cleanup configurations for SSL hostname 
verification
841346187bb is described below

commit 841346187bbabf3310f80f10cf5cb4b8d654948c
Author: Mate Czagany <[email protected]>
AuthorDate: Thu Feb 19 14:37:08 2026 +0100

    [FLINK-38953] Cleanup configurations for SSL hostname verification
---
 .../generated/security_configuration.html          |   4 +-
 .../shortcodes/generated/security_ssl_section.html |   4 +-
 .../flink/configuration/SecurityOptions.java       |  17 +-
 .../org/apache/flink/runtime/net/SSLUtils.java     |  18 +-
 .../rest/RestSSLEndpointVerificationITCase.java    | 206 +++++++++++++++++++++
 .../runtime/rest/RestServerEndpointITCase.java     |  11 +-
 6 files changed, 236 insertions(+), 24 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/security_configuration.html 
b/docs/layouts/shortcodes/generated/security_configuration.html
index 85109758ab9..2432b11e536 100644
--- a/docs/layouts/shortcodes/generated/security_configuration.html
+++ b/docs/layouts/shortcodes/generated/security_configuration.html
@@ -243,10 +243,10 @@
             <td>The type of the truststore for Flink's external REST 
endpoints.</td>
         </tr>
         <tr>
-            <td><h5>security.ssl.verify-hostname</h5></td>
+            <td><h5>security.ssl.rest.verify-hostname</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>Flag to enable peer’s hostname verification during ssl 
handshake.</td>
+            <td>Whether to verify the REST server's hostname against its SSL 
certificate.</td>
         </tr>
         <tr>
             <td><h5>zookeeper.sasl.disable</h5></td>
diff --git a/docs/layouts/shortcodes/generated/security_ssl_section.html 
b/docs/layouts/shortcodes/generated/security_ssl_section.html
index 8a4f869e308..7adb57c2b38 100644
--- a/docs/layouts/shortcodes/generated/security_ssl_section.html
+++ b/docs/layouts/shortcodes/generated/security_ssl_section.html
@@ -135,10 +135,10 @@
             <td>The type of the truststore for Flink's external REST 
endpoints.</td>
         </tr>
         <tr>
-            <td><h5>security.ssl.verify-hostname</h5></td>
+            <td><h5>security.ssl.rest.verify-hostname</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>Flag to enable peer’s hostname verification during ssl 
handshake.</td>
+            <td>Whether to verify the REST server's hostname against its SSL 
certificate.</td>
         </tr>
     </tbody>
 </table>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index d4ff4431335..348cc65251d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -482,6 +482,15 @@ public class SecurityOptions {
                                     + "This further protects the rest REST 
endpoints to present certificate which is only used by proxy server"
                                     + "This is necessary where once uses 
public CA or internal firm wide CA");
 
+    /** Flag to enable/disable hostname verification for the ssl connections. 
*/
+    @Documentation.Section(Documentation.Sections.SECURITY_SSL)
+    public static final ConfigOption<Boolean> SSL_REST_VERIFY_HOSTNAME =
+            key("security.ssl.rest.verify-hostname")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to verify the REST server's hostname 
against its SSL certificate.");
+
     // ------------------------ ssl parameters --------------------------------
 
     /** SSL protocol version to be supported. */
@@ -514,8 +523,12 @@ public class SecurityOptions {
                                                     "here"))
                                     .build());
 
-    /** Flag to enable/disable hostname verification for the ssl connections. 
*/
-    @Documentation.Section(Documentation.Sections.SECURITY_SSL)
+    /**
+     * @deprecated Use {@link SecurityOptions#SSL_REST_VERIFY_HOSTNAME}.
+     */
+    @Deprecated
+    @Documentation.ExcludeFromDocumentation(
+            "Hostname verification is only supported for REST connections.")
     public static final ConfigOption<Boolean> SSL_VERIFY_HOSTNAME =
             key("security.ssl.verify-hostname")
                     .booleanType()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index 898511b9556..c89dd5eef7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -367,8 +367,6 @@ public class SSLUtils {
             sslContextBuilder = SslContextBuilder.forServer(kmf);
         }
 
-        setHostnameVerification(sslContextBuilder, config, clientMode);
-
         Optional<TrustManagerFactory> tmf = getTrustManagerFactory(config, 
true);
         tmf.map(sslContextBuilder::trustManager);
 
@@ -430,13 +428,13 @@ public class SSLUtils {
                 KeyManagerFactory kmf = getKeyManagerFactory(config, false, 
provider);
                 sslContextBuilder.keyManager(kmf);
             }
+            sslContextBuilder.endpointIdentificationAlgorithm(
+                    config.get(SecurityOptions.SSL_REST_VERIFY_HOSTNAME) ? 
"HTTPS" : null);
         } else {
             KeyManagerFactory kmf = getKeyManagerFactory(config, false, 
provider);
             sslContextBuilder = SslContextBuilder.forServer(kmf);
         }
 
-        setHostnameVerification(sslContextBuilder, config, clientMode);
-
         if (clientMode || clientAuth != ClientAuth.NONE) {
             Optional<TrustManagerFactory> tmf = getTrustManagerFactory(config, 
false);
             tmf.map(
@@ -457,18 +455,6 @@ public class SSLUtils {
     //  Utilities
     // ------------------------------------------------------------------------
 
-    /**
-     * Set hostname verification. By default, Netty will enable hostname 
verification since 4.2.x
-     * for client-mode connections.
-     */
-    private static void setHostnameVerification(
-            SslContextBuilder sslContextBuilder, Configuration config, boolean 
clientMode) {
-        if (clientMode) {
-            sslContextBuilder.endpointIdentificationAlgorithm(
-                    config.get(SecurityOptions.SSL_VERIFY_HOSTNAME) ? "HTTPS" 
: null);
-        }
-    }
-
     private static String getAndCheckOption(
             Configuration config,
             ConfigOption<String> primaryOption,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestSSLEndpointVerificationITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestSSLEndpointVerificationITCase.java
new file mode 100644
index 00000000000..cd70827061d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestSSLEndpointVerificationITCase.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.util.TestMessageHeaders;
+import org.apache.flink.runtime.rest.util.TestRestHandler;
+import org.apache.flink.runtime.rest.util.TestRestServerEndpoint;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.Test;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * This test validates that REST SSL endpoint verification works as expected 
and will fail the
+ * request if the endpoint certificate does not match the REST server hostname.
+ */
+public class RestSSLEndpointVerificationITCase {
+
+    private static final File KEY_STORE_FILE =
+            new File(
+                    RestSSLEndpointVerificationITCase.class
+                            .getResource("/local127.keystore")
+                            .getFile());
+    private static final File TRUST_STORE_FILE =
+            new File(
+                    RestSSLEndpointVerificationITCase.class
+                            .getResource("/local127.truststore")
+                            .getFile());
+
+    private static final TestMessageHeaders<
+                    EmptyRequestBody, EmptyResponseBody, 
EmptyMessageParameters>
+            TEST_MESSAGE_HEADERS =
+                    TestMessageHeaders.emptyBuilder()
+                            .setTargetRestEndpointURL("/test-handler")
+                            .build();
+
+    private final Configuration config;
+
+    public RestSSLEndpointVerificationITCase() {
+        config = new Configuration();
+        config.set(RestOptions.BIND_PORT, "0");
+        config.set(RestOptions.ADDRESS, "localhost");
+        config.set(SecurityOptions.SSL_REST_ENABLED, true);
+        config.set(SecurityOptions.SSL_REST_TRUSTSTORE, 
TRUST_STORE_FILE.getAbsolutePath());
+        config.set(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, "password");
+        config.set(SecurityOptions.SSL_REST_KEYSTORE, 
KEY_STORE_FILE.getAbsolutePath());
+        config.set(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
+        config.set(SecurityOptions.SSL_REST_KEY_PASSWORD, "password");
+        config.set(SecurityOptions.SSL_REST_VERIFY_HOSTNAME, true);
+    }
+
+    /**
+     * With hostname verification turned on, this should execute fine as 
"127.0.0.1" is part of the
+     * Subject Alternative Names.
+     */
+    @Test
+    void testConnectSuccess() throws Exception {
+        try (RestServerEndpoint serverEndpoint = getRestServerEndpoint();
+                RestClient restClient = getRestClient("127.0.0.1", 
serverEndpoint.getRestPort())) {
+            assertSan(serverEndpoint);
+
+            var result =
+                    restClient
+                            .sendRequest(
+                                    
serverEndpoint.getServerAddress().getHostName(),
+                                    
serverEndpoint.getServerAddress().getPort(),
+                                    TEST_MESSAGE_HEADERS,
+                                    EmptyMessageParameters.getInstance(),
+                                    EmptyRequestBody.getInstance(),
+                                    Collections.emptyList())
+                            .get();
+            assertThat(result).isNotNull();
+        }
+    }
+
+    /** This should fail as "127.0.0.2" is not part of the Subject Alternative 
Names. */
+    @Test
+    void testConnectFailure() throws Exception {
+        try (RestServerEndpoint serverEndpoint = getRestServerEndpoint();
+                RestClient restClient = getRestClient("127.0.0.2", 
serverEndpoint.getRestPort())) {
+            assertSan(serverEndpoint);
+
+            assertThatFuture(
+                            restClient.sendRequest(
+                                    
serverEndpoint.getServerAddress().getHostName(),
+                                    
serverEndpoint.getServerAddress().getPort(),
+                                    TEST_MESSAGE_HEADERS,
+                                    EmptyMessageParameters.getInstance(),
+                                    EmptyRequestBody.getInstance(),
+                                    Collections.emptyList()))
+                    .failsWithin(60, TimeUnit.SECONDS)
+                    .withThrowableOfType(ExecutionException.class)
+                    .satisfies(anyCauseMatches(CertificateException.class))
+                    .withMessageContaining("No subject alternative names 
matching IP address");
+        }
+    }
+
+    /** Asserts that the SANs in the certificate are what we expect. */
+    private void assertSan(RestServerEndpoint serverEndpoint) throws Exception 
{
+        KeyStore ts = KeyStore.getInstance("JKS");
+        try (InputStream in = new FileInputStream(TRUST_STORE_FILE)) {
+            ts.load(in, null);
+        }
+
+        TrustManagerFactory tmf =
+                
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        tmf.init(ts);
+
+        SSLContext ctx = SSLContext.getInstance("TLS");
+        ctx.init(null, tmf.getTrustManagers(), null);
+
+        SSLSocketFactory factory = ctx.getSocketFactory();
+        try (SSLSocket socket =
+                (SSLSocket)
+                        factory.createSocket(
+                                
serverEndpoint.getServerAddress().getHostName(),
+                                serverEndpoint.getServerAddress().getPort())) {
+
+            socket.startHandshake();
+            Certificate[] certs = socket.getSession().getPeerCertificates();
+            X509Certificate x509 = (X509Certificate) certs[0];
+
+            // Extract Subject Alternative Names
+            Collection<List<?>> sans = x509.getSubjectAlternativeNames();
+            List<String> ipAddresses =
+                    sans.stream()
+                            .filter(san -> (Integer) san.get(0) == 7) // Type 
7 is IP address
+                            .map(san -> (String) san.get(1))
+                            .collect(Collectors.toList());
+
+            // Assert that 127.0.0.1 is in the certificate
+            assertThat(ipAddresses).contains("127.0.0.1");
+        }
+    }
+
+    private RestServerEndpoint getRestServerEndpoint() throws Exception {
+        RestfulGateway restfulGateway = new 
TestingRestfulGateway.Builder().build();
+        final GatewayRetriever<RestfulGateway> gatewayRetriever =
+                () -> CompletableFuture.completedFuture(restfulGateway);
+
+        final TestRestHandler<
+                        RestfulGateway, EmptyRequestBody, EmptyResponseBody, 
EmptyMessageParameters>
+                testRestHandler =
+                        new TestRestHandler<>(
+                                gatewayRetriever,
+                                TEST_MESSAGE_HEADERS,
+                                
CompletableFuture.completedFuture(EmptyResponseBody.getInstance()));
+
+        return TestRestServerEndpoint.builder(config)
+                .withHandler(TEST_MESSAGE_HEADERS, testRestHandler)
+                .buildAndStart();
+    }
+
+    private RestClient getRestClient(String host, int port) throws 
ConfigurationException {
+        return new RestClient(config, Executors.directExecutor(), host, port, 
null);
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index a7b0917903d..c78a3db344b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -162,6 +162,7 @@ public class RestServerEndpointITCase {
         sslConfig.set(SecurityOptions.SSL_REST_KEYSTORE, keystorePath);
         sslConfig.set(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
         sslConfig.set(SecurityOptions.SSL_REST_KEY_PASSWORD, "password");
+        sslConfig.set(SecurityOptions.SSL_REST_VERIFY_HOSTNAME, true);
 
         final Configuration sslRestAuthConfig = new Configuration(sslConfig);
         sslRestAuthConfig.set(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, 
true);
@@ -247,9 +248,15 @@ public class RestServerEndpointITCase {
                                 staticFileServerHandler)
                         .withHandler(new 
TestUnavailableHandler(mockGatewayRetriever))
                         .buildAndStart();
-        restClient = new RestClient(config, EXECUTOR_EXTENSION.getExecutor());
-
         serverAddress = serverEndpoint.getServerAddress();
+
+        restClient =
+                new RestClient(
+                        config,
+                        EXECUTOR_EXTENSION.getExecutor(),
+                        serverAddress.getHostName(),
+                        serverEndpoint.getRestPort(),
+                        null);
     }
 
     @AfterEach

Reply via email to