This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 98bf97e [pulsar-broker] Perform auto cert refresh for Pulsar-admin (#8831) 98bf97e is described below commit 98bf97e0765db30022597dd468afcc5227b417e8 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Wed Dec 23 12:21:16 2020 -0800 [pulsar-broker] Perform auto cert refresh for Pulsar-admin (#8831) ### Motivation We are frequently getting 500 on `pulsar-admin topics list <ns>` cli command. It happens because `pulsar-admin topics` rest-api internally uses `pulsar-admin` to get list of non-persistent topics. `PulsarAdmin-HttpClient` crates persistent connection but it doesn't perform auto-cert refresh so, if cert is expired and reconnection happens then broker always gets 500 when it uses `pulsar-admin` internally due to invalid certs. ``` 21:09:16.025 [AsyncHttpClient-48-9] ERROR org.apache.pulsar.broker.admin.v1.NonPersistentTopics - [role] Failed to get list of topics under namespace prop/cluster/ns java.util.concurrent.ExecutionException: org.apache.pulsar.client.admin.PulsarAdminException: java.net.ConnectException: error:10000416:SSL routines:OPENSSL_internal:SSLV3_ALERT_CERTIFICATE_UNKNOWN at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?] at org.apache.pulsar.broker.admin.v1.NonPersistentTopics.lambda$getList$0(NonPersistentTopics.java:211) ~[pulsar-broker.jar:] at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?] at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?] at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?] at org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl$4.failed(NonPersistentTopicsImpl.java:215) ~[pulsar-client-admin-original.jar:] at org.glassfish.jersey.client.JerseyInvocation$4.failed(JerseyInvocation.java:1030) ~[jersey-client-2.27.jar:?] at org.glassfish.jersey.client.ClientRuntime.processFailure(ClientRuntime.java:231) ~[jersey-client-2.27.jar:?] at org.glassfish.jersey.client.ClientRuntime.access$100(ClientRuntime.java:85) ~[jersey-client-2.27.jar:?] at org.glassfish.jersey.client.ClientRuntime$2.lambda$failure$1(ClientRuntime.java:183) ~[jersey-client-2.27.jar:?] at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272) [jersey-common-2.27.jar:?] at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268) [jersey-common-2.27.jar:?] at org.glassfish.jersey.internal.Errors.process(Errors.java:316) [jersey-common-2.27.jar:?] at org.glassfish.jersey.internal.Errors.process(Errors.java:298) [jersey-common-2.27.jar:?] at org.glassfish.jersey.internal.Errors.process(Errors.java:268) [jersey-common-2.27.jar:?] ``` ### Modification Add Capability in HttpClient to perform auto-cert refresh to avoid any tls handshake failure. --- .../pulsar/broker/admin/AdminApiTlsAuthTest.java | 60 ++++++++ .../apache/pulsar/client/admin/PulsarAdmin.java | 14 +- .../pulsar/client/admin/PulsarAdminBuilder.java | 7 + .../admin/internal/PulsarAdminBuilderImpl.java | 15 +- .../admin/internal/http/AsyncHttpConnector.java | 12 +- .../internal/http/AsyncHttpConnectorProvider.java | 12 +- .../client/api/AuthenticationDataProvider.java | 15 ++ .../client/impl/auth/AuthenticationDataTls.java | 10 ++ .../common/util/FileModifiedTimeUpdater.java | 3 + .../apache/pulsar/common/util/KeyManagerProxy.java | 152 +++++++++++++++++++++ .../apache/pulsar/common/util/SecurityUtility.java | 34 +++++ .../pulsar/common/util/TrustManagerProxy.java | 129 +++++++++++++++++ 12 files changed, 446 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java index b623725..1cc110f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java @@ -20,9 +20,15 @@ package org.apache.pulsar.broker.admin; import com.google.common.collect.ImmutableSet; +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; import java.security.cert.X509Certificate; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; import javax.ws.rs.NotAuthorizedException; @@ -34,6 +40,7 @@ import javax.ws.rs.core.MediaType; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -371,4 +378,57 @@ public class AdminApiTlsAuthTest extends MockedPulsarServiceBaseTest { admin.namespaces().deleteNamespace("tenant1/ns1"); } } + + /** + * Validates Pulsar-admin performs auto cert refresh. + * @throws Exception + */ + @Test + public void testCertRefreshForPulsarAdmin() throws Exception { + String adminUser = "admin"; + String user2 = "user1"; + File keyFile = new File(getTLSFile("temp" + ".key-pk8")); + Path keyFilePath = Paths.get(keyFile.getAbsolutePath()); + int autoCertRefreshTimeSec = 1; + try { + Files.copy(Paths.get(getTLSFile(user2 + ".key-pk8")), keyFilePath, StandardCopyOption.REPLACE_EXISTING); + PulsarAdmin admin = PulsarAdmin.builder() + .allowTlsInsecureConnection(false) + .enableTlsHostnameVerification(false) + .serviceHttpUrl(brokerUrlTls.toString()) + .autoCertRefreshTime(1, TimeUnit.SECONDS) + .authentication("org.apache.pulsar.client.impl.auth.AuthenticationTls", + String.format("tlsCertFile:%s,tlsKeyFile:%s", + getTLSFile(adminUser + ".cert"), keyFile)) + .tlsTrustCertsFilePath(getTLSFile("ca.cert")).build(); + // try to call admin-api which should fail due to incorrect key-cert + try { + admin.tenants().createTenant("tenantX", + new TenantInfo(ImmutableSet.of("foobar"), ImmutableSet.of("test"))); + Assert.fail("should have failed due to invalid key file"); + } catch (Exception e) { + //OK + } + // replace correct key file + Files.delete(keyFile.toPath()); + Thread.sleep(2 * autoCertRefreshTimeSec * 1000); + Files.copy(Paths.get(getTLSFile(adminUser + ".key-pk8")), keyFilePath); + MutableBoolean success = new MutableBoolean(false); + retryStrategically((test) -> { + try { + admin.tenants().createTenant("tenantX", + new TenantInfo(ImmutableSet.of("foobar"), ImmutableSet.of("test"))); + success.setValue(true); + return true; + }catch(Exception e) { + return false; + } + }, 5, 1000); + Assert.assertTrue(success.booleanValue()); + Assert.assertEquals(ImmutableSet.of("tenantX"), admin.tenants().getTenants()); + admin.close(); + }finally { + Files.delete(keyFile.toPath()); + } + } } \ No newline at end of file diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java index f202397..7695e32 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java @@ -73,6 +73,7 @@ public class PulsarAdmin implements Closeable { public static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 60; public static final int DEFAULT_READ_TIMEOUT_SECONDS = 60; public static final int DEFAULT_REQUEST_TIMEOUT_SECONDS = 300; + public static final int DEFAULT_CERT_REFRESH_SECONDS = 300; private final Clusters clusters; private final Brokers brokers; @@ -133,9 +134,8 @@ public class PulsarAdmin implements Closeable { public PulsarAdmin(String serviceUrl, ClientConfigurationData clientConfigData) throws PulsarClientException { this(serviceUrl, clientConfigData, DEFAULT_CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS, - DEFAULT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS, - DEFAULT_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS, null); - + DEFAULT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS, DEFAULT_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS, + DEFAULT_CERT_REFRESH_SECONDS, TimeUnit.SECONDS, null); } public PulsarAdmin(String serviceUrl, @@ -146,6 +146,8 @@ public class PulsarAdmin implements Closeable { TimeUnit readTimeoutUnit, int requestTimeout, TimeUnit requestTimeoutUnit, + int autoCertRefreshTime, + TimeUnit autoCertRefreshTimeUnit, ClassLoader clientBuilderClassLoader) throws PulsarClientException { this.connectTimeout = connectTimeout; this.connectTimeoutUnit = connectTimeoutUnit; @@ -166,7 +168,8 @@ public class PulsarAdmin implements Closeable { clientConfigData.setServiceUrl(serviceUrl); } - AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData); + AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData, + (int) autoCertRefreshTimeUnit.toSeconds(autoCertRefreshTime)); ClientConfig httpConfig = new ClientConfig(); httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true); @@ -200,7 +203,8 @@ public class PulsarAdmin implements Closeable { this.asyncHttpConnector = asyncConnectorProvider.getConnector( Math.toIntExact(connectTimeoutUnit.toMillis(this.connectTimeout)), Math.toIntExact(readTimeoutUnit.toMillis(this.readTimeout)), - Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout))); + Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout)), + (int) autoCertRefreshTimeUnit.toSeconds(autoCertRefreshTime)); long readTimeoutMs = readTimeoutUnit.toMillis(this.readTimeout); this.clusters = new ClustersImpl(root, auth, readTimeoutMs); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java index fc647e0..fda3694 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java @@ -252,6 +252,13 @@ public interface PulsarAdminBuilder { PulsarAdminBuilder requestTimeout(int requestTimeout, TimeUnit requestTimeoutUnit); /** + * This sets auto cert refresh time if Pulsar admin uses tls authentication. + * + * @param autoCertRefreshTime + * @param autoCertRefreshTimeUnit + */ + PulsarAdminBuilder autoCertRefreshTime(int autoCertRefreshTime, TimeUnit autoCertRefreshTimeUnit); + /** * * @return */ diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java index b942cdb..75985c5 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java @@ -35,16 +35,18 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder { private int connectTimeout = PulsarAdmin.DEFAULT_CONNECT_TIMEOUT_SECONDS; private int readTimeout = PulsarAdmin.DEFAULT_READ_TIMEOUT_SECONDS; private int requestTimeout = PulsarAdmin.DEFAULT_REQUEST_TIMEOUT_SECONDS; + private int autoCertRefreshTime = PulsarAdmin.DEFAULT_CERT_REFRESH_SECONDS; private TimeUnit connectTimeoutUnit = TimeUnit.SECONDS; private TimeUnit readTimeoutUnit = TimeUnit.SECONDS; private TimeUnit requestTimeoutUnit = TimeUnit.SECONDS; + private TimeUnit autoCertRefreshTimeUnit = TimeUnit.SECONDS; private ClassLoader clientBuilderClassLoader = null; @Override public PulsarAdmin build() throws PulsarClientException { - return new PulsarAdmin(conf.getServiceUrl(), - conf, connectTimeout, connectTimeoutUnit, readTimeout, readTimeoutUnit, - requestTimeout, requestTimeoutUnit, clientBuilderClassLoader); + return new PulsarAdmin(conf.getServiceUrl(), conf, connectTimeout, connectTimeoutUnit, readTimeout, + readTimeoutUnit, requestTimeout, requestTimeoutUnit, autoCertRefreshTime, + autoCertRefreshTimeUnit, clientBuilderClassLoader); } public PulsarAdminBuilderImpl() { @@ -168,6 +170,13 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder { } @Override + public PulsarAdminBuilder autoCertRefreshTime(int autoCertRefreshTime, TimeUnit autoCertRefreshTimeUnit) { + this.autoCertRefreshTime = autoCertRefreshTime; + this.autoCertRefreshTimeUnit = autoCertRefreshTimeUnit; + return this; + } + + @Override public PulsarAdminBuilder setContextClassLoader(ClassLoader clientBuilderClassLoader) { this.clientBuilderClassLoader = clientBuilderClassLoader; return this; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java index 1ff6e19..bc15f24 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java @@ -80,16 +80,18 @@ public class AsyncHttpConnector implements Connector { private final ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("delayer")); - public AsyncHttpConnector(Client client, ClientConfigurationData conf) { + public AsyncHttpConnector(Client client, ClientConfigurationData conf, int autoCertRefreshTimeSeconds) { this((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT), (int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT), PulsarAdmin.DEFAULT_REQUEST_TIMEOUT_SECONDS * 1000, + autoCertRefreshTimeSeconds, conf); } @SneakyThrows public AsyncHttpConnector(int connectTimeoutMs, int readTimeoutMs, - int requestTimeoutMs, ClientConfigurationData conf) { + int requestTimeoutMs, + int autoCertRefreshTimeSeconds, ClientConfigurationData conf) { DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); confBuilder.setFollowRedirect(true); confBuilder.setRequestTimeout(conf.getRequestTimeoutMs()); @@ -136,10 +138,10 @@ public class AsyncHttpConnector implements Connector { SslContext sslCtx = null; if (authData.hasDataForTls()) { sslCtx = authData.getTlsTrustStoreStream() == null - ? SecurityUtility.createNettySslContextForClient( + ? SecurityUtility.createAutoRefreshSslContextForClient( conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(), - conf.getTlsTrustCertsFilePath(), authData.getTlsCertificates(), - authData.getTlsPrivateKey()) + conf.getTlsTrustCertsFilePath(), authData.getTlsCerificateFilePath(), + authData.getTlsPrivateKeyFilePath(), null, autoCertRefreshTimeSeconds, delayer) : SecurityUtility.createNettySslContextForClient( conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(), authData.getTlsTrustStoreStream(), authData.getTlsCertificates(), diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java index 33f2439..c73bdfd 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java @@ -31,21 +31,25 @@ public class AsyncHttpConnectorProvider implements ConnectorProvider { private final ClientConfigurationData conf; private Connector connector; + private final int autoCertRefreshTimeSeconds; - public AsyncHttpConnectorProvider(ClientConfigurationData conf) { + public AsyncHttpConnectorProvider(ClientConfigurationData conf, int autoCertRefreshTimeSeconds) { this.conf = conf; + this.autoCertRefreshTimeSeconds = autoCertRefreshTimeSeconds; } @Override public Connector getConnector(Client client, Configuration runtimeConfig) { if (connector == null) { - connector = new AsyncHttpConnector(client, conf); + connector = new AsyncHttpConnector(client, conf, autoCertRefreshTimeSeconds); } return connector; } - public AsyncHttpConnector getConnector(int connectTimeoutMs, int readTimeoutMs, int requestTimeoutMs) { - return new AsyncHttpConnector(connectTimeoutMs, readTimeoutMs, requestTimeoutMs, conf); + public AsyncHttpConnector getConnector(int connectTimeoutMs, int readTimeoutMs, int requestTimeoutMs, + int autoCertRefreshTimeSeconds) { + return new AsyncHttpConnector(connectTimeoutMs, readTimeoutMs, requestTimeoutMs, autoCertRefreshTimeSeconds, + conf); } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java index 87adba3..4624821 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java @@ -58,6 +58,13 @@ public interface AuthenticationDataProvider extends Serializable { } /** + * @return a client certificate file path + */ + default String getTlsCerificateFilePath() { + return null; + } + + /** * * @return a private key for the client certificate, or null if the data are not available */ @@ -67,6 +74,14 @@ public interface AuthenticationDataProvider extends Serializable { /** * + * @return a private key file path + */ + default String getTlsPrivateKeyFilePath() { + return null; + } + + /** + * * @return an input-stream of the trust store, or null if the trust-store provided at * {@link ClientConfigurationData#getTlsTrustStorePath()} */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java index 588f614..3e9e903 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java @@ -129,5 +129,15 @@ public class AuthenticationDataTls implements AuthenticationDataProvider { return trustStoreStreamProvider != null ? trustStoreStreamProvider.get() : null; } + @Override + public String getTlsCerificateFilePath() { + return certFile != null ? certFile.getFileName() : null; + } + + @Override + public String getTlsPrivateKeyFilePath() { + return keyFile != null ? keyFile.getFileName() : null; + } + private static final Logger LOG = LoggerFactory.getLogger(AuthenticationDataTls.class); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java index d269624..99bdb8e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java @@ -24,12 +24,15 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.attribute.FileTime; import lombok.Getter; +import lombok.ToString; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Class working with file's modified time. */ +@ToString public class FileModifiedTimeUpdater { @Getter String fileName; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java new file mode 100644 index 0000000..b7b5173 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.Principal; +import java.security.PrivateKey; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.X509ExtendedKeyManager; + +import io.netty.handler.ssl.SslContext; +import lombok.extern.slf4j.Slf4j; + +/** + * This class wraps {@link X509ExtendedKeyManager} and gives opportunity to refresh key-manager with refreshed certs + * without changing {@link SslContext}. + */ +@Slf4j +public class KeyManagerProxy extends X509ExtendedKeyManager { + + private static final char[] KEYSTORE_PASSWORD = "secret".toCharArray(); + private volatile X509ExtendedKeyManager keyManager; + private FileModifiedTimeUpdater certFile, keyFile; + + public KeyManagerProxy(String certFilePath, String keyFilePath, int refreshDurationSec, + ScheduledExecutorService executor) { + this.certFile = new FileModifiedTimeUpdater(certFilePath); + this.keyFile = new FileModifiedTimeUpdater(keyFilePath); + try { + updateKeyManager(); + } catch (CertificateException e) { + log.warn("Failed to load cert {}", certFile, e); + throw new IllegalArgumentException(e); + } catch (KeyStoreException e) { + log.warn("Failed to load key {}", keyFile, e); + throw new IllegalArgumentException(e); + } catch (NoSuchAlgorithmException | UnrecoverableKeyException e) { + log.warn("Failed to update key Manager", e); + throw new IllegalArgumentException(e); + } + executor.scheduleWithFixedDelay(() -> updateKeyManagerSafely(), refreshDurationSec, refreshDurationSec, + TimeUnit.SECONDS); + } + + public void updateKeyManagerSafely() { + try { + updateKeyManager(); + } catch (Exception e) { + log.warn("Failed to update key Manager for {}, {}", certFile.getFileName(), keyFile.getFileName(), e); + } + } + + public void updateKeyManager() + throws CertificateException, KeyStoreException, NoSuchAlgorithmException, UnrecoverableKeyException { + if (keyManager != null && !certFile.checkAndRefresh() && !keyFile.checkAndRefresh()) { + return; + } + log.info("refreshing key manager for {} {}", certFile.getFileName(), keyFile.getFileName()); + X509Certificate certificate; + PrivateKey privateKey = null; + KeyStore keyStore; + try (InputStream publicCertStream = new FileInputStream(certFile.getFileName()); + InputStream privateKeyStream = new FileInputStream(keyFile.getFileName())) { + final CertificateFactory cf = CertificateFactory.getInstance("X.509"); + certificate = (X509Certificate) cf.generateCertificate(publicCertStream); + keyStore = KeyStore.getInstance("JKS"); + String alias = certificate.getSubjectX500Principal().getName(); + privateKey = SecurityUtility.loadPrivateKeyFromPemFile(keyFile.getFileName()); + keyStore.load(null); + keyStore.setKeyEntry(alias, privateKey, KEYSTORE_PASSWORD, new X509Certificate[] { certificate }); + } catch (IOException | KeyManagementException e) { + throw new IllegalArgumentException(e); + } + + final KeyManagerFactory keyManagerFactory = KeyManagerFactory + .getInstance(KeyManagerFactory.getDefaultAlgorithm()); + keyManagerFactory.init(keyStore, KEYSTORE_PASSWORD); + this.keyManager = (X509ExtendedKeyManager) keyManagerFactory.getKeyManagers()[0]; + } + + @Override + public String[] getClientAliases(String s, Principal[] principals) { + return keyManager.getClientAliases(s, principals); + } + + @Override + public String chooseClientAlias(String[] strings, Principal[] principals, Socket socket) { + return keyManager.chooseClientAlias(strings, principals, socket); + } + + @Override + public String[] getServerAliases(String s, Principal[] principals) { + return keyManager.getServerAliases(s, principals); + } + + @Override + public String chooseServerAlias(String s, Principal[] principals, Socket socket) { + return keyManager.chooseServerAlias(s, principals, socket); + } + + @Override + public X509Certificate[] getCertificateChain(String s) { + return keyManager.getCertificateChain(s); + } + + @Override + public PrivateKey getPrivateKey(String s) { + return keyManager.getPrivateKey(s); + } + + @Override + public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) { + return keyManager.chooseEngineClientAlias(keyType, issuers, engine); + } + + @Override + public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) { + return keyManager.chooseEngineServerAlias(keyType, issuers, engine); + } + +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java index 5d0b896..8438f13 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java @@ -49,6 +49,8 @@ import java.security.spec.PKCS8EncodedKeySpec; import java.util.Base64; import java.util.Collection; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; + import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; @@ -72,6 +74,7 @@ public class SecurityUtility { // also used to get Factories. e.g. CertificateFactory.getInstance("X.509", "BCFIPS") public static final String BC_FIPS = "BCFIPS"; public static final String BC = "BC"; + private static final String SSLCONTEXT_ALGORITHM = "TLSv1.2"; public static boolean isBCFIPS() { return BC_PROVIDER.getClass().getCanonicalName().equals(BC_FIPS_PROVIDER_CLASS); @@ -148,6 +151,37 @@ public class SecurityUtility { return createSslContext(allowInsecureConnection, trustCertificates, certificates, privateKey); } + /** + * Creates {@link SslContext} with capability to do auto-cert refresh. + * @param allowInsecureConnection + * @param trustCertsFilePath + * @param certFilePath + * @param keyFilePath + * @param sslContextAlgorithm + * @param refreshDurationSec + * @param executor + * @return + * @throws GeneralSecurityException + * @throws SSLException + * @throws FileNotFoundException + * @throws IOException + */ + public static SslContext createAutoRefreshSslContextForClient(boolean allowInsecureConnection, + String trustCertsFilePath, String certFilePath, String keyFilePath, String sslContextAlgorithm, + int refreshDurationSec, ScheduledExecutorService executor) + throws GeneralSecurityException, SSLException, FileNotFoundException, IOException { + KeyManagerProxy keyManager = new KeyManagerProxy(certFilePath, keyFilePath, refreshDurationSec, executor); + SslContextBuilder sslContexBuilder = SslContextBuilder.forClient(); + sslContexBuilder.keyManager(keyManager); + if (allowInsecureConnection) { + sslContexBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE); + } else { + TrustManagerProxy trustManager = new TrustManagerProxy(trustCertsFilePath, refreshDurationSec, executor); + sslContexBuilder.trustManager(trustManager); + } + return sslContexBuilder.build(); + } + public static SslContext createNettySslContextForClient(boolean allowInsecureConnection, String trustCertsFilePath, String certFilePath, String keyFilePath) throws GeneralSecurityException, SSLException, FileNotFoundException, IOException { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java new file mode 100644 index 0000000..7edbbb4 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.X509ExtendedTrustManager; + +import io.netty.handler.ssl.SslContext; +import lombok.extern.slf4j.Slf4j; + +/** + * This class wraps {@link X509ExtendedTrustManager} and gives opportunity to refresh Trust-manager with refreshed certs + * without changing {@link SslContext}. + */ +@Slf4j +public class TrustManagerProxy extends X509ExtendedTrustManager { + + private volatile X509ExtendedTrustManager trustManager; + private FileModifiedTimeUpdater certFile; + + public TrustManagerProxy(String caCertFile, int refreshDurationSec, ScheduledExecutorService executor) { + this.certFile = new FileModifiedTimeUpdater(caCertFile); + try { + updateTrustManager(); + } catch (IOException | CertificateException e) { + log.warn("Failed to load cert {}, {}", certFile, e.getMessage()); + throw new IllegalArgumentException(e); + } catch (NoSuchAlgorithmException | KeyStoreException e) { + log.warn("Failed to init trust-store", e); + throw new IllegalArgumentException(e); + } + executor.scheduleWithFixedDelay(() -> updateTrustManagerSafely(), refreshDurationSec, refreshDurationSec, + TimeUnit.SECONDS); + } + + private void updateTrustManagerSafely() { + try { + updateTrustManager(); + } catch (Exception e) { + log.warn("Failed to init trust-store {}", certFile.getFileName(), e); + } + } + + private void updateTrustManager() throws CertificateException, KeyStoreException, NoSuchAlgorithmException, + FileNotFoundException, IOException { + CertificateFactory factory = CertificateFactory.getInstance("X.509"); + try (InputStream inputStream = new FileInputStream(certFile.getFileName())) { + X509Certificate certificate = (X509Certificate) factory.generateCertificate(inputStream); + String alias = certificate.getSubjectX500Principal().getName(); + KeyStore keyStore = KeyStore.getInstance("JKS"); + keyStore.load(null); + keyStore.setCertificateEntry(alias, certificate); + final TrustManagerFactory trustManagerFactory = TrustManagerFactory + .getInstance(TrustManagerFactory.getDefaultAlgorithm()); + trustManagerFactory.init(keyStore); + trustManager = (X509ExtendedTrustManager) trustManagerFactory.getTrustManagers()[0]; + } + } + + @Override + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + trustManager.checkClientTrusted(x509Certificates, s); + } + + @Override + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + trustManager.checkServerTrusted(x509Certificates, s); + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return trustManager.getAcceptedIssuers(); + } + + @Override + public void checkClientTrusted(X509Certificate[] chain, String authType, Socket socket) + throws CertificateException { + trustManager.checkClientTrusted(chain, authType, socket); + } + + @Override + public void checkClientTrusted(X509Certificate[] chain, String authType, SSLEngine engine) + throws CertificateException { + trustManager.checkClientTrusted(chain, authType, engine); + } + + @Override + public void checkServerTrusted(X509Certificate[] chain, String authType, Socket socket) + throws CertificateException { + trustManager.checkServerTrusted(chain, authType, socket); + } + + @Override + public void checkServerTrusted(X509Certificate[] chain, String authType, SSLEngine engine) + throws CertificateException { + trustManager.checkServerTrusted(chain, authType, engine); + } +}