This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d3c0ccb [pulsar-client] support input-stream for trustStore cert (#7442) d3c0ccb is described below commit d3c0ccb17f282e8063173a50ee825efa280d4f92 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Sat Sep 19 01:13:38 2020 -0700 [pulsar-client] support input-stream for trustStore cert (#7442) * [pulsar-client] support input-stream for trustStore cert remove file closing fix check-style * fix flaky test --- .../pulsar/client/api/TlsProducerConsumerTest.java | 23 +++++++++++--- .../admin/internal/http/AsyncHttpConnector.java | 14 +++++---- .../client/api/AuthenticationDataProvider.java | 10 +++++++ .../org/apache/pulsar/client/impl/HttpClient.java | 13 ++++---- .../client/impl/PulsarChannelInitializer.java | 27 ++++++++++------- .../client/impl/auth/AuthenticationDataTls.java | 15 ++++++++-- .../pulsar/client/impl/auth/AuthenticationTls.java | 13 ++++++-- .../util/NettyClientSslContextRefresher.java | 12 ++++++-- .../apache/pulsar/common/util/SecurityUtility.java | 35 +++++++++++++++++----- 9 files changed, 122 insertions(+), 40 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java index 9f1eac8..614e75e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java @@ -142,16 +142,18 @@ public class TlsProducerConsumerTest extends TlsProducerConsumerBase { log.info("-- Starting {} test --", methodName); String topicName = "persistent://my-property/use/my-ns/my-topic1"; ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrlTls()) - .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false) + .enableTls(true).allowTlsInsecureConnection(false) .operationTimeout(1000, TimeUnit.MILLISECONDS); AtomicInteger index = new AtomicInteger(0); ByteArrayInputStream certStream = createByteInputStream(TLS_CLIENT_CERT_FILE_PATH); ByteArrayInputStream keyStream = createByteInputStream(TLS_CLIENT_KEY_FILE_PATH); + ByteArrayInputStream trustStoreStream = createByteInputStream(TLS_TRUST_CERT_FILE_PATH); Supplier<ByteArrayInputStream> certProvider = () -> getStream(index, certStream); Supplier<ByteArrayInputStream> keyProvider = () -> getStream(index, keyStream); - AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider); + Supplier<ByteArrayInputStream> trustStoreProvider = () -> getStream(index, trustStoreStream); + AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider, trustStoreProvider); clientBuilder.authentication(auth); @Cleanup PulsarClient pulsarClient = clientBuilder.build(); @@ -196,16 +198,20 @@ public class TlsProducerConsumerTest extends TlsProducerConsumerBase { public void testTlsCertsFromDynamicStreamExpiredAndRenewCert() throws Exception { log.info("-- Starting {} test --", methodName); ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrlTls()) - .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false) + .enableTls(true).allowTlsInsecureConnection(false) .operationTimeout(1000, TimeUnit.MILLISECONDS); AtomicInteger certIndex = new AtomicInteger(1); AtomicInteger keyIndex = new AtomicInteger(0); + AtomicInteger trustStoreIndex = new AtomicInteger(1); ByteArrayInputStream certStream = createByteInputStream(TLS_CLIENT_CERT_FILE_PATH); ByteArrayInputStream keyStream = createByteInputStream(TLS_CLIENT_KEY_FILE_PATH); + ByteArrayInputStream trustStoreStream = createByteInputStream(TLS_TRUST_CERT_FILE_PATH); Supplier<ByteArrayInputStream> certProvider = () -> getStream(certIndex, certStream, keyStream/* invalid cert file */); Supplier<ByteArrayInputStream> keyProvider = () -> getStream(keyIndex, keyStream); - AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider); + Supplier<ByteArrayInputStream> trustStoreProvider = () -> getStream(trustStoreIndex, trustStoreStream, + keyStream/* invalid cert file */); + AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider, trustStoreProvider); clientBuilder.authentication(auth); @Cleanup PulsarClient pulsarClient = clientBuilder.build(); @@ -219,6 +225,15 @@ public class TlsProducerConsumerTest extends TlsProducerConsumerBase { } certIndex.set(0); + try { + consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("my-subscriber-name").subscribe(); + Assert.fail("should have failed due to invalid tls cert"); + } catch (PulsarClientException e) { + // Ok.. + } + + trustStoreIndex.set(0); consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1") .subscriptionName("my-subscriber-name").subscribe(); consumer.close(); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java index 70373fb..1cb21ff 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java @@ -139,11 +139,15 @@ public class AsyncHttpConnector implements Connector { } else { SslContext sslCtx = null; if (authData.hasDataForTls()) { - sslCtx = SecurityUtility.createNettySslContextForClient( - conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(), - conf.getTlsTrustCertsFilePath(), - authData.getTlsCertificates(), - authData.getTlsPrivateKey()); + sslCtx = authData.getTlsTrustStoreStream() == null + ? SecurityUtility.createNettySslContextForClient( + conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(), + conf.getTlsTrustCertsFilePath(), authData.getTlsCertificates(), + authData.getTlsPrivateKey()) + : SecurityUtility.createNettySslContextForClient( + conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(), + authData.getTlsTrustStoreStream(), authData.getTlsCertificates(), + authData.getTlsPrivateKey()); } else { sslCtx = SecurityUtility.createNettySslContextForClient( conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(), diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java index 77eafe5..ea15cda 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java @@ -20,6 +20,7 @@ package org.apache.pulsar.client.api; import static java.nio.charset.StandardCharsets.UTF_8; +import java.io.InputStream; import java.io.Serializable; import java.security.PrivateKey; import java.security.cert.Certificate; @@ -64,6 +65,15 @@ public interface AuthenticationDataProvider extends Serializable { } /** + * + * @return an input-stream of the trust store, or null if the trust-store provided at + * {@link ClientConfigurationData#getTlsTrustStorePath()} + */ + default InputStream getTlsTrustStoreStream() { + return null; + } + + /** * Used for TLS authentication with keystore type. * * @return a KeyStoreParams for the client certificate chain, or null if the data are not available diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java index 3ba02bb..1d3839c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java @@ -35,6 +35,7 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.ssl.SslContext; import javax.net.ssl.SSLContext; import lombok.extern.slf4j.Slf4j; + import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; @@ -112,11 +113,13 @@ public class HttpClient implements Closeable { } else { SslContext sslCtx = null; if (authData.hasDataForTls()) { - sslCtx = SecurityUtility.createNettySslContextForClient( - conf.isTlsAllowInsecureConnection(), - conf.getTlsTrustCertsFilePath(), - authData.getTlsCertificates(), - authData.getTlsPrivateKey()); + sslCtx = authData.getTlsTrustStoreStream() == null + ? SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(), + conf.getTlsTrustCertsFilePath(), authData.getTlsCertificates(), + authData.getTlsPrivateKey()) + : SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(), + authData.getTlsTrustStoreStream(), authData.getTlsCertificates(), + authData.getTlsPrivateKey()); } else { sslCtx = SecurityUtility.createNettySslContextForClient( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index e418904..ef2a78b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -18,17 +18,9 @@ */ package org.apache.pulsar.client.impl; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslHandler; import java.security.cert.X509Certificate; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import lombok.extern.slf4j.Slf4j; -import lombok.Getter; -import lombok.Setter; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.AuthenticationDataProvider; @@ -39,6 +31,15 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslHandler; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + @Slf4j public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> { @@ -86,9 +87,13 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> // Set client certificate if available AuthenticationDataProvider authData = conf.getAuthentication().getAuthData(); if (authData.hasDataForTls()) { - return SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(), - conf.getTlsTrustCertsFilePath(), (X509Certificate[]) authData.getTlsCertificates(), - authData.getTlsPrivateKey()); + return authData.getTlsTrustStoreStream() == null + ? SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(), + conf.getTlsTrustCertsFilePath(), + (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey()) + : SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(), + authData.getTlsTrustStoreStream(), + (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey()); } else { return SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(), conf.getTlsTrustCertsFilePath()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java index 0d3df12..f11e974 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java @@ -41,7 +41,7 @@ public class AuthenticationDataTls implements AuthenticationDataProvider { private FileModifiedTimeUpdater certFile, keyFile; // key and cert using stream private InputStream certStream, keyStream; - private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider; + private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider, trustStoreStreamProvider; public AuthenticationDataTls(String certFilePath, String keyFilePath) throws KeyManagementException { if (certFilePath == null) { @@ -58,6 +58,12 @@ public class AuthenticationDataTls implements AuthenticationDataProvider { public AuthenticationDataTls(Supplier<ByteArrayInputStream> certStreamProvider, Supplier<ByteArrayInputStream> keyStreamProvider) throws KeyManagementException { + this(certStreamProvider, keyStreamProvider, null); + } + + public AuthenticationDataTls(Supplier<ByteArrayInputStream> certStreamProvider, + Supplier<ByteArrayInputStream> keyStreamProvider, Supplier<ByteArrayInputStream> trustStoreStreamProvider) + throws KeyManagementException { if (certStreamProvider == null || certStreamProvider.get() == null) { throw new IllegalArgumentException("certStream provider or stream must not be null"); } @@ -66,12 +72,12 @@ public class AuthenticationDataTls implements AuthenticationDataProvider { } this.certStreamProvider = certStreamProvider; this.keyStreamProvider = keyStreamProvider; + this.trustStoreStreamProvider = trustStoreStreamProvider; this.certStream = certStreamProvider.get(); this.keyStream = keyStreamProvider.get(); this.tlsCertificates = SecurityUtility.loadCertificatesFromPemStream(certStream); this.tlsPrivateKey = SecurityUtility.loadPrivateKeyFromPemStream(keyStream); } - /* * TLS */ @@ -121,5 +127,10 @@ public class AuthenticationDataTls implements AuthenticationDataProvider { return this.tlsPrivateKey; } + @Override + public InputStream getTlsTrustStoreStream() { + return trustStoreStreamProvider != null ? trustStoreStreamProvider.get() : null; + } + private static final Logger LOG = LoggerFactory.getLogger(AuthenticationDataTls.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java index d899146..326fa46 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java @@ -45,7 +45,7 @@ public class AuthenticationTls implements Authentication, EncodedAuthenticationP private String certFilePath; private String keyFilePath; - private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider; + private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider, trustStoreStreamProvider; public AuthenticationTls() { } @@ -55,9 +55,16 @@ public class AuthenticationTls implements Authentication, EncodedAuthenticationP this.keyFilePath = keyFilePath; } - public AuthenticationTls(Supplier<ByteArrayInputStream> certStreamProvider, Supplier<ByteArrayInputStream> keyStreamProvider) { + public AuthenticationTls(Supplier<ByteArrayInputStream> certStreamProvider, + Supplier<ByteArrayInputStream> keyStreamProvider) { + this(certStreamProvider, keyStreamProvider, null); + } + + public AuthenticationTls(Supplier<ByteArrayInputStream> certStreamProvider, + Supplier<ByteArrayInputStream> keyStreamProvider, Supplier<ByteArrayInputStream> trustStoreStreamProvider) { this.certStreamProvider = certStreamProvider; this.keyStreamProvider = keyStreamProvider; + this.trustStoreStreamProvider = trustStoreStreamProvider; } @Override @@ -76,7 +83,7 @@ public class AuthenticationTls implements Authentication, EncodedAuthenticationP if (certFilePath != null && keyFilePath != null) { return new AuthenticationDataTls(certFilePath, keyFilePath); } else if (certStreamProvider != null && keyStreamProvider != null) { - return new AuthenticationDataTls(certStreamProvider, keyStreamProvider); + return new AuthenticationDataTls(certStreamProvider, keyStreamProvider, trustStoreStreamProvider); } } catch (Exception e) { throw new PulsarClientException(e); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java index 48cf992..35919c9 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java @@ -19,12 +19,14 @@ package org.apache.pulsar.common.util; import io.netty.handler.ssl.SslContext; + import java.io.FileNotFoundException; import java.io.IOException; import java.security.GeneralSecurityException; import java.security.cert.X509Certificate; import javax.net.ssl.SSLException; import lombok.extern.slf4j.Slf4j; + import org.apache.pulsar.client.api.AuthenticationDataProvider; /** @@ -52,9 +54,13 @@ public class NettyClientSslContextRefresher extends SslContextAutoRefreshBuilder public synchronized SslContext update() throws SSLException, FileNotFoundException, GeneralSecurityException, IOException { if (authData != null && authData.hasDataForTls()) { - this.sslNettyContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection, - this.tlsTrustCertsFilePath.getFileName(), (X509Certificate[]) authData.getTlsCertificates(), - authData.getTlsPrivateKey()); + this.sslNettyContext = authData.getTlsTrustStoreStream() == null + ? SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection, + tlsTrustCertsFilePath.getFileName(), (X509Certificate[]) authData.getTlsCertificates(), + authData.getTlsPrivateKey()) + : SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection, + authData.getTlsTrustStoreStream(), (X509Certificate[]) authData.getTlsCertificates(), + authData.getTlsPrivateKey()); } else { this.sslNettyContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection, this.tlsTrustCertsFilePath.getFileName()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java index 555cbac..2e3633d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java @@ -56,6 +56,8 @@ import javax.net.ssl.SSLException; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import lombok.extern.slf4j.Slf4j; + +import org.apache.commons.lang3.StringUtils; import org.eclipse.jetty.util.ssl.SslContextFactory; /** @@ -165,8 +167,23 @@ public class SecurityUtility { public static SslContext createNettySslContextForClient(boolean allowInsecureConnection, String trustCertsFilePath, Certificate[] certificates, PrivateKey privateKey) throws GeneralSecurityException, SSLException, FileNotFoundException, IOException { + + if (StringUtils.isNotBlank(trustCertsFilePath)) { + try (FileInputStream trustCertsStream = new FileInputStream(trustCertsFilePath)) { + return createNettySslContextForClient(allowInsecureConnection, trustCertsStream, certificates, + privateKey); + } + } else { + return createNettySslContextForClient(allowInsecureConnection, (InputStream) null, certificates, + privateKey); + } + } + + public static SslContext createNettySslContextForClient(boolean allowInsecureConnection, + InputStream trustCertsStream, Certificate[] certificates, PrivateKey privateKey) + throws GeneralSecurityException, SSLException, FileNotFoundException, IOException { SslContextBuilder builder = SslContextBuilder.forClient(); - setupTrustCerts(builder, allowInsecureConnection, trustCertsFilePath); + setupTrustCerts(builder, allowInsecureConnection, trustCertsStream); setupKeyManager(builder, privateKey, (X509Certificate[]) certificates); return builder.build(); } @@ -181,7 +198,13 @@ public class SecurityUtility { SslContextBuilder builder = SslContextBuilder.forServer(privateKey, (X509Certificate[]) certificates); setupCiphers(builder, ciphers); setupProtocols(builder, protocols); - setupTrustCerts(builder, allowInsecureConnection, trustCertsFilePath); + if (StringUtils.isNotBlank(trustCertsFilePath)) { + try (FileInputStream trustCertsStream = new FileInputStream(trustCertsFilePath)) { + setupTrustCerts(builder, allowInsecureConnection, trustCertsStream); + } + } else { + setupTrustCerts(builder, allowInsecureConnection, null); + } setupKeyManager(builder, privateKey, certificates); setupClientAuthentication(builder, requireTrustedClientCertOnConnect); return builder.build(); @@ -320,14 +343,12 @@ public class SecurityUtility { } private static void setupTrustCerts(SslContextBuilder builder, boolean allowInsecureConnection, - String trustCertsFilePath) throws IOException, FileNotFoundException { + InputStream trustCertsStream) throws IOException, FileNotFoundException { if (allowInsecureConnection) { builder.trustManager(InsecureTrustManagerFactory.INSTANCE); } else { - if (trustCertsFilePath != null && trustCertsFilePath.length() != 0) { - try (FileInputStream input = new FileInputStream(trustCertsFilePath)) { - builder.trustManager(input); - } + if (trustCertsStream != null) { + builder.trustManager(trustCertsStream); } else { builder.trustManager((File) null); }