[FLINK-9314] [security] (part 4) Add mutual authentication for internal Netty and Blob Server connections
This closes #6326. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a502f827 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a502f827 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a502f827 Branch: refs/heads/master Commit: a502f82777acc825aaea4de22ece6b4d247260b9 Parents: 3aeb00f Author: Stephan Ewen <se...@apache.org> Authored: Thu Jul 12 21:18:46 2018 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Mon Jul 16 08:10:46 2018 +0200 ---------------------------------------------------------------------- .../runtime/io/network/netty/NettyConfig.java | 4 + .../flink/runtime/net/SSLEngineFactory.java | 10 +- .../org/apache/flink/runtime/net/SSLUtils.java | 9 +- .../network/netty/NettyClientServerSslTest.java | 133 ++++++++++--------- 4 files changed, 90 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a502f827/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java index 46cdaab..59971e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java @@ -213,6 +213,10 @@ public class NettyConfig { return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL); } + public Configuration getConfig() { + return config; + } + @Override public String toString() { String format = "NettyConfig [" + http://git-wip-us.apache.org/repos/asf/flink/blob/a502f827/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java index d842267..68f60b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLEngineFactory.java @@ -36,15 +36,20 @@ public class SSLEngineFactory { private final boolean clientMode; + final boolean clientAuthentication; + public SSLEngineFactory( final SSLContext sslContext, final String[] enabledProtocols, final String[] enabledCipherSuites, - final boolean clientMode) { + final boolean clientMode, + final boolean clientAuthentication) { + this.sslContext = requireNonNull(sslContext, "sslContext must not be null"); this.enabledProtocols = requireNonNull(enabledProtocols, "enabledProtocols must not be null"); this.enabledCipherSuites = requireNonNull(enabledCipherSuites, "cipherSuites must not be null"); this.clientMode = clientMode; + this.clientAuthentication = clientAuthentication; } public SSLEngine createSSLEngine() { @@ -63,5 +68,8 @@ public class SSLEngineFactory { sslEngine.setEnabledProtocols(enabledProtocols); sslEngine.setEnabledCipherSuites(enabledCipherSuites); sslEngine.setUseClientMode(clientMode); + if (!clientMode) { + sslEngine.setNeedClientAuth(clientAuthentication); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/a502f827/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java index 5c95535..d209f5f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java @@ -108,7 +108,8 @@ public class SSLUtils { sslContext, getEnabledProtocols(config), getEnabledCipherSuites(config), - false); + false, + true); } /** @@ -124,6 +125,7 @@ public class SSLUtils { sslContext, getEnabledProtocols(config), getEnabledCipherSuites(config), + true, true); } @@ -142,6 +144,7 @@ public class SSLUtils { sslContext, getEnabledProtocols(config), getEnabledCipherSuites(config), + false, false); } @@ -160,7 +163,8 @@ public class SSLUtils { sslContext, getEnabledProtocols(config), getEnabledCipherSuites(config), - true); + true, + false); } private static String[] getEnabledProtocols(final Configuration config) { @@ -352,6 +356,7 @@ public class SSLUtils { private void configureServerSocket(SSLServerSocket socket) { socket.setEnabledProtocols(protocols); socket.setEnabledCipherSuites(cipherSuites); + socket.setNeedClientAuth(true); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/a502f827/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java index 2750660..b1f3b48 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient; +import org.apache.flink.runtime.net.SSLUtilsTest; import org.apache.flink.util.NetUtils; import org.apache.flink.shaded.netty4.io.netty.channel.Channel; @@ -35,6 +37,10 @@ import java.net.InetAddress; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +/** + * Tests for the SSL connection between Netty Server and Client used for the + * data plane. + */ public class NettyClientServerSslTest { /** @@ -42,24 +48,9 @@ public class NettyClientServerSslTest { */ @Test public void testValidSslConnection() throws Exception { - NettyProtocol protocol = new NettyProtocol(null, null, true) { - @Override - public ChannelHandler[] getServerChannelHandlers() { - return new ChannelHandler[0]; - } - - @Override - public ChannelHandler[] getClientChannelHandlers() { - return new ChannelHandler[0]; - } - }; - - NettyConfig nettyConfig = new NettyConfig( - InetAddress.getLoopbackAddress(), - NetUtils.getAvailablePort(), - NettyTestUtil.DEFAULT_SEGMENT_SIZE, - 1, - createSslConfig()); + NettyProtocol protocol = new NoOpProtocol(); + + NettyConfig nettyConfig = createNettyConfig(createSslConfig()); NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig); @@ -77,28 +68,13 @@ public class NettyClientServerSslTest { */ @Test public void testInvalidSslConfiguration() throws Exception { - NettyProtocol protocol = new NettyProtocol(null, null, true) { - @Override - public ChannelHandler[] getServerChannelHandlers() { - return new ChannelHandler[0]; - } - - @Override - public ChannelHandler[] getClientChannelHandlers() { - return new ChannelHandler[0]; - } - }; + NettyProtocol protocol = new NoOpProtocol(); Configuration config = createSslConfig(); // Modify the keystore password to an incorrect one - config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "invalidpassword"); + config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "invalidpassword"); - NettyConfig nettyConfig = new NettyConfig( - InetAddress.getLoopbackAddress(), - NetUtils.getAvailablePort(), - NettyTestUtil.DEFAULT_SEGMENT_SIZE, - 1, - config); + NettyConfig nettyConfig = createNettyConfig(config); NettyTestUtil.NettyServerAndClient serverAndClient = null; try { @@ -116,29 +92,14 @@ public class NettyClientServerSslTest { */ @Test public void testSslHandshakeError() throws Exception { - NettyProtocol protocol = new NettyProtocol(null, null, true) { - @Override - public ChannelHandler[] getServerChannelHandlers() { - return new ChannelHandler[0]; - } - - @Override - public ChannelHandler[] getClientChannelHandlers() { - return new ChannelHandler[0]; - } - }; + NettyProtocol protocol = new NoOpProtocol(); Configuration config = createSslConfig(); // Use a server certificate which is not present in the truststore - config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/untrusted.keystore"); + config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore"); - NettyConfig nettyConfig = new NettyConfig( - InetAddress.getLoopbackAddress(), - NetUtils.getAvailablePort(), - NettyTestUtil.DEFAULT_SEGMENT_SIZE, - 1, - config); + NettyConfig nettyConfig = createNettyConfig(config); NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig); @@ -151,14 +112,60 @@ public class NettyClientServerSslTest { NettyTestUtil.shutdown(serverAndClient); } - private static Configuration createSslConfig() throws Exception { - Configuration flinkConfig = new Configuration(); - flinkConfig.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true); - flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/local127.keystore"); - flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "password"); - flinkConfig.setString(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, "password"); - flinkConfig.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE, "src/test/resources/local127.truststore"); - flinkConfig.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, "password"); - return flinkConfig; + @Test + public void testClientUntrustedCertificate() throws Exception { + final Configuration serverConfig = createSslConfig(); + final Configuration clientConfig = createSslConfig(); + + // give the client a different keystore / certificate + clientConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore"); + + final NettyConfig nettyServerConfig = createNettyConfig(serverConfig); + final NettyConfig nettyClientConfig = createNettyConfig(clientConfig); + + final NettyBufferPool bufferPool = new NettyBufferPool(1); + final NettyProtocol protocol = new NoOpProtocol(); + + final NettyServer server = NettyTestUtil.initServer(nettyServerConfig, protocol, bufferPool); + final NettyClient client = NettyTestUtil.initClient(nettyClientConfig, protocol, bufferPool); + final NettyServerAndClient serverAndClient = new NettyServerAndClient(server, client); + + final Channel ch = NettyTestUtil.connect(serverAndClient); + ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder()); + + // Attempting to write data over ssl should fail + assertFalse(ch.writeAndFlush("test").await().isSuccess()); + + NettyTestUtil.shutdown(serverAndClient); + } + + private static Configuration createSslConfig() { + return SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores(); + } + + private static NettyConfig createNettyConfig(Configuration config) { + return new NettyConfig( + InetAddress.getLoopbackAddress(), + NetUtils.getAvailablePort(), + NettyTestUtil.DEFAULT_SEGMENT_SIZE, + 1, + config); + } + + private static final class NoOpProtocol extends NettyProtocol { + + NoOpProtocol() { + super(null, null, true); + } + + @Override + public ChannelHandler[] getServerChannelHandlers() { + return new ChannelHandler[0]; + } + + @Override + public ChannelHandler[] getClientChannelHandlers() { + return new ChannelHandler[0]; + } } }