This is an automated email from the ASF dual-hosted git repository.
Technoboy- pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e131b1cc0dd [fix][client] Make ClientBuilder serializable (#25730)
e131b1cc0dd is described below
commit e131b1cc0dd6579f9743f0525f9ca26867b0f40c
Author: jiangpengcheng <[email protected]>
AuthorDate: Sat May 9 19:16:56 2026 +0800
[fix][client] Make ClientBuilder serializable (#25730)
---
.../impl/auth/oauth2/ClientCredentialsFlow.java | 2 +-
.../pulsar/client/impl/auth/oauth2/FlowBase.java | 32 ++++++++++++++++++----
.../client/impl/auth/oauth2/TlsClientAuthFlow.java | 2 +-
.../impl/auth/oauth2/AuthenticationOAuth2Test.java | 23 ++++++++++++++++
4 files changed, 51 insertions(+), 8 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
index 703c201c864..4d07347597a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -147,7 +147,7 @@ class ClientCredentialsFlow extends FlowBase {
assert this.metadata != null;
URL tokenUrl = this.metadata.getTokenEndpoint();
- this.exchanger = new TokenClient(tokenUrl, httpClient);
+ this.exchanger = new TokenClient(tokenUrl, getHttpClient());
initialized = true;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
index 159508dffef..e4b57305833 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
@@ -67,22 +67,32 @@ abstract class FlowBase implements Flow {
private static final long serialVersionUID = 1L;
protected final URL issuerUrl;
- protected final AsyncHttpClient httpClient;
+ private final Duration connectTimeout;
+ private final Duration readTimeout;
+ private final String trustCertsFilePath;
+ private final String certFile;
+ private final String keyFile;
+ private final long autoCertRefreshSeconds;
protected final String wellKnownMetadataPath;
protected transient PulsarSslFactory sslFactory;
protected transient ScheduledExecutorService sslRefreshScheduler;
protected transient Metadata metadata;
+ private transient AsyncHttpClient httpClient;
protected FlowBase(URL issuerUrl, Duration connectTimeout, Duration
readTimeout, String trustCertsFilePath,
String certFile, String keyFile, Duration
autoCertRefreshDuration,
String wellKnownMetadataPath) {
this.issuerUrl = issuerUrl;
- this.httpClient = defaultHttpClient(readTimeout, connectTimeout,
trustCertsFilePath, certFile, keyFile);
- long autoCertRefreshSeconds =
getParameterDurationToSeconds(CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION,
+ this.connectTimeout = connectTimeout;
+ this.readTimeout = readTimeout;
+ this.trustCertsFilePath = trustCertsFilePath;
+ this.certFile = certFile;
+ this.keyFile = keyFile;
+ this.autoCertRefreshSeconds =
getParameterDurationToSeconds(CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION,
autoCertRefreshDuration, DEFAULT_AUTO_CERT_REFRESH_DURATION);
- scheduleSslContextRefreshIfEnabled(autoCertRefreshSeconds);
this.wellKnownMetadataPath = wellKnownMetadataPath;
+ getHttpClient();
}
private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration
connectTimeout,
@@ -133,6 +143,14 @@ abstract class FlowBase implements Flow {
return new DefaultAsyncHttpClient(confBuilder.build());
}
+ protected synchronized AsyncHttpClient getHttpClient() {
+ if (httpClient == null) {
+ httpClient = defaultHttpClient(readTimeout, connectTimeout,
trustCertsFilePath, certFile, keyFile);
+ scheduleSslContextRefreshIfEnabled(autoCertRefreshSeconds);
+ }
+ return httpClient;
+ }
+
private void scheduleSslContextRefreshIfEnabled(long refreshSeconds) {
if (sslFactory == null || refreshSeconds <= 0 || sslRefreshScheduler
!= null) {
return;
@@ -188,7 +206,7 @@ abstract class FlowBase implements Flow {
}
protected MetadataResolver createMetadataResolver() {
- return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, httpClient,
wellKnownMetadataPath);
+ return DefaultMetadataResolver.fromIssuerUrl(issuerUrl,
getHttpClient(), wellKnownMetadataPath);
}
static String parseParameterString(Map<String, String> params, String
name) {
@@ -228,7 +246,9 @@ abstract class FlowBase implements Flow {
if (sslRefreshScheduler != null) {
sslRefreshScheduler.shutdownNow();
}
- httpClient.close();
+ if (httpClient != null) {
+ httpClient.close();
+ }
if (sslFactory != null) {
sslFactory.close();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlow.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlow.java
index fafe3179ebf..d592002b4a4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlow.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlow.java
@@ -113,7 +113,7 @@ class TlsClientAuthFlow extends FlowBase {
assert this.metadata != null;
URL tokenUrl = this.metadata.getTokenEndpoint();
- this.exchanger = new TokenClient(tokenUrl, httpClient);
+ this.exchanger = new TokenClient(tokenUrl, getHttpClient());
initialized = true;
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
index cfe9d9b4d99..8fc10969431 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
@@ -28,6 +28,8 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertThrows;
import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
@@ -119,6 +121,27 @@ public class AuthenticationOAuth2Test {
assertNotNull(this.auth.flow);
}
+ @Test
+ public void testConfiguredAuthIsSerializable() throws Exception {
+ Map<String, String> params = new HashMap<>();
+ params.put("type", "client_credentials");
+ params.put("privateKey", "data:base64,e30=");
+ params.put("issuerUrl", "http://localhost");
+ params.put("connectTimeout", "PT10S");
+ params.put("readTimeout", "PT30S");
+ ObjectMapper mapper = new ObjectMapper();
+ String authParams = mapper.writeValueAsString(params);
+ AuthenticationOAuth2 configuredAuth = new AuthenticationOAuth2();
+ configuredAuth.configure(authParams);
+
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ObjectOutputStream objectOutputStream = new
ObjectOutputStream(out)) {
+ objectOutputStream.writeObject(configuredAuth);
+ } finally {
+ configuredAuth.close();
+ }
+ }
+
@Test
public void testConfigureWithTlsClientAuth() throws Exception {
Map<String, String> params = new HashMap<>();