This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a7e30d8 [pulsar-client] sni-proxy protocol should pass sni-host address without resolving (#8062) a7e30d8 is described below commit a7e30d8150f626e75d8a17980e6e1019de659a5d Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Mon Sep 21 15:39:09 2020 -0700 [pulsar-client] sni-proxy protocol should pass sni-host address without resolving (#8062) make sni-proxy connection creation thread-safe remove unused pair initialize channel explicitly when sni-proxy is configured initialize channel in io thread fix channel var --- .../pulsar/client/api/ProxyProtocolTest.java | 3 +- .../apache/pulsar/client/impl/ConnectionPool.java | 105 +++++++++++---------- .../client/impl/PulsarChannelInitializer.java | 41 ++++---- 3 files changed, 79 insertions(+), 70 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java index 964ebd9..d264b83 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java @@ -40,7 +40,7 @@ public class ProxyProtocolTest extends TlsProducerConsumerBase { // Client should try to connect to proxy and pass broker-url as SNI header String proxyUrl = pulsar.getBrokerServiceUrlTls(); - String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651"; + String brokerServiceUrl = "pulsar+ssl://unresolvable-address:6651"; String topicName = "persistent://my-property/use/my-ns/my-topic1"; ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl) @@ -53,7 +53,6 @@ public class ProxyProtocolTest extends TlsProducerConsumerBase { @Cleanup PulsarClient pulsarClient = clientBuilder.build(); - // should be able to create producer successfully pulsarClient.newProducer().topic(topicName).create(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 316145e..32d7195 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.impl; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -37,7 +36,6 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; -import java.net.UnknownHostException; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -66,6 +64,7 @@ public class ConnectionPool implements Closeable { private final ClientConfigurationData clientConfig; private final EventLoopGroup eventLoopGroup; private final int maxConnectionsPerHosts; + private final boolean isSniProxy; protected final DnsNameResolver dnsResolver; @@ -78,6 +77,8 @@ public class ConnectionPool implements Closeable { this.eventLoopGroup = eventLoopGroup; this.clientConfig = conf; this.maxConnectionsPerHosts = conf.getConnectionsPerBroker(); + this.isSniProxy = clientConfig.isUseTls() && clientConfig.getProxyProtocol() != null + && StringUtils.isNotBlank(clientConfig.getProxyServiceUrl()); pool = new ConcurrentHashMap<>(); bootstrap = new Bootstrap(); @@ -89,7 +90,7 @@ public class ConnectionPool implements Closeable { bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); try { - channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier); + channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier, isSniProxy); bootstrap.handler(channelInitializerHandler); } catch (Exception e) { log.error("Failed to create channel initializer"); @@ -224,18 +225,24 @@ public class ConnectionPool implements Closeable { * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server */ private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) { - String hostname = unresolvedAddress.getHostString(); - int port = unresolvedAddress.getPort(); + int port; + CompletableFuture<List<InetAddress>> resolvedAddress = null; try { - // For non-sni-proxy: Resolve DNS --> Attempt to connect to all IP addresses until once succeeds - CompletableFuture<List<InetAddress>> resolvedAddress = isSniProxy() - ? CompletableFuture.completedFuture(Lists.newArrayList(InetAddress.getByName(hostname))) - : resolveName(hostname); - return resolvedAddress - .thenCompose(inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port)); - } catch (UnknownHostException e) { - log.error("Invalid remote url {}", hostname, e); - return FutureUtil.failedFuture(new InvalidServiceURL("Invalid url " + hostname, e)); + if (isSniProxy) { + URI proxyURI = new URI(clientConfig.getProxyServiceUrl()); + port = proxyURI.getPort(); + resolvedAddress = resolveName(proxyURI.getHost()); + } else { + port = unresolvedAddress.getPort(); + resolvedAddress = resolveName(unresolvedAddress.getHostString()); + } + return resolvedAddress.thenCompose( + inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port, + isSniProxy ? unresolvedAddress : null)); + } catch (URISyntaxException e) { + log.error("Invalid Proxy url {}", clientConfig.getProxyServiceUrl(), e); + return FutureUtil + .failedFuture(new InvalidServiceURL("Invalid url " + clientConfig.getProxyServiceUrl(), e)); } } @@ -243,16 +250,16 @@ public class ConnectionPool implements Closeable { * Try to connect to a sequence of IP addresses until a successfull connection can be made, or fail if no address is * working */ - private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port) { + private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port, InetSocketAddress sniHost) { CompletableFuture<Channel> future = new CompletableFuture<>(); - connectToAddress(unresolvedAddresses.next(), port, false).thenAccept(channel -> { + connectToAddress(unresolvedAddresses.next(), port, sniHost).thenAccept(channel -> { // Successfully connected to server future.complete(channel); }).exceptionally(exception -> { if (unresolvedAddresses.hasNext()) { // Try next IP address - connectToResolvedAddresses(unresolvedAddresses, port).thenAccept(channel -> { + connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(channel -> { future.complete(channel); }).exceptionally(ex -> { // This is already unwinding the recursive call @@ -285,35 +292,39 @@ public class ConnectionPool implements Closeable { /** * Attempt to establish a TCP connection to an already resolved single IP address */ - private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, boolean ignoreProxyUrl) { + private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) { CompletableFuture<Channel> future = new CompletableFuture<>(); - - if (!ignoreProxyUrl && isSniProxy()) { - // client wants to connect to proxy and wants to pass - // target connection host in sni header - channelInitializerHandler.setSniHostName(ipAddress.getHostName()); - channelInitializerHandler.setSniHostPort(port); - // connect to proxy host - try { - URI proxyURI = new URI(clientConfig.getProxyServiceUrl()); - // resolve proxy host-address and try to connect again by passing flag ignoreProxyUrl because proxy-host - // will be already resolved - return resolveName(proxyURI.getHost()) - .thenCompose(inetAddresses -> connectToAddress(inetAddresses.iterator().next(), proxyURI.getPort(), true)); - } catch (URISyntaxException e) { - log.error("Failed to parse proxy-service url {}", clientConfig.getProxyServiceUrl(), e); - future.completeExceptionally(e); - return future; - } + // if proxy is configured in pulsar-client then make it thread-safe while updating channelInitializerHandler + if (isSniProxy) { + bootstrap.register().addListener((ChannelFuture cf) -> { + if (!cf.isSuccess()) { + future.completeExceptionally(cf.cause()); + return; + } + Channel channel = cf.channel(); + try { + channelInitializerHandler.initChannel(channel, sniHost); + channel.connect(new InetSocketAddress(ipAddress, port)).addListener((ChannelFuture channelFuture) -> { + if (channelFuture.isSuccess()) { + future.complete(channelFuture.channel()); + } else { + future.completeExceptionally(channelFuture.cause()); + } + }); + } catch (Exception e) { + log.warn("Failed to initialize channel with {}, {}", ipAddress, sniHost, e); + future.completeExceptionally(e); + } + }); + } else { + bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> { + if (channelFuture.isSuccess()) { + future.complete(channelFuture.channel()); + } else { + future.completeExceptionally(channelFuture.cause()); + } + }); } - bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> { - if (channelFuture.isSuccess()) { - future.complete(channelFuture.channel()); - } else { - future.completeExceptionally(channelFuture.cause()); - } - }); - return future; } @@ -336,7 +347,6 @@ public class ConnectionPool implements Closeable { } catch (InterruptedException e) { log.warn("EventLoopGroup shutdown was interrupted", e); } - dnsResolver.close(); } @@ -361,10 +371,5 @@ public class ConnectionPool implements Closeable { return mod; } - private boolean isSniProxy() { - return channelInitializerHandler.isTlsEnabled() && clientConfig.getProxyProtocol() != null - && StringUtils.isNotBlank(clientConfig.getProxyServiceUrl()); - } - private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index ef2a78b..043151c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -18,11 +18,11 @@ */ package org.apache.pulsar.client.impl; +import java.net.InetSocketAddress; import java.security.cert.X509Certificate; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.util.ObjectCache; @@ -31,13 +31,13 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder; +import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import lombok.Getter; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -52,19 +52,17 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> private final Supplier<SslContext> sslContextSupplier; private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder; - @Setter - private String sniHostName; - @Setter - private int sniHostPort; + private final boolean isSniProxyEnabled; private static final long TLS_CERTIFICATE_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(1); - public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx> clientCnxSupplier) + public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx> clientCnxSupplier, boolean isSniProxyEnabled) throws Exception { super(); this.clientCnxSupplier = clientCnxSupplier; this.tlsEnabled = conf.isUseTls(); this.tlsEnabledWithKeyStore = conf.isUseKeyStoreTls(); + this.isSniProxyEnabled = isSniProxyEnabled; if (tlsEnabled) { if (tlsEnabledWithKeyStore) { @@ -109,26 +107,33 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> @Override public void initChannel(SocketChannel ch) throws Exception { + /** + * skip initializing channel if sni-proxy is enabled in that case {@link ConnectionPool} will initialize the + * channel explicitly. + */ + if (!isSniProxyEnabled) { + initChannel(ch, null); + } + } + + public void initChannel(Channel ch, InetSocketAddress sniHost) throws Exception { if (tlsEnabled) { if (tlsEnabledWithKeyStore) { ch.pipeline().addLast(TLS_HANDLER, new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine())); - } else { - SslHandler handler = StringUtils.isNotBlank(sniHostName) - ? sslContextSupplier.get().newHandler(ch.alloc(), sniHostName, sniHostPort) - : sslContextSupplier.get().newHandler(ch.alloc()); - ch.pipeline().addLast(TLS_HANDLER, handler); - } + } else { + SslHandler handler = sniHost != null + ? sslContextSupplier.get().newHandler(ch.alloc(), sniHost.getHostName(), sniHost.getPort()) + : sslContextSupplier.get().newHandler(ch.alloc()); + ch.pipeline().addLast(TLS_HANDLER, handler); + } ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER); } else { ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); } - ch.pipeline() - .addLast("frameDecoder", - new LengthFieldBasedFrameDecoder( - Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, - 0, 4, 0, 4)); + ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( + Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); ch.pipeline().addLast("handler", clientCnxSupplier.get()); } }