[FLINK-9314] [security] (part 4) Add mutual authentication for internal Netty 
and Blob Server connections

This closes #6326.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a502f827
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a502f827
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a502f827

Branch: refs/heads/master
Commit: a502f82777acc825aaea4de22ece6b4d247260b9
Parents: 3aeb00f
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 12 21:18:46 2018 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Jul 16 08:10:46 2018 +0200

----------------------------------------------------------------------
 .../runtime/io/network/netty/NettyConfig.java   |   4 +
 .../flink/runtime/net/SSLEngineFactory.java     |  10 +-
 .../org/apache/flink/runtime/net/SSLUtils.java  |   9 +-
 .../network/netty/NettyClientServerSslTest.java | 133 ++++++++++---------
 4 files changed, 90 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a502f827/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 46cdaab..59971e1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -213,6 +213,10 @@ public class NettyConfig {
                return 
config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
        }
 
+       public Configuration getConfig() {
+               return config;
+       }
+
        @Override
        public String toString() {
                String format = "NettyConfig [" +

http://git-wip-us.apache.org/repos/asf/flink/blob/a502f827/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
index d842267..68f60b8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java
@@ -36,15 +36,20 @@ public class SSLEngineFactory {
 
        private final boolean clientMode;
 
+       final boolean clientAuthentication;
+
        public SSLEngineFactory(
                        final SSLContext sslContext,
                        final String[] enabledProtocols,
                        final String[] enabledCipherSuites,
-                       final boolean clientMode) {
+                       final boolean clientMode,
+                       final boolean clientAuthentication) {
+
                this.sslContext = requireNonNull(sslContext, "sslContext must 
not be null");
                this.enabledProtocols = requireNonNull(enabledProtocols, 
"enabledProtocols must not be null");
                this.enabledCipherSuites = requireNonNull(enabledCipherSuites, 
"cipherSuites must not be null");
                this.clientMode = clientMode;
+               this.clientAuthentication = clientAuthentication;
        }
 
        public SSLEngine createSSLEngine() {
@@ -63,5 +68,8 @@ public class SSLEngineFactory {
                sslEngine.setEnabledProtocols(enabledProtocols);
                sslEngine.setEnabledCipherSuites(enabledCipherSuites);
                sslEngine.setUseClientMode(clientMode);
+               if (!clientMode) {
+                       sslEngine.setNeedClientAuth(clientAuthentication);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a502f827/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index 5c95535..d209f5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -108,7 +108,8 @@ public class SSLUtils {
                                sslContext,
                                getEnabledProtocols(config),
                                getEnabledCipherSuites(config),
-                               false);
+                               false,
+                               true);
        }
 
        /**
@@ -124,6 +125,7 @@ public class SSLUtils {
                                sslContext,
                                getEnabledProtocols(config),
                                getEnabledCipherSuites(config),
+                               true,
                                true);
        }
 
@@ -142,6 +144,7 @@ public class SSLUtils {
                                sslContext,
                                getEnabledProtocols(config),
                                getEnabledCipherSuites(config),
+                               false,
                                false);
        }
 
@@ -160,7 +163,8 @@ public class SSLUtils {
                                sslContext,
                                getEnabledProtocols(config),
                                getEnabledCipherSuites(config),
-                               true);
+                               true,
+                               false);
        }
 
        private static String[] getEnabledProtocols(final Configuration config) 
{
@@ -352,6 +356,7 @@ public class SSLUtils {
                private void configureServerSocket(SSLServerSocket socket) {
                        socket.setEnabledProtocols(protocols);
                        socket.setEnabledCipherSuites(cipherSuites);
+                       socket.setNeedClientAuth(true);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a502f827/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
index 2750660..b1f3b48 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
+import 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
+import org.apache.flink.runtime.net.SSLUtilsTest;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
@@ -35,6 +37,10 @@ import java.net.InetAddress;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the SSL connection between Netty Server and Client used for the
+ * data plane.
+ */
 public class NettyClientServerSslTest {
 
        /**
@@ -42,24 +48,9 @@ public class NettyClientServerSslTest {
         */
        @Test
        public void testValidSslConnection() throws Exception {
-               NettyProtocol protocol = new NettyProtocol(null, null, true) {
-                       @Override
-                       public ChannelHandler[] getServerChannelHandlers() {
-                               return new ChannelHandler[0];
-                       }
-
-                       @Override
-                       public ChannelHandler[] getClientChannelHandlers() {
-                               return new ChannelHandler[0];
-                       }
-               };
-
-               NettyConfig nettyConfig = new NettyConfig(
-                       InetAddress.getLoopbackAddress(),
-                       NetUtils.getAvailablePort(),
-                       NettyTestUtil.DEFAULT_SEGMENT_SIZE,
-                       1,
-                       createSslConfig());
+               NettyProtocol protocol = new NoOpProtocol();
+
+               NettyConfig nettyConfig = createNettyConfig(createSslConfig());
 
                NettyTestUtil.NettyServerAndClient serverAndClient = 
NettyTestUtil.initServerAndClient(protocol, nettyConfig);
 
@@ -77,28 +68,13 @@ public class NettyClientServerSslTest {
         */
        @Test
        public void testInvalidSslConfiguration() throws Exception {
-               NettyProtocol protocol = new NettyProtocol(null, null, true) {
-                       @Override
-                       public ChannelHandler[] getServerChannelHandlers() {
-                               return new ChannelHandler[0];
-                       }
-
-                       @Override
-                       public ChannelHandler[] getClientChannelHandlers() {
-                               return new ChannelHandler[0];
-                       }
-               };
+               NettyProtocol protocol = new NoOpProtocol();
 
                Configuration config = createSslConfig();
                // Modify the keystore password to an incorrect one
-               config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, 
"invalidpassword");
+               
config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, 
"invalidpassword");
 
-               NettyConfig nettyConfig = new NettyConfig(
-                       InetAddress.getLoopbackAddress(),
-                       NetUtils.getAvailablePort(),
-                       NettyTestUtil.DEFAULT_SEGMENT_SIZE,
-                       1,
-                       config);
+               NettyConfig nettyConfig = createNettyConfig(config);
 
                NettyTestUtil.NettyServerAndClient serverAndClient = null;
                try {
@@ -116,29 +92,14 @@ public class NettyClientServerSslTest {
         */
        @Test
        public void testSslHandshakeError() throws Exception {
-               NettyProtocol protocol = new NettyProtocol(null, null, true) {
-                       @Override
-                       public ChannelHandler[] getServerChannelHandlers() {
-                               return new ChannelHandler[0];
-                       }
-
-                       @Override
-                       public ChannelHandler[] getClientChannelHandlers() {
-                               return new ChannelHandler[0];
-                       }
-               };
+               NettyProtocol protocol = new NoOpProtocol();
 
                Configuration config = createSslConfig();
 
                // Use a server certificate which is not present in the 
truststore
-               config.setString(SecurityOptions.SSL_KEYSTORE, 
"src/test/resources/untrusted.keystore");
+               config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, 
"src/test/resources/untrusted.keystore");
 
-               NettyConfig nettyConfig = new NettyConfig(
-                       InetAddress.getLoopbackAddress(),
-                       NetUtils.getAvailablePort(),
-                       NettyTestUtil.DEFAULT_SEGMENT_SIZE,
-                       1,
-                       config);
+               NettyConfig nettyConfig = createNettyConfig(config);
 
                NettyTestUtil.NettyServerAndClient serverAndClient = 
NettyTestUtil.initServerAndClient(protocol, nettyConfig);
 
@@ -151,14 +112,60 @@ public class NettyClientServerSslTest {
                NettyTestUtil.shutdown(serverAndClient);
        }
 
-       private static Configuration createSslConfig() throws Exception {
-               Configuration flinkConfig = new Configuration();
-               flinkConfig.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, 
true);
-               flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, 
"src/test/resources/local127.keystore");
-               
flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, 
"password");
-               
flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, "password");
-               flinkConfig.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE, 
"src/test/resources/local127.truststore");
-               
flinkConfig.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, 
"password");
-               return flinkConfig;
+       @Test
+       public void testClientUntrustedCertificate() throws Exception {
+               final Configuration serverConfig = createSslConfig();
+               final Configuration clientConfig = createSslConfig();
+
+               // give the client a different keystore / certificate
+               clientConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, 
"src/test/resources/untrusted.keystore");
+
+               final NettyConfig nettyServerConfig = 
createNettyConfig(serverConfig);
+               final NettyConfig nettyClientConfig = 
createNettyConfig(clientConfig);
+
+               final NettyBufferPool bufferPool = new NettyBufferPool(1);
+               final NettyProtocol protocol = new NoOpProtocol();
+
+               final NettyServer server = 
NettyTestUtil.initServer(nettyServerConfig, protocol, bufferPool);
+               final NettyClient client = 
NettyTestUtil.initClient(nettyClientConfig, protocol, bufferPool);
+               final NettyServerAndClient serverAndClient = new 
NettyServerAndClient(server, client);
+
+               final Channel ch = NettyTestUtil.connect(serverAndClient);
+               ch.pipeline().addLast(new StringDecoder()).addLast(new 
StringEncoder());
+
+               // Attempting to write data over ssl should fail
+               assertFalse(ch.writeAndFlush("test").await().isSuccess());
+
+               NettyTestUtil.shutdown(serverAndClient);
+       }
+
+       private static Configuration createSslConfig() {
+               return 
SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores();
+       }
+
+       private static NettyConfig createNettyConfig(Configuration config) {
+               return new NettyConfig(
+                               InetAddress.getLoopbackAddress(),
+                               NetUtils.getAvailablePort(),
+                               NettyTestUtil.DEFAULT_SEGMENT_SIZE,
+                               1,
+                               config);
+       }
+
+       private static final class NoOpProtocol extends NettyProtocol {
+
+               NoOpProtocol() {
+                       super(null, null, true);
+               }
+
+               @Override
+               public ChannelHandler[] getServerChannelHandlers() {
+                       return new ChannelHandler[0];
+               }
+
+               @Override
+               public ChannelHandler[] getClientChannelHandlers() {
+                       return new ChannelHandler[0];
+               }
        }
 }

Reply via email to