asfgit closed pull request #6727: [FLINK-10371] Allow to enable SSL mutual 
authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/_includes/generated/security_configuration.html 
b/docs/_includes/generated/security_configuration.html
index 19d0287df99..680c1c02434 100644
--- a/docs/_includes/generated/security_configuration.html
+++ b/docs/_includes/generated/security_configuration.html
@@ -62,6 +62,11 @@
             <td style="word-wrap: break-word;">"TLSv1.2"</td>
             <td>The SSL protocol version to be supported for the ssl 
transport. Note that it doesn’t support comma separated list.</td>
         </tr>
+        <tr>
+            <td><h5>security.ssl.rest.authentication-enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Turns on mutual SSL authentication for external communication 
via the REST endpoints.</td>
+        </tr>
         <tr>
             <td><h5>security.ssl.rest.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/docs/ops/security-ssl.md b/docs/ops/security-ssl.md
index ed5f4d771bc..6ea686203ee 100644
--- a/docs/ops/security-ssl.md
+++ b/docs/ops/security-ssl.md
@@ -43,11 +43,11 @@ Internal connectivity includes:
 
   - Control messages: RPC between JobManager / TaskManager / Dispatcher / 
ResourceManager
   - The data plane: The connections between TaskManagers to exchange data 
during shuffles, broadcasts, redistribution, etc.
-  - The Blob Service (distribution of libraries and other artifacts). 
+  - The Blob Service (distribution of libraries and other artifacts).
 
 All internal connections are SSL authenticated and encrypted. The connections 
use **mutual authentication**, meaning both server
 and client side of each connection need to present the certificate to each 
other. The certificate acts effectively as a shared
-secret. 
+secret.
 
 A common setup is to generate a dedicated certificate (may be self-signed) for 
a Flink deployment. The certificate for internal communication
 is not needed by any other party to interact with Flink, and can be simply 
added to the container images, or attached to the YARN deployment.
@@ -61,15 +61,14 @@ All external connectivity is exposed via an HTTP/REST 
endpoint, used for example
   - Communication with the *Dispatcher* to submit jobs (session clusters)
   - Communication with the *JobManager* to inspect and modify a running 
job/application
 
-The REST endpoints can be configured to require SSL connections. The server 
will, however, accept connections from any client, meaning the REST endpoint 
does not authenticate the client.
+The REST endpoints can be configured to require SSL connections. The server 
will, however, accept connections from any client by default, meaning the REST 
endpoint does not authenticate the client.
 
-If authentication of connections to the REST endpoint is required, we 
recommend to deploy a "side car proxy":
+Simple mutual authentication may be enabled by configuration if authentication 
of connections to the REST endpoint is required, but we recommend deploying a 
"side car proxy":
 Bind the REST endpoint to the loopback interface (or the pod-local interface 
in Kubernetes) and start a REST proxy that authenticates and forwards the 
requests to Flink.
 Examples for proxies that Flink users have deployed are [Envoy 
Proxy](https://www.envoyproxy.io/) or
 [NGINX with 
MOD_AUTH](http://nginx.org/en/docs/http/ngx_http_auth_request_module.html).
 
-The rationale behind delegating authentication to a proxy is that such proxies 
offer many more authentication options than the Flink project could reasonably 
implement itself,
-and thus offer better integration into existing infrastructures.
+The rationale behind delegating authentication to a proxy is that such proxies 
offer a wide variety of authentication options and thus better integration into 
existing infrastructures.
 
 
 #### Queryable State
@@ -115,10 +114,12 @@ security.ssl.internal.truststore-password: 
truststore_password
 
 **REST Endpoints (external connectivity)**
 
-For REST endpoints, the keystore is used by the server endpoint, and the 
truststore is used by the REST clients (including the CLI client)
+For REST endpoints, by default the keystore is used by the server endpoint, 
and the truststore is used by the REST clients (including the CLI client)
 to accept the server's certificate. In the case where the REST keystore has a 
self-signed certificate, the truststore must trust that certificate directly.
 If the REST endpoint uses a certificate that is signed through a proper 
certification hierarchy, the roots of that hierarchy should
-be in the trust store. 
+be in the trust store.
+
+If mutual authentication is enabled, the keystore and the truststore are used 
by both the server endpoint and the REST clients, as with internal connectivity.
 
 {% highlight yaml %}
 security.ssl.rest.keystore: /path/to/file.keystore
@@ -126,6 +127,7 @@ security.ssl.rest.keystore-password: keystore_password
 security.ssl.rest.key-password: key_password
 security.ssl.rest.truststore: /path/to/file.truststore
 security.ssl.rest.truststore-password: truststore_password
+security.ssl.rest.authentication-enabled: false
 {% endhighlight %}
 
 **IMPORTANT**
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 750170c80bb..be413cb7264 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -120,6 +120,14 @@
                        .defaultValue(false)
                        .withDescription("Turns on SSL for external 
communication via the REST endpoints.");
 
+       /**
+        * Enable mutual SSL authentication for external REST endpoints.
+        */
+       public static final ConfigOption<Boolean> 
SSL_REST_AUTHENTICATION_ENABLED =
+               key("security.ssl.rest.authentication-enabled")
+                       .defaultValue(false)
+                       .withDescription("Turns on mutual SSL authentication 
for external communication via the REST endpoints.");
+
        // ----------------- certificates (internal + external) 
-------------------
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index d209f5fd5bf..e0b208d4b8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -26,10 +26,12 @@
 import javax.annotation.Nullable;
 import javax.net.ServerSocketFactory;
 import javax.net.SocketFactory;
+import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLServerSocket;
 import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 
 import java.io.File;
@@ -65,6 +67,15 @@ public static boolean isRestSSLEnabled(Configuration 
sslConfig) {
                return sslConfig.getBoolean(SecurityOptions.SSL_REST_ENABLED, 
fallbackFlag);
        }
 
+       /**
+        * Checks whether mutual SSL authentication for the external REST 
endpoint is enabled.
+        */
+       public static boolean isRestSSLAuthenticationEnabled(Configuration 
sslConfig) {
+               checkNotNull(sslConfig, "sslConfig");
+               return isRestSSLEnabled(sslConfig) &&
+                       
sslConfig.getBoolean(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED);
+       }
+
        /**
         * Creates a factory for SSL Server Sockets from the given 
configuration.
         * SSL Server Sockets are always part of internal communication.
@@ -145,7 +156,7 @@ public static SSLEngineFactory 
createRestServerSSLEngineFactory(final Configurat
                                getEnabledProtocols(config),
                                getEnabledCipherSuites(config),
                                false,
-                               false);
+                               isRestSSLAuthenticationEnabled(config));
        }
 
        /**
@@ -164,7 +175,7 @@ public static SSLEngineFactory 
createRestClientSSLEngineFactory(final Configurat
                                getEnabledProtocols(config),
                                getEnabledCipherSuites(config),
                                true,
-                               false);
+                               isRestSSLAuthenticationEnabled(config));
        }
 
        private static String[] getEnabledProtocols(final Configuration config) 
{
@@ -228,73 +239,100 @@ public static SSLContext 
createInternalSSLContext(Configuration config) throws E
                return sslContext;
        }
 
+       private enum RestSSLContextConfigMode {
+               CLIENT,
+               SERVER,
+               MUTUAL
+       }
+
        /**
-        * Creates an SSL context for the external REST endpoint server.
+        * Creates an SSL context for external REST communication.
+        * If mutual authentication is configured, the client and the server 
side configuration are identical.
         */
        @Nullable
-       public static SSLContext createRestServerSSLContext(Configuration 
config) throws Exception {
+       private static SSLContext createRestSSLContext(Configuration config, 
RestSSLContextConfigMode configMode) throws Exception {
                checkNotNull(config, "config");
 
                if (!isRestSSLEnabled(config)) {
                        return null;
                }
 
-               String keystoreFilePath = getAndCheckOption(
+               KeyManager[] keyManagers = null;
+               if (configMode == RestSSLContextConfigMode.SERVER || configMode 
== RestSSLContextConfigMode.MUTUAL) {
+                       String keystoreFilePath = getAndCheckOption(
                                config, SecurityOptions.SSL_REST_KEYSTORE, 
SecurityOptions.SSL_KEYSTORE);
 
-               String keystorePassword = getAndCheckOption(
+                       String keystorePassword = getAndCheckOption(
                                config, 
SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, 
SecurityOptions.SSL_KEYSTORE_PASSWORD);
 
-               String certPassword = getAndCheckOption(
+                       String certPassword = getAndCheckOption(
                                config, SecurityOptions.SSL_REST_KEY_PASSWORD, 
SecurityOptions.SSL_KEY_PASSWORD);
 
-               String sslProtocolVersion = 
config.getString(SecurityOptions.SSL_PROTOCOL);
+                       KeyStore keyStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
+                       try (InputStream keyStoreFile = 
Files.newInputStream(new File(keystoreFilePath).toPath())) {
+                               keyStore.load(keyStoreFile, 
keystorePassword.toCharArray());
+                       }
 
-               KeyStore keyStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
-               try (InputStream keyStoreFile = Files.newInputStream(new 
File(keystoreFilePath).toPath())) {
-                       keyStore.load(keyStoreFile, 
keystorePassword.toCharArray());
+                       KeyManagerFactory kmf = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+                       kmf.init(keyStore, certPassword.toCharArray());
+
+                       keyManagers = kmf.getKeyManagers();
                }
 
-               KeyManagerFactory kmf = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-               kmf.init(keyStore, certPassword.toCharArray());
+               TrustManager[] trustManagers = null;
+               if (configMode == RestSSLContextConfigMode.CLIENT || configMode 
== RestSSLContextConfigMode.MUTUAL) {
+                       String trustStoreFilePath = getAndCheckOption(
+                               config, SecurityOptions.SSL_REST_TRUSTSTORE, 
SecurityOptions.SSL_TRUSTSTORE);
+
+                       String trustStorePassword = getAndCheckOption(
+                               config, 
SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, 
SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
 
+                       KeyStore trustStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
+                       try (InputStream trustStoreFile = 
Files.newInputStream(new File(trustStoreFilePath).toPath())) {
+                               trustStore.load(trustStoreFile, 
trustStorePassword.toCharArray());
+                       }
+
+                       TrustManagerFactory tmf = 
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+                       tmf.init(trustStore);
+
+                       trustManagers = tmf.getTrustManagers();
+               }
+
+               String sslProtocolVersion = 
config.getString(SecurityOptions.SSL_PROTOCOL);
                SSLContext sslContext = 
SSLContext.getInstance(sslProtocolVersion);
-               sslContext.init(kmf.getKeyManagers(), null, null);
+               sslContext.init(keyManagers, trustManagers, null);
 
                return sslContext;
        }
 
        /**
-        * Creates an SSL context for clients against the external REST 
endpoint.
+        * Creates an SSL context for the external REST endpoint server.
         */
        @Nullable
-       public static SSLContext createRestClientSSLContext(Configuration 
config) throws Exception {
-               checkNotNull(config, "config");
-
-               if (!isRestSSLEnabled(config)) {
-                       return null;
+       public static SSLContext createRestServerSSLContext(Configuration 
config) throws Exception {
+               final RestSSLContextConfigMode configMode;
+               if (isRestSSLAuthenticationEnabled(config)) {
+                       configMode = RestSSLContextConfigMode.MUTUAL;
+               } else {
+                       configMode = RestSSLContextConfigMode.SERVER;
                }
 
-               String trustStoreFilePath = getAndCheckOption(
-                               config, SecurityOptions.SSL_REST_TRUSTSTORE, 
SecurityOptions.SSL_TRUSTSTORE);
-
-               String trustStorePassword = getAndCheckOption(
-                               config, 
SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, 
SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
-
-               String sslProtocolVersion = 
config.getString(SecurityOptions.SSL_PROTOCOL);
+               return createRestSSLContext(config, configMode);
+       }
 
-               KeyStore trustStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
-               try (InputStream trustStoreFile = Files.newInputStream(new 
File(trustStoreFilePath).toPath())) {
-                       trustStore.load(trustStoreFile, 
trustStorePassword.toCharArray());
+       /**
+        * Creates an SSL context for clients against the external REST 
endpoint.
+        */
+       @Nullable
+       public static SSLContext createRestClientSSLContext(Configuration 
config) throws Exception {
+               final RestSSLContextConfigMode configMode;
+               if (isRestSSLAuthenticationEnabled(config)) {
+                       configMode = RestSSLContextConfigMode.MUTUAL;
+               } else {
+                       configMode = RestSSLContextConfigMode.CLIENT;
                }
 
-               TrustManagerFactory tmf = 
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
-               tmf.init(trustStore);
-
-               SSLContext sslContext = 
SSLContext.getInstance(sslProtocolVersion);
-               sslContext.init(null, tmf.getTrustManagers(), null);
-
-               return sslContext;
+               return createRestSSLContext(config, configMode);
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
index ca0be6b5bfd..9610e98b101 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
@@ -78,6 +78,28 @@ public void checkEnableSSL() {
                assertFalse(SSLUtils.isRestSSLEnabled(precedence));
        }
 
+       /**
+        * Tests whether activation of REST mutual SSL authentication evaluates 
the config flags correctly.
+        */
+       @Test
+       public void checkEnableRestSSLAuthentication() {
+               // SSL has to be enabled
+               Configuration noSSLOptions = new Configuration();
+               noSSLOptions.setBoolean(SecurityOptions.SSL_REST_ENABLED, 
false);
+               
noSSLOptions.setBoolean(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, true);
+               
assertFalse(SSLUtils.isRestSSLAuthenticationEnabled(noSSLOptions));
+
+               // authentication is disabled by default
+               Configuration defaultOptions = new Configuration();
+               defaultOptions.setBoolean(SecurityOptions.SSL_REST_ENABLED, 
true);
+               
assertFalse(SSLUtils.isRestSSLAuthenticationEnabled(defaultOptions));
+
+               Configuration options = new Configuration();
+               options.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);
+               
options.setBoolean(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, true);
+               
assertTrue(SSLUtils.isRestSSLAuthenticationEnabled(options));
+       }
+
        @Test
        public void testSocketFactoriesWhenSslDisabled() throws Exception {
                Configuration config = new Configuration();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index b017610aa3a..962619160e4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -71,7 +71,9 @@
 import org.junit.runners.Parameterized;
 
 import javax.annotation.Nonnull;
+import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -126,6 +128,7 @@
 
        private final Configuration config;
        private SSLContext defaultSSLContext;
+       private SSLSocketFactory defaultSSLSocketFactory;
 
        public RestServerEndpointITCase(final Configuration config) {
                this.config = requireNonNull(config);
@@ -151,8 +154,11 @@ public RestServerEndpointITCase(final Configuration 
config) {
                sslConfig.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, 
"password");
                sslConfig.setString(SecurityOptions.SSL_REST_KEY_PASSWORD, 
"password");
 
+               final Configuration sslRestAuthConfig = new 
Configuration(sslConfig);
+               
sslRestAuthConfig.setBoolean(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, 
true);
+
                return Arrays.asList(new Object[][]{
-                       {config}, {sslConfig}
+                       {config}, {sslConfig}, {sslRestAuthConfig}
                });
        }
 
@@ -170,9 +176,11 @@ public void setup() throws Exception {
                config.setString(WebOptions.UPLOAD_DIR, 
temporaryFolder.newFolder().getCanonicalPath());
 
                defaultSSLContext = SSLContext.getDefault();
+               defaultSSLSocketFactory = 
HttpsURLConnection.getDefaultSSLSocketFactory();
                final SSLContext sslClientContext = 
SSLUtils.createRestClientSSLContext(config);
                if (sslClientContext != null) {
                        SSLContext.setDefault(sslClientContext);
+                       
HttpsURLConnection.setDefaultSSLSocketFactory(sslClientContext.getSocketFactory());
                }
 
                RestServerEndpointConfiguration serverConfig = 
RestServerEndpointConfiguration.fromConfiguration(config);
@@ -235,6 +243,7 @@ public void setup() throws Exception {
        public void teardown() throws Exception {
                if (defaultSSLContext != null) {
                        SSLContext.setDefault(defaultSSLContext);
+                       
HttpsURLConnection.setDefaultSSLSocketFactory(defaultSSLSocketFactory);
                }
 
                if (restClient != null) {
@@ -541,7 +550,7 @@ private static String createStringOfSize(int size) {
                return sb.toString();
        }
 
-       private static class TestRestServerEndpoint extends RestServerEndpoint {
+       static class TestRestServerEndpoint extends RestServerEndpoint {
 
                private final List<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> handlers;
 
@@ -602,7 +611,7 @@ protected void startInternal() {}
                }
        }
 
-       private static class TestRestClient extends RestClient {
+       static class TestRestClient extends RestClient {
 
                TestRestClient(RestClientConfiguration configuration) {
                        super(configuration, TestingUtils.defaultExecutor());
@@ -803,9 +812,9 @@ private TestUploadHandler(
                }
        }
 
-       private static class TestVersionHandler extends 
AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, 
EmptyMessageParameters> {
+       static class TestVersionHandler extends 
AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, 
EmptyMessageParameters> {
 
-               private TestVersionHandler(
+               TestVersionHandler(
                        final CompletableFuture<String> localRestAddress,
                        final GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        final Time timeout) {
@@ -818,7 +827,7 @@ private TestVersionHandler(
                }
        }
 
-       private enum TestVersionHeaders implements 
MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+       enum TestVersionHeaders implements MessageHeaders<EmptyRequestBody, 
EmptyResponseBody, EmptyMessageParameters> {
                INSTANCE;
 
                @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
new file mode 100644
index 00000000000..5d75a300673
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.net.ssl.SSLHandshakeException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * This test validates that connections are failing when mutual auth is 
enabled but untrusted
+ * keys are used.
+ */
+public class RestServerSSLAuthITCase extends TestLogger {
+
+       private static final String KEY_STORE_FILE = 
RestServerSSLAuthITCase.class.getResource("/local127.keystore").getFile();
+       private static final String TRUST_STORE_FILE = 
RestServerSSLAuthITCase.class.getResource("/local127.truststore").getFile();
+       private static final String UNTRUSTED_KEY_STORE_FILE = 
RestServerSSLAuthITCase.class.getResource("/untrusted.keystore").getFile();
+
+       private static final Time timeout = Time.seconds(10L);
+
+       private RestfulGateway restfulGateway;
+
+       @Test
+       public void testConnectFailure() throws Exception {
+               RestClient restClient = null;
+               RestServerEndpoint serverEndpoint = null;
+
+               try {
+                       final Configuration baseConfig = new Configuration();
+                       baseConfig.setInteger(RestOptions.PORT, 0);
+                       baseConfig.setString(RestOptions.ADDRESS, "localhost");
+                       baseConfig.setBoolean(SecurityOptions.SSL_REST_ENABLED, 
true);
+                       
baseConfig.setBoolean(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, true);
+                       baseConfig.setString(SecurityOptions.SSL_ALGORITHMS, 
"TLS_RSA_WITH_AES_128_CBC_SHA");
+
+                       Configuration serverConfig = new 
Configuration(baseConfig);
+                       
serverConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE, TRUST_STORE_FILE);
+                       
serverConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, 
"password");
+                       
serverConfig.setString(SecurityOptions.SSL_REST_KEYSTORE, KEY_STORE_FILE);
+                       
serverConfig.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
+                       
serverConfig.setString(SecurityOptions.SSL_REST_KEY_PASSWORD, "password");
+
+                       Configuration clientConfig = new 
Configuration(baseConfig);
+                       
clientConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE, 
UNTRUSTED_KEY_STORE_FILE);
+                       
clientConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, 
"password");
+                       
clientConfig.setString(SecurityOptions.SSL_REST_KEYSTORE, KEY_STORE_FILE);
+                       
clientConfig.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
+                       
clientConfig.setString(SecurityOptions.SSL_REST_KEY_PASSWORD, "password");
+
+                       RestServerEndpointConfiguration restServerConfig = 
RestServerEndpointConfiguration.fromConfiguration(serverConfig);
+                       RestClientConfiguration restClientConfig = 
RestClientConfiguration.fromConfiguration(clientConfig);
+
+                       RestfulGateway restfulGateway = 
TestingRestfulGateway.newBuilder().build();
+                       RestServerEndpointITCase.TestVersionHandler 
testVersionHandler = new RestServerEndpointITCase.TestVersionHandler(
+                               
CompletableFuture.completedFuture("http://localhost:1234"),
+                               () -> 
CompletableFuture.completedFuture(restfulGateway),
+                               RpcUtils.INF_TIMEOUT);
+
+                       serverEndpoint = new 
RestServerEndpointITCase.TestRestServerEndpoint(
+                               restServerConfig,
+                               
Arrays.asList(Tuple2.of(testVersionHandler.getMessageHeaders(), 
testVersionHandler)));
+                       restClient = new 
RestServerEndpointITCase.TestRestClient(restClientConfig);
+                       serverEndpoint.start();
+
+                       CompletableFuture<EmptyResponseBody> response = 
restClient.sendRequest(
+                               serverEndpoint.getServerAddress().getHostName(),
+                               serverEndpoint.getServerAddress().getPort(),
+                               
RestServerEndpointITCase.TestVersionHeaders.INSTANCE,
+                               EmptyMessageParameters.getInstance(),
+                               EmptyRequestBody.getInstance(),
+                               Collections.emptyList()
+                       );
+                       response.get(60, TimeUnit.SECONDS);
+
+                       fail("should never complete normally");
+               } catch (ExecutionException exception) {
+                       // that is what we want
+                       assertTrue(ExceptionUtils.findThrowable(exception, 
SSLHandshakeException.class).isPresent());
+               } finally {
+                       if (restClient != null) {
+                               restClient.shutdown(timeout);
+                       }
+
+                       if (serverEndpoint != null) {
+                               serverEndpoint.close();
+                       }
+               }
+       }
+}
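
For readers skimming the SSLUtils changes above, the mode selection can be summarised in a small standalone sketch. This is not Flink code: the class RestSslModeSketch, its Material enum and all method names are hypothetical and only mirror the logic visible in the diff (the authentication flag only counts when REST SSL is enabled, and the MUTUAL mode loads both the keystore and the truststore on either side).

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical stand-in for the mode selection in the diff; not Flink code. */
final class RestSslModeSketch {

    /** Mirrors the three values of RestSSLContextConfigMode in the diff. */
    enum Mode { CLIENT, SERVER, MUTUAL }

    /** Hypothetical names for the two kinds of store a context may load. */
    enum Material { KEYSTORE, TRUSTSTORE }

    /** The authentication flag is ignored unless REST SSL itself is enabled. */
    static boolean isAuthEnabled(boolean restSslEnabled, boolean authFlag) {
        return restSslEnabled && authFlag;
    }

    /**
     * Picks the mode for one side of the connection. In the diff, a disabled
     * REST SSL setting yields a null context before the mode matters; this
     * sketch simply falls back to the one-sided mode in that case.
     */
    static Mode resolveMode(boolean serverSide, boolean restSslEnabled, boolean authFlag) {
        if (isAuthEnabled(restSslEnabled, authFlag)) {
            return Mode.MUTUAL;
        }
        return serverSide ? Mode.SERVER : Mode.CLIENT;
    }

    /** Which stores are loaded for a given mode. */
    static Set<Material> requiredMaterial(Mode mode) {
        switch (mode) {
            case SERVER:
                return EnumSet.of(Material.KEYSTORE);
            case CLIENT:
                return EnumSet.of(Material.TRUSTSTORE);
            default:
                return EnumSet.allOf(Material.class);
        }
    }
}
```

Under these assumptions, a client with both flags set resolves to MUTUAL and therefore needs a keystore in addition to its truststore, which matches the added paragraph in docs/ops/security-ssl.md.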

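The diff passes isRestSSLAuthenticationEnabled(config) as the last argument of the REST SSLEngineFactory, but the factory itself is not shown. Assuming that flag ends up as a client-certificate requirement on the server engine, the JDK-level effect might look roughly like the following sketch. ClientAuthEngineSketch and its methods are hypothetical names; only the javax.net.ssl calls are real JDK API.

```java
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

/** Hypothetical illustration of server-side client authentication; not Flink code. */
final class ClientAuthEngineSketch {

    /** Creates a server-mode engine that optionally demands a client certificate. */
    static SSLEngine createServerEngine(SSLContext context, boolean clientAuthEnabled) {
        SSLEngine engine = context.createSSLEngine();
        engine.setUseClientMode(false);
        if (clientAuthEnabled) {
            // The handshake fails if the client presents no trusted certificate.
            engine.setNeedClientAuth(true);
        }
        return engine;
    }

    /** Convenience wrapper around the JVM default context, for demonstration only. */
    static SSLEngine defaultServerEngine(boolean clientAuthEnabled) {
        try {
            return createServerEngine(SSLContext.getDefault(), clientAuthEnabled);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("No default SSLContext available", e);
        }
    }
}
```

A real deployment would build the context from the configured keystore and truststore rather than the JVM default. A failed handshake of this kind is what RestServerSSLAuthITCase expects to surface as an SSLHandshakeException.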

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
