This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 841346187bb [FLINK-38953] Cleanup configurations for SSL hostname verification
841346187bb is described below
commit 841346187bbabf3310f80f10cf5cb4b8d654948c
Author: Mate Czagany <[email protected]>
AuthorDate: Thu Feb 19 14:37:08 2026 +0100
[FLINK-38953] Cleanup configurations for SSL hostname verification
---
.../generated/security_configuration.html | 4 +-
.../shortcodes/generated/security_ssl_section.html | 4 +-
.../flink/configuration/SecurityOptions.java | 17 +-
.../org/apache/flink/runtime/net/SSLUtils.java | 18 +-
.../rest/RestSSLEndpointVerificationITCase.java | 206 +++++++++++++++++++++
.../runtime/rest/RestServerEndpointITCase.java | 11 +-
6 files changed, 236 insertions(+), 24 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/security_configuration.html b/docs/layouts/shortcodes/generated/security_configuration.html
index 85109758ab9..2432b11e536 100644
--- a/docs/layouts/shortcodes/generated/security_configuration.html
+++ b/docs/layouts/shortcodes/generated/security_configuration.html
@@ -243,10 +243,10 @@
<td>The type of the truststore for Flink's external REST
endpoints.</td>
</tr>
<tr>
- <td><h5>security.ssl.verify-hostname</h5></td>
+ <td><h5>security.ssl.rest.verify-hostname</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
- <td>Flag to enable peer’s hostname verification during ssl
handshake.</td>
+ <td>Whether to verify the REST server's hostname against its SSL
certificate.</td>
</tr>
<tr>
<td><h5>zookeeper.sasl.disable</h5></td>
diff --git a/docs/layouts/shortcodes/generated/security_ssl_section.html b/docs/layouts/shortcodes/generated/security_ssl_section.html
index 8a4f869e308..7adb57c2b38 100644
--- a/docs/layouts/shortcodes/generated/security_ssl_section.html
+++ b/docs/layouts/shortcodes/generated/security_ssl_section.html
@@ -135,10 +135,10 @@
<td>The type of the truststore for Flink's external REST
endpoints.</td>
</tr>
<tr>
- <td><h5>security.ssl.verify-hostname</h5></td>
+ <td><h5>security.ssl.rest.verify-hostname</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
- <td>Flag to enable peer’s hostname verification during ssl
handshake.</td>
+ <td>Whether to verify the REST server's hostname against its SSL
certificate.</td>
</tr>
</tbody>
</table>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index d4ff4431335..348cc65251d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -482,6 +482,15 @@ public class SecurityOptions {
+ "This further protects the rest REST
endpoints to present certificate which is only used by proxy server"
+ "This is necessary where once uses
public CA or internal firm wide CA");
+ /** Flag to enable/disable hostname verification for the ssl connections.
*/
+ @Documentation.Section(Documentation.Sections.SECURITY_SSL)
+ public static final ConfigOption<Boolean> SSL_REST_VERIFY_HOSTNAME =
+ key("security.ssl.rest.verify-hostname")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to verify the REST server's hostname
against its SSL certificate.");
+
// ------------------------ ssl parameters --------------------------------
/** SSL protocol version to be supported. */
@@ -514,8 +523,12 @@ public class SecurityOptions {
"here"))
.build());
- /** Flag to enable/disable hostname verification for the ssl connections.
*/
- @Documentation.Section(Documentation.Sections.SECURITY_SSL)
+ /**
+ * @deprecated Use {@link SecurityOptions#SSL_REST_VERIFY_HOSTNAME}.
+ */
+ @Deprecated
+ @Documentation.ExcludeFromDocumentation(
+ "Hostname verification is only supported for REST connections.")
public static final ConfigOption<Boolean> SSL_VERIFY_HOSTNAME =
key("security.ssl.verify-hostname")
.booleanType()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index 898511b9556..c89dd5eef7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -367,8 +367,6 @@ public class SSLUtils {
sslContextBuilder = SslContextBuilder.forServer(kmf);
}
- setHostnameVerification(sslContextBuilder, config, clientMode);
-
Optional<TrustManagerFactory> tmf = getTrustManagerFactory(config,
true);
tmf.map(sslContextBuilder::trustManager);
@@ -430,13 +428,13 @@ public class SSLUtils {
KeyManagerFactory kmf = getKeyManagerFactory(config, false,
provider);
sslContextBuilder.keyManager(kmf);
}
+ sslContextBuilder.endpointIdentificationAlgorithm(
+ config.get(SecurityOptions.SSL_REST_VERIFY_HOSTNAME) ?
"HTTPS" : null);
} else {
KeyManagerFactory kmf = getKeyManagerFactory(config, false,
provider);
sslContextBuilder = SslContextBuilder.forServer(kmf);
}
- setHostnameVerification(sslContextBuilder, config, clientMode);
-
if (clientMode || clientAuth != ClientAuth.NONE) {
Optional<TrustManagerFactory> tmf = getTrustManagerFactory(config,
false);
tmf.map(
@@ -457,18 +455,6 @@ public class SSLUtils {
// Utilities
// ------------------------------------------------------------------------
- /**
- * Set hostname verification. By default, Netty will enable hostname
verification since 4.2.x
- * for client-mode connections.
- */
- private static void setHostnameVerification(
- SslContextBuilder sslContextBuilder, Configuration config, boolean
clientMode) {
- if (clientMode) {
- sslContextBuilder.endpointIdentificationAlgorithm(
- config.get(SecurityOptions.SSL_VERIFY_HOSTNAME) ? "HTTPS"
: null);
- }
- }
-
private static String getAndCheckOption(
Configuration config,
ConfigOption<String> primaryOption,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestSSLEndpointVerificationITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestSSLEndpointVerificationITCase.java
new file mode 100644
index 00000000000..cd70827061d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestSSLEndpointVerificationITCase.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.util.TestMessageHeaders;
+import org.apache.flink.runtime.rest.util.TestRestHandler;
+import org.apache.flink.runtime.rest.util.TestRestServerEndpoint;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.Test;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * This test validates that REST SSL endpoint verification works as expected
and will fail the
+ * request if the endpoint certificate does not match the REST server hostname.
+ */
+public class RestSSLEndpointVerificationITCase {
+
+ private static final File KEY_STORE_FILE =
+ new File(
+ RestSSLEndpointVerificationITCase.class
+ .getResource("/local127.keystore")
+ .getFile());
+ private static final File TRUST_STORE_FILE =
+ new File(
+ RestSSLEndpointVerificationITCase.class
+ .getResource("/local127.truststore")
+ .getFile());
+
+ private static final TestMessageHeaders<
+ EmptyRequestBody, EmptyResponseBody,
EmptyMessageParameters>
+ TEST_MESSAGE_HEADERS =
+ TestMessageHeaders.emptyBuilder()
+ .setTargetRestEndpointURL("/test-handler")
+ .build();
+
+ private final Configuration config;
+
+ public RestSSLEndpointVerificationITCase() {
+ config = new Configuration();
+ config.set(RestOptions.BIND_PORT, "0");
+ config.set(RestOptions.ADDRESS, "localhost");
+ config.set(SecurityOptions.SSL_REST_ENABLED, true);
+ config.set(SecurityOptions.SSL_REST_TRUSTSTORE,
TRUST_STORE_FILE.getAbsolutePath());
+ config.set(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, "password");
+ config.set(SecurityOptions.SSL_REST_KEYSTORE,
KEY_STORE_FILE.getAbsolutePath());
+ config.set(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
+ config.set(SecurityOptions.SSL_REST_KEY_PASSWORD, "password");
+ config.set(SecurityOptions.SSL_REST_VERIFY_HOSTNAME, true);
+ }
+
+ /**
+ * With hostname verification turned on, this should execute fine as
"127.0.0.1" is part of the
+ * Subject Alternative Names.
+ */
+ @Test
+ void testConnectSuccess() throws Exception {
+ try (RestServerEndpoint serverEndpoint = getRestServerEndpoint();
+ RestClient restClient = getRestClient("127.0.0.1",
serverEndpoint.getRestPort())) {
+ assertSan(serverEndpoint);
+
+ var result =
+ restClient
+ .sendRequest(
+
serverEndpoint.getServerAddress().getHostName(),
+
serverEndpoint.getServerAddress().getPort(),
+ TEST_MESSAGE_HEADERS,
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance(),
+ Collections.emptyList())
+ .get();
+ assertThat(result).isNotNull();
+ }
+ }
+
+ /** This should fail as "127.0.0.2" is not part of the Subject Alternative
Names. */
+ @Test
+ void testConnectFailure() throws Exception {
+ try (RestServerEndpoint serverEndpoint = getRestServerEndpoint();
+ RestClient restClient = getRestClient("127.0.0.2",
serverEndpoint.getRestPort())) {
+ assertSan(serverEndpoint);
+
+ assertThatFuture(
+ restClient.sendRequest(
+
serverEndpoint.getServerAddress().getHostName(),
+
serverEndpoint.getServerAddress().getPort(),
+ TEST_MESSAGE_HEADERS,
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance(),
+ Collections.emptyList()))
+ .failsWithin(60, TimeUnit.SECONDS)
+ .withThrowableOfType(ExecutionException.class)
+ .satisfies(anyCauseMatches(CertificateException.class))
+ .withMessageContaining("No subject alternative names
matching IP address");
+ }
+ }
+
+ /** Asserts that the SANs in the certificate are what we expect. */
+ private void assertSan(RestServerEndpoint serverEndpoint) throws Exception
{
+ KeyStore ts = KeyStore.getInstance("JKS");
+ try (InputStream in = new FileInputStream(TRUST_STORE_FILE)) {
+ ts.load(in, null);
+ }
+
+ TrustManagerFactory tmf =
+
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ tmf.init(ts);
+
+ SSLContext ctx = SSLContext.getInstance("TLS");
+ ctx.init(null, tmf.getTrustManagers(), null);
+
+ SSLSocketFactory factory = ctx.getSocketFactory();
+ try (SSLSocket socket =
+ (SSLSocket)
+ factory.createSocket(
+
serverEndpoint.getServerAddress().getHostName(),
+ serverEndpoint.getServerAddress().getPort())) {
+
+ socket.startHandshake();
+ Certificate[] certs = socket.getSession().getPeerCertificates();
+ X509Certificate x509 = (X509Certificate) certs[0];
+
+ // Extract Subject Alternative Names
+ Collection<List<?>> sans = x509.getSubjectAlternativeNames();
+ List<String> ipAddresses =
+ sans.stream()
+ .filter(san -> (Integer) san.get(0) == 7) // Type
7 is IP address
+ .map(san -> (String) san.get(1))
+ .collect(Collectors.toList());
+
+ // Assert that 127.0.0.1 is in the certificate
+ assertThat(ipAddresses).contains("127.0.0.1");
+ }
+ }
+
+ private RestServerEndpoint getRestServerEndpoint() throws Exception {
+ RestfulGateway restfulGateway = new
TestingRestfulGateway.Builder().build();
+ final GatewayRetriever<RestfulGateway> gatewayRetriever =
+ () -> CompletableFuture.completedFuture(restfulGateway);
+
+ final TestRestHandler<
+ RestfulGateway, EmptyRequestBody, EmptyResponseBody,
EmptyMessageParameters>
+ testRestHandler =
+ new TestRestHandler<>(
+ gatewayRetriever,
+ TEST_MESSAGE_HEADERS,
+
CompletableFuture.completedFuture(EmptyResponseBody.getInstance()));
+
+ return TestRestServerEndpoint.builder(config)
+ .withHandler(TEST_MESSAGE_HEADERS, testRestHandler)
+ .buildAndStart();
+ }
+
+ private RestClient getRestClient(String host, int port) throws
ConfigurationException {
+ return new RestClient(config, Executors.directExecutor(), host, port,
null);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index a7b0917903d..c78a3db344b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -162,6 +162,7 @@ public class RestServerEndpointITCase {
sslConfig.set(SecurityOptions.SSL_REST_KEYSTORE, keystorePath);
sslConfig.set(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
sslConfig.set(SecurityOptions.SSL_REST_KEY_PASSWORD, "password");
+ sslConfig.set(SecurityOptions.SSL_REST_VERIFY_HOSTNAME, true);
final Configuration sslRestAuthConfig = new Configuration(sslConfig);
sslRestAuthConfig.set(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED,
true);
@@ -247,9 +248,15 @@ public class RestServerEndpointITCase {
staticFileServerHandler)
.withHandler(new
TestUnavailableHandler(mockGatewayRetriever))
.buildAndStart();
- restClient = new RestClient(config, EXECUTOR_EXTENSION.getExecutor());
-
serverAddress = serverEndpoint.getServerAddress();
+
+ restClient =
+ new RestClient(
+ config,
+ EXECUTOR_EXTENSION.getExecutor(),
+ serverAddress.getHostName(),
+ serverEndpoint.getRestPort(),
+ null);
}
@AfterEach