[ 
https://issues.apache.org/jira/browse/FLINK-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623256#comment-16623256
 ] 

ASF GitHub Bot commented on FLINK-10371:
----------------------------------------

tillrohrmann commented on a change in pull request #6727: [FLINK-10371] Allow 
to enable SSL mutual authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727#discussion_r219417533
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
 ##########
 @@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.DecoderException;
+
+import org.junit.Test;
+
+import javax.net.ssl.SSLHandshakeException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * This test validates that connections are failing when mutual auth is 
enabled but untrusted
+ * keys are used.
+ */
+public class RestServerSSLAuthITCase extends TestLogger {
+
+       private static final String KEY_STORE_FILE = 
RestServerSSLAuthITCase.class.getResource("/local127.keystore").getFile();
+       private static final String TRUST_STORE_FILE = 
RestServerSSLAuthITCase.class.getResource("/local127.truststore").getFile();
+       private static final String UNTRUSTED_KEY_STORE_FILE = 
RestServerSSLAuthITCase.class.getResource("/untrusted.keystore").getFile();
+
+       private static final Time timeout = Time.seconds(10L);
+
+       @Test
+       public void testConnectFailure() throws Exception {
+               RestClient restClient = null;
+               RestServerEndpoint serverEndpoint = null;
+
+               try {
+                       final Configuration baseConfig = new Configuration();
+                       baseConfig.setInteger(RestOptions.PORT, 0);
+                       baseConfig.setString(RestOptions.ADDRESS, "localhost");
+                       baseConfig.setBoolean(SecurityOptions.SSL_REST_ENABLED, 
true);
+                       
baseConfig.setBoolean(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, true);
+                       baseConfig.setString(SecurityOptions.SSL_ALGORITHMS, 
"TLS_RSA_WITH_AES_128_CBC_SHA");
+
+                       Configuration serverConfig = new 
Configuration(baseConfig);
+                       
serverConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE, TRUST_STORE_FILE);
+                       
serverConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, 
"password");
+                       
serverConfig.setString(SecurityOptions.SSL_REST_KEYSTORE, KEY_STORE_FILE);
+                       
serverConfig.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
+                       
serverConfig.setString(SecurityOptions.SSL_REST_KEY_PASSWORD, "password");
+
+                       Configuration clientConfig = new 
Configuration(baseConfig);
+                       
clientConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE, 
UNTRUSTED_KEY_STORE_FILE);
+                       
clientConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, 
"password");
+                       
clientConfig.setString(SecurityOptions.SSL_REST_KEYSTORE, KEY_STORE_FILE);
+                       
clientConfig.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
+                       
clientConfig.setString(SecurityOptions.SSL_REST_KEY_PASSWORD, "password");
+
+                       RestServerEndpointConfiguration restServerConfig = 
RestServerEndpointConfiguration.fromConfiguration(serverConfig);
+                       RestClientConfiguration restClientConfig = 
RestClientConfiguration.fromConfiguration(clientConfig);
+
+                       final String restAddress = "http://localhost:1234";;
+                       RestfulGateway mockRestfulGateway = 
mock(RestfulGateway.class);
+                       
when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+
+                       final GatewayRetriever<RestfulGateway> 
mockGatewayRetriever = () ->
+                               
CompletableFuture.completedFuture(mockRestfulGateway);
+
+                       RestServerEndpointITCase.TestVersionHandler 
testVersionHandler = new RestServerEndpointITCase.TestVersionHandler(
+                               CompletableFuture.completedFuture(restAddress),
+                               mockGatewayRetriever,
+                               RpcUtils.INF_TIMEOUT);
+
+                       serverEndpoint = new 
RestServerEndpointITCase.TestRestServerEndpoint(
+                               restServerConfig,
+                               
Arrays.asList(Tuple2.of(testVersionHandler.getMessageHeaders(), 
testVersionHandler)));
+                       restClient = new 
RestServerEndpointITCase.TestRestClient(restClientConfig);
+                       serverEndpoint.start();
+
+                       CompletableFuture<EmptyResponseBody> response = 
restClient.sendRequest(
+                               serverEndpoint.getServerAddress().getHostName(),
+                               serverEndpoint.getServerAddress().getPort(),
+                               
RestServerEndpointITCase.TestVersionHeaders.INSTANCE,
+                               EmptyMessageParameters.getInstance(),
+                               EmptyRequestBody.getInstance(),
+                               Collections.emptyList()
+                       );
+                       response.get(60, TimeUnit.SECONDS);
+
+                       fail("should never complete normally");
+               } catch (ExecutionException executionException) {
+                       // that is what we want
+                       Throwable exception = executionException.getCause();
+                       assertTrue(exception instanceof DecoderException);
+                       assertTrue(exception.getCause() instanceof 
SSLHandshakeException);
 
 Review comment:
   Here you could use `ExceptionUtils.findThrowable` to look for the `SSLHandshakeException` in the cause chain.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow to enable SSL mutual authentication on REST endpoints by configuration
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-10371
>                 URL: https://issues.apache.org/jira/browse/FLINK-10371
>             Project: Flink
>          Issue Type: Improvement
>          Components: Client, REST, Security
>    Affects Versions: 1.6.0, 1.7.0
>            Reporter: Johannes Dillmann
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0, 1.6.2
>
>
> With Flink 1.6, SSL mutual authentication was introduced for internal 
> connectivity in FLINK-9312. 
>  SSL support for external connectivity was also introduced with regard to 
> encryption of the connection and verification of the Flink REST endpoint from 
> the client side.
> But _mutual authentication between the REST endpoint and clients is not 
> supported yet_.
>  The [documentation suggests 
> |https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/security-ssl.html]
>  using a side car proxy to enable SSL mutual auth on the REST endpoint and 
> points out the advantages of using a feature rich proxy.
> While this is a good rationale, there are still important use cases for 
> supporting simple mutual authentication directly in Flink, mainly the use 
> of standard images in a containerized environment.
> There are tools that set up Flink jobs (for example on Kubernetes clusters) 
> and act as gateways to the Flink REST endpoint and the Flink web interface. 
> To prevent unauthorised access to Flink, the connectivity has to be secured. 
> As these tools act as gateways, it is easy to create a shared keystore and 
> truststore for mutual authentication and pass them to the Flink instances' 
> configurations.
> To enable SSL mutual authentication on REST endpoints, I suggest adding the 
> configuration parameter `security.ssl.rest.authentication-enabled`, 
> which defaults to `false`.
>  If it is set to `true`, the `SSLUtils` factories for creating the REST server 
> endpoint and the REST clients should set authentication to required and share 
> `security.ssl.rest.keystore` and `security.ssl.rest.truststore` to set up 
> mutually authenticated SSL connections.
>  
> I have a working prototype which I would gladly submit as a PR to get further 
> feedback. The changes to Flink are minimal and the default behaviour won't 
> change.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to