This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fd450290508 [fix][client] Fix compatibility between kerberos and tls 
(#23798)
fd450290508 is described below

commit fd4502905089272882e7c0bda494170dafd60bda
Author: Zixuan Liu <[email protected]>
AuthorDate: Thu Jan 2 16:16:08 2025 +0800

    [fix][client] Fix compatibility between kerberos and tls (#23798)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../pulsar/client/api/TlsProducerConsumerTest.java | 69 +++++++++++++++++++++
 .../admin/internal/http/AsyncHttpConnector.java    |  7 ++-
 .../apache/pulsar/client/api/Authentication.java   |  1 +
 .../org/apache/pulsar/client/cli/CmdConsume.java   |  2 +-
 .../org/apache/pulsar/client/cli/CmdProduce.java   |  2 +-
 .../java/org/apache/pulsar/client/cli/CmdRead.java |  2 +-
 .../org/apache/pulsar/client/impl/HttpClient.java  |  7 ++-
 .../client/impl/PulsarChannelInitializer.java      | 70 +++++++++++++---------
 .../client/impl/ClientInitializationTest.java      |  2 +-
 .../pulsar/proxy/server/AdminProxyHandler.java     | 32 ++++++----
 .../pulsar/proxy/server/DirectProxyHandler.java    | 64 +++++++++++---------
 .../proxy/server/ProxyServiceTlsStarterTest.java   |  2 +
 .../proxy/socket/client/PerformanceClient.java     |  2 +-
 13 files changed, 182 insertions(+), 80 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
index 44af37ca90f..98b917330ff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
@@ -18,18 +18,24 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import lombok.Cleanup;
 import org.apache.commons.compress.utils.IOUtils;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -295,4 +301,67 @@ public class TlsProducerConsumerTest extends 
TlsProducerConsumerBase {
         @Cleanup
         Producer<byte[]> ignored = 
client.newProducer().topic(topicName).create();
     }
+
+    @Test
+    public void testTlsWithFakeAuthentication() throws Exception {
+        Authentication authentication = spy(new Authentication() {
+            @Override
+            public String getAuthMethodName() {
+                return "fake";
+            }
+
+            @Override
+            public void configure(Map<String, String> authParams) {
+
+            }
+
+            @Override
+            public void start() {
+
+            }
+
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public AuthenticationDataProvider getAuthData(String 
brokerHostName) {
+                return mock(AuthenticationDataProvider.class);
+            }
+        });
+
+        @Cleanup
+        PulsarAdmin pulsarAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsar().getWebServiceAddressTls())
+                .tlsTrustCertsFilePath(CA_CERT_FILE_PATH)
+                .allowTlsInsecureConnection(false)
+                .enableTlsHostnameVerification(false)
+                .tlsKeyFilePath(getTlsFileForClient("admin.key-pk8"))
+                .tlsCertificateFilePath(getTlsFileForClient("admin.cert"))
+                .authentication(authentication)
+                .build();
+        pulsarAdmin.tenants().getTenants();
+        verify(authentication, never()).getAuthData();
+
+        @Cleanup
+        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(getPulsar().getBrokerServiceUrlTls())
+                .tlsTrustCertsFilePath(CA_CERT_FILE_PATH)
+                .allowTlsInsecureConnection(false)
+                .enableTlsHostnameVerification(false)
+                .tlsKeyFilePath(getTlsFileForClient("admin.key-pk8"))
+                .tlsCertificateFilePath(getTlsFileForClient("admin.cert"))
+                .authentication(authentication).build();
+        verify(authentication, never()).getAuthData();
+
+        final String topicName = "persistent://my-property/my-ns/my-topic-1";
+        internalSetUpForNamespace();
+        @Cleanup
+        Consumer<byte[]> ignoredConsumer =
+                
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name").subscribe();
+        verify(authentication, never()).getAuthData();
+        @Cleanup
+        Producer<byte[]> ignoredProducer = 
pulsarClient.newProducer().topic(topicName).create();
+        verify(authentication, never()).getAuthData();
+    }
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 64ba0e99cb9..fb11d9e46d3 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -191,7 +191,8 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
         // Set client key and certificate if available
         sslRefresher = Executors.newScheduledThreadPool(1,
                 new DefaultThreadFactory("pulsar-admin-ssl-refresher"));
-        PulsarSslConfiguration sslConfiguration = buildSslConfiguration(conf);
+        PulsarSslConfiguration sslConfiguration = buildSslConfiguration(conf, 
serviceNameResolver
+                .resolveHostUri().getHost());
         this.sslFactory = (PulsarSslFactory) 
Class.forName(conf.getSslFactoryPlugin())
                 .getConstructor().newInstance();
         this.sslFactory.initialize(sslConfiguration);
@@ -519,7 +520,7 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
         }
     }
 
-    protected PulsarSslConfiguration 
buildSslConfiguration(ClientConfigurationData conf)
+    protected PulsarSslConfiguration 
buildSslConfiguration(ClientConfigurationData conf, String host)
             throws PulsarClientException {
         return PulsarSslConfiguration.builder()
                 .tlsProvider(conf.getSslProvider())
@@ -537,7 +538,7 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
                 .allowInsecureConnection(conf.isTlsAllowInsecureConnection())
                 .requireTrustedClientCertOnConnect(false)
                 .tlsEnabledWithKeystore(conf.isUseKeyStoreTls())
-                .authData(conf.getAuthentication().getAuthData())
+                .authData(conf.getAuthentication().getAuthData(host))
                 .tlsCustomParams(conf.getSslFactoryPluginParams())
                 .serverMode(false)
                 .isHttps(true)
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
index 9bf1b24cbdb..48d9e3e2307 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java
@@ -48,6 +48,7 @@ public interface Authentication extends Closeable, 
Serializable {
      * @throws PulsarClientException
      *             any other error
      */
+    @Deprecated
     default AuthenticationDataProvider getAuthData() throws 
PulsarClientException {
         throw new UnsupportedAuthenticationException("Method not 
implemented!");
     }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 0f0e2f0a9c8..98ca9bc8149 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -260,7 +260,7 @@ public class CmdConsume extends AbstractCmdConsume {
         try {
             if (authentication != null) {
                 authentication.start();
-                AuthenticationDataProvider authData = 
authentication.getAuthData();
+                AuthenticationDataProvider authData = 
authentication.getAuthData(consumerUri.getHost());
                 if (authData.hasDataForHttp()) {
                     for (Map.Entry<String, String> kv : 
authData.getHttpHeaders()) {
                         consumeRequest.setHeader(kv.getKey(), kv.getValue());
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index e5a88366021..01ac1df333d 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -469,7 +469,7 @@ public class CmdProduce extends AbstractCmd {
         try {
             if (authentication != null) {
                 authentication.start();
-                AuthenticationDataProvider authData = 
authentication.getAuthData();
+                AuthenticationDataProvider authData = 
authentication.getAuthData(produceUri.getHost());
                 if (authData.hasDataForHttp()) {
                     for (Map.Entry<String, String> kv : 
authData.getHttpHeaders()) {
                         produceRequest.setHeader(kv.getKey(), kv.getValue());
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
index 529d1d9c412..d0e20dfa570 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
@@ -244,7 +244,7 @@ public class CmdRead extends AbstractCmdConsume {
         try {
             if (authentication != null) {
                 authentication.start();
-                AuthenticationDataProvider authData = 
authentication.getAuthData();
+                AuthenticationDataProvider authData = 
authentication.getAuthData(readerUri.getHost());
                 if (authData.hasDataForHttp()) {
                     for (Map.Entry<String, String> kv : 
authData.getHttpHeaders()) {
                         readRequest.setHeader(kv.getKey(), kv.getValue());
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index c53f79af12e..8e448d801fa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -97,7 +97,8 @@ public class HttpClient implements Closeable {
                 this.executorService = Executors
                         .newSingleThreadScheduledExecutor(new ExecutorProvider
                                 
.ExtendedThreadFactory("httpclient-ssl-refresh"));
-                PulsarSslConfiguration sslConfiguration = 
buildSslConfiguration(conf);
+                PulsarSslConfiguration sslConfiguration =
+                        buildSslConfiguration(conf, 
serviceNameResolver.resolveHostUri().getHost());
                 this.sslFactory = (PulsarSslFactory) 
Class.forName(conf.getSslFactoryPlugin())
                         .getConstructor().newInstance();
                 this.sslFactory.initialize(sslConfiguration);
@@ -233,7 +234,7 @@ public class HttpClient implements Closeable {
         return future;
     }
 
-    protected PulsarSslConfiguration 
buildSslConfiguration(ClientConfigurationData config)
+    protected PulsarSslConfiguration 
buildSslConfiguration(ClientConfigurationData config, String host)
             throws PulsarClientException {
         return PulsarSslConfiguration.builder()
                 .tlsProvider(config.getSslProvider())
@@ -252,7 +253,7 @@ public class HttpClient implements Closeable {
                 .requireTrustedClientCertOnConnect(false)
                 .tlsEnabledWithKeystore(config.isUseKeyStoreTls())
                 .tlsCustomParams(config.getSslFactoryPluginParams())
-                .authData(config.getAuthentication().getAuthData())
+                .authData(config.getAuthentication().getAuthData(host))
                 .serverMode(false)
                 .isHttps(true)
                 .build();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index 5097c34e0b2..b20833e46a2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -27,8 +27,10 @@ import io.netty.handler.flush.FlushConsolidationHandler;
 import io.netty.handler.proxy.Socks5ProxyHandler;
 import io.netty.handler.ssl.SslHandler;
 import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -55,8 +57,8 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
     private final InetSocketAddress socks5ProxyAddress;
     private final String socks5ProxyUsername;
     private final String socks5ProxyPassword;
-
-    private final PulsarSslFactory pulsarSslFactory;
+    private final ClientConfigurationData conf;
+    private final Map<String, PulsarSslFactory> pulsarSslFactoryMap;
 
     private static final long TLS_CERTIFICATE_CACHE_MILLIS = 
TimeUnit.MINUTES.toMillis(1);
 
@@ -69,26 +71,17 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
         this.socks5ProxyAddress = conf.getSocks5ProxyAddress();
         this.socks5ProxyUsername = conf.getSocks5ProxyUsername();
         this.socks5ProxyPassword = conf.getSocks5ProxyPassword();
-
+        this.conf = conf.clone();
         if (tlsEnabled) {
-            this.pulsarSslFactory = (PulsarSslFactory) 
Class.forName(conf.getSslFactoryPlugin())
-                    .getConstructor().newInstance();
-            try {
-                PulsarSslConfiguration sslConfiguration = 
buildSslConfiguration(conf);
-                this.pulsarSslFactory.initialize(sslConfiguration);
-                this.pulsarSslFactory.createInternalSslContext();
-            } catch (Exception e) {
-                log.error("Unable to initialize and create the ssl context", 
e);
-            }
+            this.pulsarSslFactoryMap = new ConcurrentHashMap<>();
             if (scheduledExecutorService != null && 
conf.getAutoCertRefreshSeconds() > 0) {
                 scheduledExecutorService.scheduleWithFixedDelay(() -> 
this.refreshSslContext(conf),
                         conf.getAutoCertRefreshSeconds(),
                         conf.getAutoCertRefreshSeconds(),
                         TimeUnit.SECONDS);
             }
-
         } else {
-            pulsarSslFactory = null;
+            this.pulsarSslFactoryMap = null;
         }
     }
 
@@ -123,6 +116,23 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
         CompletableFuture<Channel> initTlsFuture = new CompletableFuture<>();
         ch.eventLoop().execute(() -> {
             try {
+                PulsarSslFactory pulsarSslFactory = 
pulsarSslFactoryMap.computeIfAbsent(sniHost.getHostName(), key -> {
+                    try {
+                        PulsarSslFactory factory = (PulsarSslFactory) 
Class.forName(conf.getSslFactoryPlugin())
+                                .getConstructor().newInstance();
+                        PulsarSslConfiguration sslConfiguration = 
buildSslConfiguration(conf, key);
+                        factory.initialize(sslConfiguration);
+                        factory.createInternalSslContext();
+                        return factory;
+                    } catch (Exception e) {
+                        log.error("Unable to initialize and create the ssl 
context", e);
+                        initTlsFuture.completeExceptionally(e);
+                        return null;
+                    }
+                });
+                if (pulsarSslFactory == null) {
+                    return;
+                }
                 SslHandler handler = new SslHandler(pulsarSslFactory
                         .createClientSslEngine(ch.alloc(), 
sniHost.getHostName(), sniHost.getPort()));
 
@@ -181,7 +191,9 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
             return ch;
         }));
     }
-    protected PulsarSslConfiguration 
buildSslConfiguration(ClientConfigurationData config)
+
+protected PulsarSslConfiguration buildSslConfiguration(ClientConfigurationData 
config,
+                                                       String host)
             throws PulsarClientException {
         return PulsarSslConfiguration.builder()
                 .tlsProvider(config.getSslProvider())
@@ -200,28 +212,30 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
                 .requireTrustedClientCertOnConnect(false)
                 .tlsEnabledWithKeystore(config.isUseKeyStoreTls())
                 .tlsCustomParams(config.getSslFactoryPluginParams())
-                .authData(config.getAuthentication().getAuthData())
+                .authData(config.getAuthentication().getAuthData(host))
                 .serverMode(false)
                 .build();
     }
 
     protected void refreshSslContext(ClientConfigurationData conf) {
-        try {
+        pulsarSslFactoryMap.forEach((key, pulsarSslFactory) -> {
             try {
-                if (conf.isUseKeyStoreTls()) {
-                    this.pulsarSslFactory.getInternalSslContext();
-                } else {
-                    this.pulsarSslFactory.getInternalNettySslContext();
+                try {
+                    if (conf.isUseKeyStoreTls()) {
+                        pulsarSslFactory.getInternalSslContext();
+                    } else {
+                        pulsarSslFactory.getInternalNettySslContext();
+                    }
+                } catch (Exception e) {
+                    log.error("SSL Context is not initialized", e);
+                    PulsarSslConfiguration sslConfiguration = 
buildSslConfiguration(conf, key);
+                    pulsarSslFactory.initialize(sslConfiguration);
                 }
+                pulsarSslFactory.update();
             } catch (Exception e) {
-                log.error("SSL Context is not initialized", e);
-                PulsarSslConfiguration sslConfiguration = 
buildSslConfiguration(conf);
-                this.pulsarSslFactory.initialize(sslConfiguration);
+                log.error("Failed to refresh SSL context", e);
             }
-            this.pulsarSslFactory.update();
-        } catch (Exception e) {
-            log.error("Failed to refresh SSL context", e);
-        }
+        });
     }
 
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
index 2682d011cd0..f7ff30c286c 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
@@ -41,6 +41,6 @@ public class ClientInitializationTest {
                 .build();
 
         verify(auth).start();
-        verify(auth, times(1)).getAuthData();
+        verify(auth, times(0)).getAuthData();
     }
 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index 54b6db5198c..8d0c2c3ae39 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -38,6 +38,7 @@ import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -290,6 +291,19 @@ class AdminProxyHandler extends ProxyServlet {
         return new JettyHttpClient();
     }
 
+    private String getWebServiceUrl() throws PulsarServerException {
+        if (isBlank(brokerWebServiceUrl)) {
+            ServiceLookupData availableBroker = discoveryProvider.nextBroker();
+            if (config.isTlsEnabledWithBroker()) {
+                return availableBroker.getWebServiceUrlTls();
+            } else {
+                return availableBroker.getWebServiceUrl();
+            }
+        } else {
+            return brokerWebServiceUrl;
+        }
+    }
+
     @Override
     protected String rewriteTarget(HttpServletRequest request) {
         StringBuilder url = new StringBuilder();
@@ -305,17 +319,10 @@ class AdminProxyHandler extends ProxyServlet {
 
         if (isFunctionsRestRequest && !isBlank(functionWorkerWebServiceUrl)) {
             url.append(functionWorkerWebServiceUrl);
-        } else if (isBlank(brokerWebServiceUrl)) {
+        } else {
             try {
-                ServiceLookupData availableBroker = 
discoveryProvider.nextBroker();
-
-                if (config.isTlsEnabledWithBroker()) {
-                    url.append(availableBroker.getWebServiceUrlTls());
-                } else {
-                    url.append(availableBroker.getWebServiceUrl());
-                }
-
-                if (LOG.isDebugEnabled()) {
+                url.append(getWebServiceUrl());
+                if (LOG.isDebugEnabled() && isBlank(brokerWebServiceUrl)) {
                     LOG.debug("[{}:{}] Selected active broker is {}", 
request.getRemoteAddr(), request.getRemotePort(),
                             url);
                 }
@@ -324,8 +331,6 @@ class AdminProxyHandler extends ProxyServlet {
                         request.getRemotePort(), e.getMessage(), e);
                 return null;
             }
-        } else {
-            url.append(brokerWebServiceUrl);
         }
 
         if (url.lastIndexOf("/") == url.length() - 1) {
@@ -398,7 +403,8 @@ class AdminProxyHandler extends ProxyServlet {
     protected PulsarSslFactory createPulsarSslFactory() {
         try {
             try {
-                AuthenticationDataProvider authData = 
proxyClientAuthentication.getAuthData();
+                AuthenticationDataProvider authData =
+                        
proxyClientAuthentication.getAuthData(URI.create(getWebServiceUrl()).getHost());
                 PulsarSslConfiguration pulsarSslConfiguration = 
buildSslConfiguration(authData);
                 PulsarSslFactory sslFactory =
                         (PulsarSslFactory) 
Class.forName(config.getBrokerClientSslFactoryPlugin())
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 407c93074a0..681aa553c48 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -45,6 +45,8 @@ import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.CharsetUtil;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import lombok.SneakyThrows;
@@ -87,7 +89,7 @@ public class DirectProxyHandler {
     private final Runnable onHandshakeCompleteAction;
     private final boolean tlsHostnameVerificationEnabled;
     final boolean tlsEnabledWithBroker;
-    private PulsarSslFactory sslFactory;
+    private Map<String, PulsarSslFactory> pulsarSslFactoryMap;
 
     @SneakyThrows
     public DirectProxyHandler(ProxyService service, ProxyConnection 
proxyConnection) {
@@ -102,27 +104,42 @@ public class DirectProxyHandler {
         this.tlsEnabledWithBroker = 
service.getConfiguration().isTlsEnabledWithBroker();
         this.tlsHostnameVerificationEnabled = 
service.getConfiguration().isTlsHostnameVerificationEnabled();
         this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;
-        ProxyConfiguration config = service.getConfiguration();
-
-        if (tlsEnabledWithBroker) {
-            AuthenticationDataProvider authData = null;
-
-            if (!isEmpty(config.getBrokerClientAuthenticationPlugin())) {
-                try {
-                    authData = authentication.getAuthData();
-                } catch (PulsarClientException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-            PulsarSslConfiguration sslConfiguration = 
buildSslConfiguration(config, authData);
-            this.sslFactory = (PulsarSslFactory) 
Class.forName(config.getSslFactoryPlugin())
-                    .getConstructor().newInstance();
-            this.sslFactory.initialize(sslConfiguration);
-            this.sslFactory.createInternalSslContext();
-        }
+        this.pulsarSslFactoryMap = new ConcurrentHashMap<>();
     }
 
     public void connect(String brokerHostAndPort, InetSocketAddress 
targetBrokerAddress, int protocolVersion) {
+        String remoteHost;
+        try {
+            remoteHost = parseHost(brokerHostAndPort);
+        } catch (IllegalArgumentException e) {
+            log.warn("[{}] Failed to parse broker host '{}'", inboundChannel, 
brokerHostAndPort, e);
+            inboundChannel.close();
+            return;
+        }
+        PulsarSslFactory sslFactory =
+                tlsEnabledWithBroker ? 
pulsarSslFactoryMap.computeIfAbsent(remoteHost, (hostname) -> {
+                    AuthenticationDataProvider authData = null;
+
+                    if 
(!isEmpty(service.getConfiguration().getBrokerClientAuthenticationPlugin())) {
+                        try {
+                            authData = authentication.getAuthData(remoteHost);
+                        } catch (PulsarClientException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    PulsarSslConfiguration sslConfiguration =
+                            buildSslConfiguration(service.getConfiguration(), 
authData);
+                    try {
+                        PulsarSslFactory factory =
+                                (PulsarSslFactory) 
Class.forName(service.getConfiguration().getSslFactoryPlugin())
+                                        .getConstructor().newInstance();
+                        factory.initialize(sslConfiguration);
+                        factory.createInternalSslContext();
+                        return factory;
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                }) : null;
         ProxyConfiguration config = service.getConfiguration();
 
         // Start the connection attempt.
@@ -142,15 +159,6 @@ public class DirectProxyHandler {
             b.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
         }
 
-        String remoteHost;
-        try {
-            remoteHost = parseHost(brokerHostAndPort);
-        } catch (IllegalArgumentException e) {
-            log.warn("[{}] Failed to parse broker host '{}'", inboundChannel, 
brokerHostAndPort, e);
-            inboundChannel.close();
-            return;
-        }
-
         b.handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index 1148234be62..ee8ae8d4afb 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -59,6 +59,7 @@ public class ProxyServiceTlsStarterTest extends 
MockedPulsarServiceBaseTest {
         
serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
         
serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
         
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
+        
serviceStarter.getConfig().setBrokerWebServiceURLTLS(pulsar.getWebServiceAddressTls());
         
serviceStarter.getConfig().setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);
         
serviceStarter.getConfig().setBrokerClientCertificateFilePath(BROKER_CERT_FILE_PATH);
         
serviceStarter.getConfig().setBrokerClientKeyFilePath(BROKER_KEY_FILE_PATH);
@@ -79,6 +80,7 @@ public class ProxyServiceTlsStarterTest extends 
MockedPulsarServiceBaseTest {
     protected void doInitConf() throws Exception {
         super.doInitConf();
         this.conf.setBrokerServicePortTls(Optional.of(0));
+        this.conf.setWebServicePortTls(Optional.of(0));
         this.conf.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
         this.conf.setTlsKeyFilePath(PROXY_KEY_FILE_PATH);
     }
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
index 4d73fd9f9b4..4fabf6d2185 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
@@ -251,7 +251,7 @@ public class PerformanceClient extends CmdBase {
                     Authentication auth = 
AuthenticationFactory.create(this.authPluginClassName,
                             this.authParams);
                     auth.start();
-                    AuthenticationDataProvider authData = auth.getAuthData();
+                    AuthenticationDataProvider authData = 
auth.getAuthData(produceUri.getHost());
                     if (authData.hasDataForHttp()) {
                         for (Map.Entry<String, String> kv : 
authData.getHttpHeaders()) {
                             produceRequest.setHeader(kv.getKey(), 
kv.getValue());

Reply via email to