This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new cdfe50fbedc [fix][client] Fix compatibility between kerberos and tls (#23798)
cdfe50fbedc is described below
commit cdfe50fbedcb11320c114e0f9cd068bc59844fb4
Author: Zixuan Liu <[email protected]>
AuthorDate: Thu Jan 2 16:16:08 2025 +0800
[fix][client] Fix compatibility between kerberos and tls (#23798)
Signed-off-by: Zixuan Liu <[email protected]>
(cherry picked from commit fd4502905089272882e7c0bda494170dafd60bda)
---
.../pulsar/client/api/TlsProducerConsumerTest.java | 69 +++++++++++++++++++++
.../admin/internal/http/AsyncHttpConnector.java | 7 ++-
.../apache/pulsar/client/api/Authentication.java | 1 +
.../org/apache/pulsar/client/cli/CmdConsume.java | 2 +-
.../org/apache/pulsar/client/cli/CmdProduce.java | 2 +-
.../java/org/apache/pulsar/client/cli/CmdRead.java | 2 +-
.../org/apache/pulsar/client/impl/HttpClient.java | 7 ++-
.../client/impl/PulsarChannelInitializer.java | 70 +++++++++++++---------
.../client/impl/ClientInitializationTest.java | 2 +-
.../pulsar/proxy/server/AdminProxyHandler.java | 32 ++++++----
.../pulsar/proxy/server/DirectProxyHandler.java | 64 +++++++++++---------
.../proxy/server/ProxyServiceTlsStarterTest.java | 2 +
.../proxy/socket/client/PerformanceClient.java | 2 +-
13 files changed, 182 insertions(+), 80 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
index 44af37ca90f..98b917330ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
@@ -18,18 +18,24 @@
*/
package org.apache.pulsar.client.api;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import lombok.Cleanup;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -295,4 +301,67 @@ public class TlsProducerConsumerTest extends
TlsProducerConsumerBase {
@Cleanup
Producer<byte[]> ignored =
client.newProducer().topic(topicName).create();
}
+
+ @Test
+ public void testTlsWithFakeAuthentication() throws Exception {
+ Authentication authentication = spy(new Authentication() {
+ @Override
+ public String getAuthMethodName() {
+ return "fake";
+ }
+
+ @Override
+ public void configure(Map<String, String> authParams) {
+
+ }
+
+ @Override
+ public void start() {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public AuthenticationDataProvider getAuthData(String
brokerHostName) {
+ return mock(AuthenticationDataProvider.class);
+ }
+ });
+
+ @Cleanup
+ PulsarAdmin pulsarAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsar().getWebServiceAddressTls())
+ .tlsTrustCertsFilePath(CA_CERT_FILE_PATH)
+ .allowTlsInsecureConnection(false)
+ .enableTlsHostnameVerification(false)
+ .tlsKeyFilePath(getTlsFileForClient("admin.key-pk8"))
+ .tlsCertificateFilePath(getTlsFileForClient("admin.cert"))
+ .authentication(authentication)
+ .build();
+ pulsarAdmin.tenants().getTenants();
+ verify(authentication, never()).getAuthData();
+
+ @Cleanup
+ PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl(getPulsar().getBrokerServiceUrlTls())
+ .tlsTrustCertsFilePath(CA_CERT_FILE_PATH)
+ .allowTlsInsecureConnection(false)
+ .enableTlsHostnameVerification(false)
+ .tlsKeyFilePath(getTlsFileForClient("admin.key-pk8"))
+ .tlsCertificateFilePath(getTlsFileForClient("admin.cert"))
+ .authentication(authentication).build();
+ verify(authentication, never()).getAuthData();
+
+ final String topicName = "persistent://my-property/my-ns/my-topic-1";
+ internalSetUpForNamespace();
+ @Cleanup
+ Consumer<byte[]> ignoredConsumer =
+
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name").subscribe();
+ verify(authentication, never()).getAuthData();
+ @Cleanup
+ Producer<byte[]> ignoredProducer =
pulsarClient.newProducer().topic(topicName).create();
+ verify(authentication, never()).getAuthData();
+ }
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 64ba0e99cb9..fb11d9e46d3 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -191,7 +191,8 @@ public class AsyncHttpConnector implements Connector,
AsyncHttpRequestExecutor {
// Set client key and certificate if available
sslRefresher = Executors.newScheduledThreadPool(1,
new DefaultThreadFactory("pulsar-admin-ssl-refresher"));
- PulsarSslConfiguration sslConfiguration = buildSslConfiguration(conf);
+ PulsarSslConfiguration sslConfiguration = buildSslConfiguration(conf,
serviceNameResolver
+ .resolveHostUri().getHost());
this.sslFactory = (PulsarSslFactory)
Class.forName(conf.getSslFactoryPlugin())
.getConstructor().newInstance();
this.sslFactory.initialize(sslConfiguration);
@@ -519,7 +520,7 @@ public class AsyncHttpConnector implements Connector,
AsyncHttpRequestExecutor {
}
}
- protected PulsarSslConfiguration
buildSslConfiguration(ClientConfigurationData conf)
+ protected PulsarSslConfiguration
buildSslConfiguration(ClientConfigurationData conf, String host)
throws PulsarClientException {
return PulsarSslConfiguration.builder()
.tlsProvider(conf.getSslProvider())
@@ -537,7 +538,7 @@ public class AsyncHttpConnector implements Connector,
AsyncHttpRequestExecutor {
.allowInsecureConnection(conf.isTlsAllowInsecureConnection())
.requireTrustedClientCertOnConnect(false)
.tlsEnabledWithKeystore(conf.isUseKeyStoreTls())
- .authData(conf.getAuthentication().getAuthData())
+ .authData(conf.getAuthentication().getAuthData(host))
.tlsCustomParams(conf.getSslFactoryPluginParams())
.serverMode(false)
.isHttps(true)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
index 9bf1b24cbdb..48d9e3e2307 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
@@ -48,6 +48,7 @@ public interface Authentication extends Closeable,
Serializable {
* @throws PulsarClientException
* any other error
*/
+ @Deprecated
default AuthenticationDataProvider getAuthData() throws
PulsarClientException {
throw new UnsupportedAuthenticationException("Method not
implemented!");
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 0f0e2f0a9c8..98ca9bc8149 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -260,7 +260,7 @@ public class CmdConsume extends AbstractCmdConsume {
try {
if (authentication != null) {
authentication.start();
- AuthenticationDataProvider authData =
authentication.getAuthData();
+ AuthenticationDataProvider authData =
authentication.getAuthData(consumerUri.getHost());
if (authData.hasDataForHttp()) {
for (Map.Entry<String, String> kv :
authData.getHttpHeaders()) {
consumeRequest.setHeader(kv.getKey(), kv.getValue());
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index e5a88366021..01ac1df333d 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -469,7 +469,7 @@ public class CmdProduce extends AbstractCmd {
try {
if (authentication != null) {
authentication.start();
- AuthenticationDataProvider authData =
authentication.getAuthData();
+ AuthenticationDataProvider authData =
authentication.getAuthData(produceUri.getHost());
if (authData.hasDataForHttp()) {
for (Map.Entry<String, String> kv :
authData.getHttpHeaders()) {
produceRequest.setHeader(kv.getKey(), kv.getValue());
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
index 529d1d9c412..d0e20dfa570 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
@@ -244,7 +244,7 @@ public class CmdRead extends AbstractCmdConsume {
try {
if (authentication != null) {
authentication.start();
- AuthenticationDataProvider authData =
authentication.getAuthData();
+ AuthenticationDataProvider authData =
authentication.getAuthData(readerUri.getHost());
if (authData.hasDataForHttp()) {
for (Map.Entry<String, String> kv :
authData.getHttpHeaders()) {
readRequest.setHeader(kv.getKey(), kv.getValue());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index c53f79af12e..8e448d801fa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -97,7 +97,8 @@ public class HttpClient implements Closeable {
this.executorService = Executors
.newSingleThreadScheduledExecutor(new ExecutorProvider
.ExtendedThreadFactory("httpclient-ssl-refresh"));
- PulsarSslConfiguration sslConfiguration =
buildSslConfiguration(conf);
+ PulsarSslConfiguration sslConfiguration =
+ buildSslConfiguration(conf,
serviceNameResolver.resolveHostUri().getHost());
this.sslFactory = (PulsarSslFactory)
Class.forName(conf.getSslFactoryPlugin())
.getConstructor().newInstance();
this.sslFactory.initialize(sslConfiguration);
@@ -233,7 +234,7 @@ public class HttpClient implements Closeable {
return future;
}
- protected PulsarSslConfiguration
buildSslConfiguration(ClientConfigurationData config)
+ protected PulsarSslConfiguration
buildSslConfiguration(ClientConfigurationData config, String host)
throws PulsarClientException {
return PulsarSslConfiguration.builder()
.tlsProvider(config.getSslProvider())
@@ -252,7 +253,7 @@ public class HttpClient implements Closeable {
.requireTrustedClientCertOnConnect(false)
.tlsEnabledWithKeystore(config.isUseKeyStoreTls())
.tlsCustomParams(config.getSslFactoryPluginParams())
- .authData(config.getAuthentication().getAuthData())
+ .authData(config.getAuthentication().getAuthData(host))
.serverMode(false)
.isHttps(true)
.build();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index 5097c34e0b2..b20833e46a2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -27,8 +27,10 @@ import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.SslHandler;
import java.net.InetSocketAddress;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -55,8 +57,8 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
private final InetSocketAddress socks5ProxyAddress;
private final String socks5ProxyUsername;
private final String socks5ProxyPassword;
-
- private final PulsarSslFactory pulsarSslFactory;
+ private final ClientConfigurationData conf;
+ private final Map<String, PulsarSslFactory> pulsarSslFactoryMap;
private static final long TLS_CERTIFICATE_CACHE_MILLIS =
TimeUnit.MINUTES.toMillis(1);
@@ -69,26 +71,17 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
this.socks5ProxyAddress = conf.getSocks5ProxyAddress();
this.socks5ProxyUsername = conf.getSocks5ProxyUsername();
this.socks5ProxyPassword = conf.getSocks5ProxyPassword();
-
+ this.conf = conf.clone();
if (tlsEnabled) {
- this.pulsarSslFactory = (PulsarSslFactory)
Class.forName(conf.getSslFactoryPlugin())
- .getConstructor().newInstance();
- try {
- PulsarSslConfiguration sslConfiguration =
buildSslConfiguration(conf);
- this.pulsarSslFactory.initialize(sslConfiguration);
- this.pulsarSslFactory.createInternalSslContext();
- } catch (Exception e) {
- log.error("Unable to initialize and create the ssl context",
e);
- }
+ this.pulsarSslFactoryMap = new ConcurrentHashMap<>();
if (scheduledExecutorService != null &&
conf.getAutoCertRefreshSeconds() > 0) {
scheduledExecutorService.scheduleWithFixedDelay(() ->
this.refreshSslContext(conf),
conf.getAutoCertRefreshSeconds(),
conf.getAutoCertRefreshSeconds(),
TimeUnit.SECONDS);
}
-
} else {
- pulsarSslFactory = null;
+ this.pulsarSslFactoryMap = null;
}
}
@@ -123,6 +116,23 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
CompletableFuture<Channel> initTlsFuture = new CompletableFuture<>();
ch.eventLoop().execute(() -> {
try {
+ PulsarSslFactory pulsarSslFactory =
pulsarSslFactoryMap.computeIfAbsent(sniHost.getHostName(), key -> {
+ try {
+ PulsarSslFactory factory = (PulsarSslFactory)
Class.forName(conf.getSslFactoryPlugin())
+ .getConstructor().newInstance();
+ PulsarSslConfiguration sslConfiguration =
buildSslConfiguration(conf, key);
+ factory.initialize(sslConfiguration);
+ factory.createInternalSslContext();
+ return factory;
+ } catch (Exception e) {
+ log.error("Unable to initialize and create the ssl
context", e);
+ initTlsFuture.completeExceptionally(e);
+ return null;
+ }
+ });
+ if (pulsarSslFactory == null) {
+ return;
+ }
SslHandler handler = new SslHandler(pulsarSslFactory
.createClientSslEngine(ch.alloc(),
sniHost.getHostName(), sniHost.getPort()));
@@ -181,7 +191,9 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
return ch;
}));
}
- protected PulsarSslConfiguration
buildSslConfiguration(ClientConfigurationData config)
+
+protected PulsarSslConfiguration buildSslConfiguration(ClientConfigurationData
config,
+ String host)
throws PulsarClientException {
return PulsarSslConfiguration.builder()
.tlsProvider(config.getSslProvider())
@@ -200,28 +212,30 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
.requireTrustedClientCertOnConnect(false)
.tlsEnabledWithKeystore(config.isUseKeyStoreTls())
.tlsCustomParams(config.getSslFactoryPluginParams())
- .authData(config.getAuthentication().getAuthData())
+ .authData(config.getAuthentication().getAuthData(host))
.serverMode(false)
.build();
}
protected void refreshSslContext(ClientConfigurationData conf) {
- try {
+ pulsarSslFactoryMap.forEach((key, pulsarSslFactory) -> {
try {
- if (conf.isUseKeyStoreTls()) {
- this.pulsarSslFactory.getInternalSslContext();
- } else {
- this.pulsarSslFactory.getInternalNettySslContext();
+ try {
+ if (conf.isUseKeyStoreTls()) {
+ pulsarSslFactory.getInternalSslContext();
+ } else {
+ pulsarSslFactory.getInternalNettySslContext();
+ }
+ } catch (Exception e) {
+ log.error("SSL Context is not initialized", e);
+ PulsarSslConfiguration sslConfiguration =
buildSslConfiguration(conf, key);
+ pulsarSslFactory.initialize(sslConfiguration);
}
+ pulsarSslFactory.update();
} catch (Exception e) {
- log.error("SSL Context is not initialized", e);
- PulsarSslConfiguration sslConfiguration =
buildSslConfiguration(conf);
- this.pulsarSslFactory.initialize(sslConfiguration);
+ log.error("Failed to refresh SSL context", e);
}
- this.pulsarSslFactory.update();
- } catch (Exception e) {
- log.error("Failed to refresh SSL context", e);
- }
+ });
}
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
index 2682d011cd0..f7ff30c286c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
@@ -41,6 +41,6 @@ public class ClientInitializationTest {
.build();
verify(auth).start();
- verify(auth, times(1)).getAuthData();
+ verify(auth, times(0)).getAuthData();
}
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index 54b6db5198c..8d0c2c3ae39 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -38,6 +38,7 @@ import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -290,6 +291,19 @@ class AdminProxyHandler extends ProxyServlet {
return new JettyHttpClient();
}
+ private String getWebServiceUrl() throws PulsarServerException {
+ if (isBlank(brokerWebServiceUrl)) {
+ ServiceLookupData availableBroker = discoveryProvider.nextBroker();
+ if (config.isTlsEnabledWithBroker()) {
+ return availableBroker.getWebServiceUrlTls();
+ } else {
+ return availableBroker.getWebServiceUrl();
+ }
+ } else {
+ return brokerWebServiceUrl;
+ }
+ }
+
@Override
protected String rewriteTarget(HttpServletRequest request) {
StringBuilder url = new StringBuilder();
@@ -305,17 +319,10 @@ class AdminProxyHandler extends ProxyServlet {
if (isFunctionsRestRequest && !isBlank(functionWorkerWebServiceUrl)) {
url.append(functionWorkerWebServiceUrl);
- } else if (isBlank(brokerWebServiceUrl)) {
+ } else {
try {
- ServiceLookupData availableBroker =
discoveryProvider.nextBroker();
-
- if (config.isTlsEnabledWithBroker()) {
- url.append(availableBroker.getWebServiceUrlTls());
- } else {
- url.append(availableBroker.getWebServiceUrl());
- }
-
- if (LOG.isDebugEnabled()) {
+ url.append(getWebServiceUrl());
+ if (LOG.isDebugEnabled() && isBlank(brokerWebServiceUrl)) {
LOG.debug("[{}:{}] Selected active broker is {}",
request.getRemoteAddr(), request.getRemotePort(),
url);
}
@@ -324,8 +331,6 @@ class AdminProxyHandler extends ProxyServlet {
request.getRemotePort(), e.getMessage(), e);
return null;
}
- } else {
- url.append(brokerWebServiceUrl);
}
if (url.lastIndexOf("/") == url.length() - 1) {
@@ -398,7 +403,8 @@ class AdminProxyHandler extends ProxyServlet {
protected PulsarSslFactory createPulsarSslFactory() {
try {
try {
- AuthenticationDataProvider authData =
proxyClientAuthentication.getAuthData();
+ AuthenticationDataProvider authData =
+
proxyClientAuthentication.getAuthData(URI.create(getWebServiceUrl()).getHost());
PulsarSslConfiguration pulsarSslConfiguration =
buildSslConfiguration(authData);
PulsarSslFactory sslFactory =
(PulsarSslFactory)
Class.forName(config.getBrokerClientSslFactoryPlugin())
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 407c93074a0..681aa553c48 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -45,6 +45,8 @@ import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.SneakyThrows;
@@ -87,7 +89,7 @@ public class DirectProxyHandler {
private final Runnable onHandshakeCompleteAction;
private final boolean tlsHostnameVerificationEnabled;
final boolean tlsEnabledWithBroker;
- private PulsarSslFactory sslFactory;
+ private Map<String, PulsarSslFactory> pulsarSslFactoryMap;
@SneakyThrows
public DirectProxyHandler(ProxyService service, ProxyConnection
proxyConnection) {
@@ -102,27 +104,42 @@ public class DirectProxyHandler {
this.tlsEnabledWithBroker =
service.getConfiguration().isTlsEnabledWithBroker();
this.tlsHostnameVerificationEnabled =
service.getConfiguration().isTlsHostnameVerificationEnabled();
this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;
- ProxyConfiguration config = service.getConfiguration();
-
- if (tlsEnabledWithBroker) {
- AuthenticationDataProvider authData = null;
-
- if (!isEmpty(config.getBrokerClientAuthenticationPlugin())) {
- try {
- authData = authentication.getAuthData();
- } catch (PulsarClientException e) {
- throw new RuntimeException(e);
- }
- }
- PulsarSslConfiguration sslConfiguration =
buildSslConfiguration(config, authData);
- this.sslFactory = (PulsarSslFactory)
Class.forName(config.getSslFactoryPlugin())
- .getConstructor().newInstance();
- this.sslFactory.initialize(sslConfiguration);
- this.sslFactory.createInternalSslContext();
- }
+ this.pulsarSslFactoryMap = new ConcurrentHashMap<>();
}
public void connect(String brokerHostAndPort, InetSocketAddress
targetBrokerAddress, int protocolVersion) {
+ String remoteHost;
+ try {
+ remoteHost = parseHost(brokerHostAndPort);
+ } catch (IllegalArgumentException e) {
+ log.warn("[{}] Failed to parse broker host '{}'", inboundChannel,
brokerHostAndPort, e);
+ inboundChannel.close();
+ return;
+ }
+ PulsarSslFactory sslFactory =
+ tlsEnabledWithBroker ?
pulsarSslFactoryMap.computeIfAbsent(remoteHost, (hostname) -> {
+ AuthenticationDataProvider authData = null;
+
+ if
(!isEmpty(service.getConfiguration().getBrokerClientAuthenticationPlugin())) {
+ try {
+ authData = authentication.getAuthData(remoteHost);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ PulsarSslConfiguration sslConfiguration =
+ buildSslConfiguration(service.getConfiguration(),
authData);
+ try {
+ PulsarSslFactory factory =
+ (PulsarSslFactory)
Class.forName(service.getConfiguration().getSslFactoryPlugin())
+ .getConstructor().newInstance();
+ factory.initialize(sslConfiguration);
+ factory.createInternalSslContext();
+ return factory;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }) : null;
ProxyConfiguration config = service.getConfiguration();
// Start the connection attempt.
@@ -142,15 +159,6 @@ public class DirectProxyHandler {
b.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
}
- String remoteHost;
- try {
- remoteHost = parseHost(brokerHostAndPort);
- } catch (IllegalArgumentException e) {
- log.warn("[{}] Failed to parse broker host '{}'", inboundChannel,
brokerHostAndPort, e);
- inboundChannel.close();
- return;
- }
-
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index 1148234be62..ee8ae8d4afb 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -59,6 +59,7 @@ public class ProxyServiceTlsStarterTest extends
MockedPulsarServiceBaseTest {
serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
+
serviceStarter.getConfig().setBrokerWebServiceURLTLS(pulsar.getWebServiceAddressTls());
serviceStarter.getConfig().setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
serviceStarter.getConfig().setBrokerClientCertificateFilePath(BROKER_CERT_FILE_PATH);
serviceStarter.getConfig().setBrokerClientKeyFilePath(BROKER_KEY_FILE_PATH);
@@ -79,6 +80,7 @@ public class ProxyServiceTlsStarterTest extends
MockedPulsarServiceBaseTest {
protected void doInitConf() throws Exception {
super.doInitConf();
this.conf.setBrokerServicePortTls(Optional.of(0));
+ this.conf.setWebServicePortTls(Optional.of(0));
this.conf.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
this.conf.setTlsKeyFilePath(PROXY_KEY_FILE_PATH);
}
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
index 4d73fd9f9b4..4fabf6d2185 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
@@ -251,7 +251,7 @@ public class PerformanceClient extends CmdBase {
Authentication auth =
AuthenticationFactory.create(this.authPluginClassName,
this.authParams);
auth.start();
- AuthenticationDataProvider authData = auth.getAuthData();
+ AuthenticationDataProvider authData =
auth.getAuthData(produceUri.getHost());
if (authData.hasDataForHttp()) {
for (Map.Entry<String, String> kv :
authData.getHttpHeaders()) {
produceRequest.setHeader(kv.getKey(),
kv.getValue());