This is an automated email from the ASF dual-hosted git repository.

Technoboy- pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e131b1cc0dd [fix][client] Make ClientBuilder serializable (#25730)
e131b1cc0dd is described below

commit e131b1cc0dd6579f9743f0525f9ca26867b0f40c
Author: jiangpengcheng <[email protected]>
AuthorDate: Sat May 9 19:16:56 2026 +0800

    [fix][client] Make ClientBuilder serializable (#25730)
---
 .../impl/auth/oauth2/ClientCredentialsFlow.java    |  2 +-
 .../pulsar/client/impl/auth/oauth2/FlowBase.java   | 32 ++++++++++++++++++----
 .../client/impl/auth/oauth2/TlsClientAuthFlow.java |  2 +-
 .../impl/auth/oauth2/AuthenticationOAuth2Test.java | 23 ++++++++++++++++
 4 files changed, 51 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
index 703c201c864..4d07347597a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -147,7 +147,7 @@ class ClientCredentialsFlow extends FlowBase {
         assert this.metadata != null;
 
         URL tokenUrl = this.metadata.getTokenEndpoint();
-        this.exchanger = new TokenClient(tokenUrl, httpClient);
+        this.exchanger = new TokenClient(tokenUrl, getHttpClient());
         initialized = true;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
index 159508dffef..e4b57305833 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
@@ -67,22 +67,32 @@ abstract class FlowBase implements Flow {
     private static final long serialVersionUID = 1L;
 
     protected final URL issuerUrl;
-    protected final AsyncHttpClient httpClient;
+    private final Duration connectTimeout;
+    private final Duration readTimeout;
+    private final String trustCertsFilePath;
+    private final String certFile;
+    private final String keyFile;
+    private final long autoCertRefreshSeconds;
     protected final String wellKnownMetadataPath;
 
     protected transient PulsarSslFactory sslFactory;
     protected transient ScheduledExecutorService sslRefreshScheduler;
     protected transient Metadata metadata;
+    private transient AsyncHttpClient httpClient;
 
     protected FlowBase(URL issuerUrl, Duration connectTimeout, Duration 
readTimeout, String trustCertsFilePath,
                        String certFile, String keyFile, Duration 
autoCertRefreshDuration,
                        String wellKnownMetadataPath) {
         this.issuerUrl = issuerUrl;
-        this.httpClient = defaultHttpClient(readTimeout, connectTimeout, 
trustCertsFilePath, certFile, keyFile);
-        long autoCertRefreshSeconds = 
getParameterDurationToSeconds(CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION,
+        this.connectTimeout = connectTimeout;
+        this.readTimeout = readTimeout;
+        this.trustCertsFilePath = trustCertsFilePath;
+        this.certFile = certFile;
+        this.keyFile = keyFile;
+        this.autoCertRefreshSeconds = 
getParameterDurationToSeconds(CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION,
                 autoCertRefreshDuration, DEFAULT_AUTO_CERT_REFRESH_DURATION);
-        scheduleSslContextRefreshIfEnabled(autoCertRefreshSeconds);
         this.wellKnownMetadataPath = wellKnownMetadataPath;
+        getHttpClient();
     }
 
     private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration 
connectTimeout,
@@ -133,6 +143,14 @@ abstract class FlowBase implements Flow {
         return new DefaultAsyncHttpClient(confBuilder.build());
     }
 
+    protected synchronized AsyncHttpClient getHttpClient() {
+        if (httpClient == null) {
+            httpClient = defaultHttpClient(readTimeout, connectTimeout, 
trustCertsFilePath, certFile, keyFile);
+            scheduleSslContextRefreshIfEnabled(autoCertRefreshSeconds);
+        }
+        return httpClient;
+    }
+
     private void scheduleSslContextRefreshIfEnabled(long refreshSeconds) {
         if (sslFactory == null || refreshSeconds <= 0 || sslRefreshScheduler 
!= null) {
             return;
@@ -188,7 +206,7 @@ abstract class FlowBase implements Flow {
     }
 
     protected MetadataResolver createMetadataResolver() {
-        return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, httpClient, 
wellKnownMetadataPath);
+        return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, 
getHttpClient(), wellKnownMetadataPath);
     }
 
     static String parseParameterString(Map<String, String> params, String 
name) {
@@ -228,7 +246,9 @@ abstract class FlowBase implements Flow {
         if (sslRefreshScheduler != null) {
             sslRefreshScheduler.shutdownNow();
         }
-        httpClient.close();
+        if (httpClient != null) {
+            httpClient.close();
+        }
         if (sslFactory != null) {
             sslFactory.close();
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlow.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlow.java
index fafe3179ebf..d592002b4a4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlow.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlow.java
@@ -113,7 +113,7 @@ class TlsClientAuthFlow extends FlowBase {
         assert this.metadata != null;
 
         URL tokenUrl = this.metadata.getTokenEndpoint();
-        this.exchanger = new TokenClient(tokenUrl, httpClient);
+        this.exchanger = new TokenClient(tokenUrl, getHttpClient());
 
         initialized = true;
     }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
index cfe9d9b4d99..8fc10969431 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
@@ -28,6 +28,8 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertThrows;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
@@ -119,6 +121,27 @@ public class AuthenticationOAuth2Test {
         assertNotNull(this.auth.flow);
     }
 
+    @Test
+    public void testConfiguredAuthIsSerializable() throws Exception {
+        Map<String, String> params = new HashMap<>();
+        params.put("type", "client_credentials");
+        params.put("privateKey", "data:base64,e30=");
+        params.put("issuerUrl", "http://localhost";);
+        params.put("connectTimeout", "PT10S");
+        params.put("readTimeout", "PT30S");
+        ObjectMapper mapper = new ObjectMapper();
+        String authParams = mapper.writeValueAsString(params);
+        AuthenticationOAuth2 configuredAuth = new AuthenticationOAuth2();
+        configuredAuth.configure(authParams);
+
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+            ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(out)) {
+            objectOutputStream.writeObject(configuredAuth);
+        } finally {
+            configuredAuth.close();
+        }
+    }
+
     @Test
     public void testConfigureWithTlsClientAuth() throws Exception {
         Map<String, String> params = new HashMap<>();

Reply via email to