This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f2933f7 Always use SNI for TLS enabled Pulsar Java client. (#8117) f2933f7 is described below commit f2933f7da4850814d92fac0e54c5314c51c8fc32 Author: Rolf Arne Corneliussen <rac...@users.noreply.github.com> AuthorDate: Thu Sep 24 22:37:51 2020 +0200 Always use SNI for TLS enabled Pulsar Java client. (#8117) Co-authored-by: Rolf Arne Corneliussen <rolf.arne.cornelius...@addsecure.com> --- .../org/apache/pulsar/client/api/TlsSniTest.java | 66 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConnectionPool.java | 56 +++++++----------- .../client/impl/PulsarChannelInitializer.java | 58 +++++++++---------- .../util/keystoretls/KeyStoreSSLContext.java | 10 +++- 4 files changed, 124 insertions(+), 66 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java new file mode 100644 index 0000000..fc8c242 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.net.InetAddress; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.impl.auth.AuthenticationTls; +import org.testng.annotations.Test; + +import lombok.Cleanup; + +public class TlsSniTest extends TlsProducerConsumerBase { + + /** + * Verify that using an IP-address in the broker service URL will work with using the SNI capabilities + * of the client. If we try to create an {@link javax.net.ssl.SSLEngine} with a peer host that is an + * IP address, the peer host is ignored, see for example + * {@link io.netty.handler.ssl.ReferenceCountedOpenSslEngine}. + * + */ + @Test + public void testIpAddressInBrokerServiceUrl() throws Exception { + String topicName = "persistent://my-property/use/my-ns/my-topic1"; + + URI brokerServiceUrlTls = new URI(pulsar.getBrokerServiceUrlTls()); + + String brokerServiceIpAddressUrl = String.format("pulsar+ssl://%s:%d", + InetAddress.getByName(brokerServiceUrlTls.getHost()).getHostAddress(), + brokerServiceUrlTls.getPort()); + + ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceIpAddressUrl) + .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(false) + .enableTlsHostnameVerification(false) + .operationTimeout(1000, TimeUnit.MILLISECONDS); + Map<String, String> authParams = new HashMap<>(); + authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); + authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); + clientBuilder.authentication(AuthenticationTls.class.getName(), authParams); + + @Cleanup + PulsarClient pulsarClient = clientBuilder.build(); + // should be able to create producer successfully + pulsarClient.newProducer().topic(topicName).create(); + } +} + diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 32d7195..6203990 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -90,7 +90,7 @@ public class ConnectionPool implements Closeable { bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); try { - channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier, isSniProxy); + channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier); bootstrap.handler(channelInitializerHandler); } catch (Exception e) { log.error("Failed to create channel initializer"); @@ -293,39 +293,12 @@ public class ConnectionPool implements Closeable { * Attempt to establish a TCP connection to an already resolved single IP address */ private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) { - CompletableFuture<Channel> future = new CompletableFuture<>(); - // if proxy is configured in pulsar-client then make it thread-safe while updating channelInitializerHandler - if (isSniProxy) { - bootstrap.register().addListener((ChannelFuture cf) -> { - if (!cf.isSuccess()) { - future.completeExceptionally(cf.cause()); - return; - } - Channel channel = cf.channel(); - try { - channelInitializerHandler.initChannel(channel, sniHost); - channel.connect(new InetSocketAddress(ipAddress, port)).addListener((ChannelFuture channelFuture) -> { - if (channelFuture.isSuccess()) { - future.complete(channelFuture.channel()); - } else { - future.completeExceptionally(channelFuture.cause()); - } - }); - } catch (Exception e) { - log.warn("Failed to initialize channel with {}, {}", ipAddress, sniHost, e); - future.completeExceptionally(e); - } - }); - } else { - bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> { - if (channelFuture.isSuccess()) { - future.complete(channelFuture.channel()); - } else { - future.completeExceptionally(channelFuture.cause()); - } - }); - } - return future; + InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port); + return adapt(bootstrap.register()) + .thenCompose(channel -> clientConfig.isUseTls() + ? channelInitializerHandler.initTls(channel, sniHost != null ? sniHost : remoteAddress) + : CompletableFuture.completedFuture(channel)) + .thenCompose(channel -> adapt(channel.connect(remoteAddress))); } public void releaseConnection(ClientCnx cnx) { @@ -364,7 +337,7 @@ public class ConnectionPool implements Closeable { } public static int signSafeMod(long dividend, int divisor) { - int mod = (int) (dividend % (long) divisor); + int mod = (int) (dividend % divisor); if (mod < 0) { mod += divisor; } @@ -372,4 +345,17 @@ public class ConnectionPool implements Closeable { } private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class); + + private static CompletableFuture<Channel> adapt(ChannelFuture channelFuture) { + CompletableFuture<Channel> adapter = new CompletableFuture<>(); + channelFuture.addListener((ChannelFuture cf) ->{ + if (cf.isSuccess()) { + adapter.complete(channelFuture.channel()); + } else { + adapter.completeExceptionally(channelFuture.cause()); + } + }); + return adapter; + } } + diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index 043151c..f50bed5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -19,7 +19,7 @@ package org.apache.pulsar.client.impl; import java.net.InetSocketAddress; -import java.security.cert.X509Certificate; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -52,17 +52,15 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> private final Supplier<SslContext> sslContextSupplier; private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder; - private final boolean isSniProxyEnabled; private static final long TLS_CERTIFICATE_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(1); - public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx> clientCnxSupplier, boolean isSniProxyEnabled) + public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx> clientCnxSupplier) throws Exception { super(); this.clientCnxSupplier = clientCnxSupplier; this.tlsEnabled = conf.isUseTls(); this.tlsEnabledWithKeyStore = conf.isUseKeyStoreTls(); - this.isSniProxyEnabled = isSniProxyEnabled; if (tlsEnabled) { if (tlsEnabledWithKeyStore) { @@ -88,10 +86,10 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> return authData.getTlsTrustStoreStream() == null ? SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(), conf.getTlsTrustCertsFilePath(), - (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey()) + authData.getTlsCertificates(), authData.getTlsPrivateKey()) : SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(), authData.getTlsTrustStoreStream(), - (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey()); + authData.getTlsCertificates(), authData.getTlsPrivateKey()); } else { return SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(), conf.getTlsTrustCertsFilePath()); @@ -107,33 +105,35 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> @Override public void initChannel(SocketChannel ch) throws Exception { - /** - * skip initializing channel if sni-proxy is enabled in that case {@link ConnectionPool} will initialize the - * channel explicitly. - */ - if (!isSniProxyEnabled) { - initChannel(ch, null); - } - } - public void initChannel(Channel ch, InetSocketAddress sniHost) throws Exception { - if (tlsEnabled) { - if (tlsEnabledWithKeyStore) { - ch.pipeline().addLast(TLS_HANDLER, - new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine())); - } else { - SslHandler handler = sniHost != null - ? sslContextSupplier.get().newHandler(ch.alloc(), sniHost.getHostName(), sniHost.getPort()) - : sslContextSupplier.get().newHandler(ch.alloc()); - ch.pipeline().addLast(TLS_HANDLER, handler); - } - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER); - } else { - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); - } + // Setup channel except for the SsHandler for TLS enabled connections + + ch.pipeline().addLast("ByteBufPairEncoder", tlsEnabled ? ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER); ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); ch.pipeline().addLast("handler", clientCnxSupplier.get()); } + + CompletableFuture<Channel> initTls(Channel ch, InetSocketAddress sniHost) { + if (!tlsEnabled) { + throw new IllegalStateException("TLS is not enabled in client configuration"); + } + CompletableFuture<Channel> initTlsFuture = new CompletableFuture<>(); + ch.eventLoop().execute(() -> { + try { + SslHandler handler = tlsEnabledWithKeyStore + ? new SslHandler(nettySSLContextAutoRefreshBuilder.get() + .createSSLEngine(sniHost.getHostString(), sniHost.getPort())) + : sslContextSupplier.get().newHandler(ch.alloc(), sniHost.getHostString(), sniHost.getPort()); + ch.pipeline().addFirst(TLS_HANDLER, handler); + initTlsFuture.complete(ch); + } catch (Throwable t) { + initTlsFuture.completeExceptionally(t); + } + }); + + return initTlsFuture; + } } + diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java index b9ad2e7..736d189 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java @@ -166,8 +166,14 @@ public class KeyStoreSSLContext { } public SSLEngine createSSLEngine() { - SSLEngine sslEngine = sslContext.createSSLEngine(); + return configureSSLEngine(sslContext.createSSLEngine()); + } + + public SSLEngine createSSLEngine(String peerHost, int peerPort) { + return configureSSLEngine(sslContext.createSSLEngine(peerHost, peerPort)); + } + private SSLEngine configureSSLEngine(SSLEngine sslEngine) { sslEngine.setEnabledProtocols(sslEngine.getSupportedProtocols()); sslEngine.setEnabledCipherSuites(sslEngine.getSupportedCipherSuites()); @@ -177,7 +183,6 @@ public class KeyStoreSSLContext { } else { sslEngine.setUseClientMode(true); } - return sslEngine; } @@ -353,3 +358,4 @@ public class KeyStoreSSLContext { return sslCtxFactory; } } +