This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new fe6d02812cb [improve][client] Implement tls_client_auth for AuthenticationOAuth2 (#25538)
fe6d02812cb is described below

commit fe6d02812cb9abad2c756bf19eca68d8a66fd0e7
Author: Hideaki Oguni <[email protected]>
AuthorDate: Wed Apr 29 00:41:09 2026 +0900

    [improve][client] Implement tls_client_auth for AuthenticationOAuth2 (#25538)
    
    Co-authored-by: hoguni <[email protected]>
    
    (cherry-pick from commit d6ca792aa1b09708673f5f632829adb932fcfa0f)
---
 .../auth/oauth2/AuthenticationFactoryOAuth2.java   | 113 +++++++++++++++++--
 .../impl/auth/oauth2/AuthenticationOAuth2.java     |  13 ++-
 .../impl/auth/oauth2/ClientCredentialsFlow.java    |  14 ++-
 .../pulsar/client/impl/auth/oauth2/FlowBase.java   | 108 ++++++++++++++-----
 ...CredentialsFlow.java => TlsClientAuthFlow.java} | 119 ++++++++-------------
 .../protocol/ClientCredentialsExchangeRequest.java |   3 +
 .../impl/auth/oauth2/protocol/TokenClient.java     |   7 +-
 ...geRequest.java => TokenEndpointAuthMethod.java} |  40 ++++---
 .../oauth2/AuthenticationFactoryOAuth2Test.java    |  49 ++++++++-
 .../impl/auth/oauth2/AuthenticationOAuth2Test.java |  47 +++++++-
 .../impl/auth/oauth2/OAuth2MockHttpClient.java     |  49 +++++++++
 .../impl/auth/oauth2/TlsClientAuthFlowTest.java}   |  44 ++++----
 .../impl/auth/oauth2/protocol/TokenClientTest.java |  88 ++++++++++++++-
 13 files changed, 533 insertions(+), 161 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
index b035b02437b..2b4ee9940b8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
@@ -21,8 +21,10 @@ package org.apache.pulsar.client.impl.auth.oauth2;
 import java.net.URL;
 import java.time.Duration;
 import java.util.concurrent.ScheduledExecutorService;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Authentication;
 import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
+import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenEndpointAuthMethod;
 
 /**
  * Factory class that allows to create {@link Authentication} instances
@@ -92,18 +94,36 @@ public final class AuthenticationFactoryOAuth2 {
 
         private URL issuerUrl;
         private URL credentialsUrl;
+        private TokenEndpointAuthMethod tokenEndpointAuthMethod =
+                TokenEndpointAuthMethod.CLIENT_SECRET_POST;
+        private String clientId;
+        private String tlsCertFile;
+        private String tlsKeyFile;
         private String audience;
         private String scope;
         private Duration connectTimeout;
         private Duration readTimeout;
         private String trustCertsFilePath;
         private String wellKnownMetadataPath;
+        private Duration autoCertRefreshDuration;
         private double earlyTokenRefreshPercent = 
AuthenticationOAuth2.EARLY_TOKEN_REFRESH_PERCENT_DEFAULT;
         private ScheduledExecutorService scheduler;
 
         private ClientCredentialsBuilder() {
         }
 
+        /**
+         * Optional token endpoint auth method.
+         * Defaults to {@code client_secret_post}.
+         *
+         * @param tokenEndpointAuthMethod the token endpoint auth method
+         * @return the builder
+         */
+        public ClientCredentialsBuilder 
tokenEndpointAuthMethod(TokenEndpointAuthMethod tokenEndpointAuthMethod) {
+            this.tokenEndpointAuthMethod = tokenEndpointAuthMethod;
+            return this;
+        }
+
         /**
          * Required issuer URL.
          *
@@ -126,6 +146,43 @@ public final class AuthenticationFactoryOAuth2 {
             return this;
         }
 
+        /**
+         * Optional path to the file for a client certificate.
+         * Required when {@code tokenEndpointAuthMethod} is {@code 
tls_client_auth}
+         *
+         * @param tlsCertFile the path to the file for a client certificate
+         * @return the builder
+         */
+        public ClientCredentialsBuilder tlsCertFile(String tlsCertFile) {
+            this.tlsCertFile = tlsCertFile;
+            return this;
+        }
+
+        /**
+         * Optional path to the file for a client private key.
+         * Required when {@code tokenEndpointAuthMethod} is {@code 
tls_client_auth}
+         *
+         * @param tlsKeyFile the path to the file for a client private key
+         * @return the builder
+         */
+        public ClientCredentialsBuilder tlsKeyFile(String tlsKeyFile) {
+            this.tlsKeyFile = tlsKeyFile;
+            return this;
+        }
+
+        /**
+         * Optional client identifier issued by the authorization server.
+         * Only used by {@code tls_client_auth}.
+         * Defaults to {@code pulsar-client} when not provided.
+         *
+         * @param clientId the client identifier
+         * @return the builder
+         */
+        public ClientCredentialsBuilder clientId(String clientId) {
+            this.clientId = clientId;
+            return this;
+        }
+
         /**
          * Optional audience identifier used by some Identity Providers, like 
Auth0.
          *
@@ -137,6 +194,17 @@ public final class AuthenticationFactoryOAuth2 {
             return this;
         }
 
+        /**
+         * Optional certificate refresh interval.
+         *
+         * @param autoCertRefreshDuration the Certificate refresh interval
+         * @return the builder
+         */
+        public ClientCredentialsBuilder autoCertRefreshDuration(Duration 
autoCertRefreshDuration) {
+            this.autoCertRefreshDuration = autoCertRefreshDuration;
+            return this;
+        }
+
         /**
          * Optional scope expressed as a list of space-delimited, 
case-sensitive strings.
          * The strings are defined by the authorization server.
@@ -236,16 +304,41 @@ public final class AuthenticationFactoryOAuth2 {
          * @return an Authentication object
          */
         public Authentication build() {
-            ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
-                    .issuerUrl(issuerUrl)
-                    .privateKey(credentialsUrl == null ? null : 
credentialsUrl.toExternalForm())
-                    .audience(audience)
-                    .scope(scope)
-                    .connectTimeout(connectTimeout)
-                    .readTimeout(readTimeout)
-                    .trustCertsFilePath(trustCertsFilePath)
-                    .wellKnownMetadataPath(wellKnownMetadataPath)
-                    .build();
+            Flow flow;
+            if (tokenEndpointAuthMethod == 
TokenEndpointAuthMethod.CLIENT_SECRET_POST) {
+                flow = ClientCredentialsFlow.builder()
+                        .issuerUrl(issuerUrl)
+                        .privateKey(credentialsUrl == null ? null : 
credentialsUrl.toExternalForm())
+                        .audience(audience)
+                        .scope(scope)
+                        .connectTimeout(connectTimeout)
+                        .readTimeout(readTimeout)
+                        .trustCertsFilePath(trustCertsFilePath)
+                        .certFile(tlsCertFile)
+                        .keyFile(tlsKeyFile)
+                        .autoCertRefreshDuration(autoCertRefreshDuration)
+                        .wellKnownMetadataPath(wellKnownMetadataPath)
+                        .build();
+            } else if (tokenEndpointAuthMethod == 
TokenEndpointAuthMethod.TLS_CLIENT_AUTH) {
+                if (StringUtils.isBlank(tlsCertFile) || 
StringUtils.isBlank(tlsKeyFile)) {
+                    throw new IllegalArgumentException("Required configuration 
parameters: tlsCertFile, tlsKeyFile");
+                }
+                flow = TlsClientAuthFlow.builder()
+                        .issuerUrl(issuerUrl)
+                        .clientId(clientId)
+                        .certFile(tlsCertFile)
+                        .keyFile(tlsKeyFile)
+                        .audience(audience)
+                        .scope(scope)
+                        .connectTimeout(connectTimeout)
+                        .readTimeout(readTimeout)
+                        .trustCertsFilePath(trustCertsFilePath)
+                        .wellKnownMetadataPath(wellKnownMetadataPath)
+                        .autoCertRefreshDuration(autoCertRefreshDuration)
+                        .build();
+            } else {
+                throw new IllegalArgumentException("Unsupported auth method: " 
+ tokenEndpointAuthMethod);
+            }
             return new AuthenticationOAuth2(flow, earlyTokenRefreshPercent, 
scheduler);
         }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
index a8a52909136..b760b16c532 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
@@ -38,6 +38,7 @@ import 
org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.AuthenticationUtil;
+import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenEndpointAuthMethod;
 import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
 import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.BackoffBuilder;
@@ -73,6 +74,7 @@ import org.apache.pulsar.common.util.BackoffBuilder;
 public class AuthenticationOAuth2 implements Authentication, 
EncodedAuthenticationParameterSupport {
 
     public static final String CONFIG_PARAM_TYPE = "type";
+    public static final String CONFIG_PARAM_TOKEN_ENDPOINT_AUTH_METHOD = 
"tokenEndpointAuthMethod";
     public static final String CONFIG_PARAM_EARLY_TOKEN_REFRESH_PERCENT = 
"earlyTokenRefreshPercent";
     public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials";
     public static final int EARLY_TOKEN_REFRESH_PERCENT_DEFAULT = 1; // 
feature disabled by default
@@ -158,7 +160,16 @@ public class AuthenticationOAuth2 implements 
Authentication, EncodedAuthenticati
         Map<String, String> params = 
parseAuthParameters(encodedAuthParamString);
         String type = params.getOrDefault(CONFIG_PARAM_TYPE, 
TYPE_CLIENT_CREDENTIALS);
         if (TYPE_CLIENT_CREDENTIALS.equals(type)) {
-            this.flow = ClientCredentialsFlow.fromParameters(params);
+            TokenEndpointAuthMethod authMethod = 
TokenEndpointAuthMethod.fromValue(
+                    
params.getOrDefault(CONFIG_PARAM_TOKEN_ENDPOINT_AUTH_METHOD,
+                            
TokenEndpointAuthMethod.CLIENT_SECRET_POST.value()));
+            if (authMethod == TokenEndpointAuthMethod.CLIENT_SECRET_POST) {
+                this.flow = ClientCredentialsFlow.fromParameters(params);
+            } else if (authMethod == TokenEndpointAuthMethod.TLS_CLIENT_AUTH) {
+                this.flow = TlsClientAuthFlow.fromParameters(params);
+            } else {
+                throw new IllegalArgumentException("Unsupported auth method: " 
+ authMethod);
+            }
         } else {
             throw new IllegalArgumentException("Unsupported authentication 
type: " + type);
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
index d841010add8..6692e86848b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchangeRequest;
 import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchanger;
 import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenClient;
+import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenEndpointAuthMethod;
 import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenExchangeException;
 import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
 
@@ -64,8 +65,10 @@ class ClientCredentialsFlow extends FlowBase {
     @Builder
     public ClientCredentialsFlow(URL issuerUrl, String audience, String 
privateKey, String scope,
                                  Duration connectTimeout, Duration 
readTimeout, String trustCertsFilePath,
+                                 String certFile, String keyFile, Duration 
autoCertRefreshDuration,
                                  String wellKnownMetadataPath) {
-        super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath, 
wellKnownMetadataPath);
+        super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath, 
certFile, keyFile, autoCertRefreshDuration,
+                wellKnownMetadataPath);
         this.audience = audience;
         this.privateKey = privateKey;
         this.scope = scope;
@@ -87,6 +90,9 @@ class ClientCredentialsFlow extends FlowBase {
         Duration connectTimeout = parseParameterDuration(params, 
CONFIG_PARAM_CONNECT_TIMEOUT);
         Duration readTimeout = parseParameterDuration(params, 
CONFIG_PARAM_READ_TIMEOUT);
         String trustCertsFilePath = 
params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH);
+        String certFile = params.get(CONFIG_PARAM_CERT_FILE);
+        String keyFile = params.get(CONFIG_PARAM_TLS_KEY_FILE);
+        Duration autoCertRefreshDuration = parseParameterDuration(params, 
CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION);
         String wellKnownMetadataPath = 
params.get(CONFIG_PARAM_WELL_KNOWN_METADATA_PATH);
 
         return ClientCredentialsFlow.builder()
@@ -97,6 +103,9 @@ class ClientCredentialsFlow extends FlowBase {
                 .connectTimeout(connectTimeout)
                 .readTimeout(readTimeout)
                 .trustCertsFilePath(trustCertsFilePath)
+                .certFile(certFile)
+                .keyFile(keyFile)
+                .autoCertRefreshDuration(autoCertRefreshDuration)
                 .wellKnownMetadataPath(wellKnownMetadataPath)
                 .build();
     }
@@ -138,7 +147,7 @@ class ClientCredentialsFlow extends FlowBase {
         assert this.metadata != null;
 
         URL tokenUrl = this.metadata.getTokenEndpoint();
-        this.exchanger = new TokenClient(tokenUrl, getHttpClient());
+        this.exchanger = new TokenClient(tokenUrl, httpClient);
         initialized = true;
     }
 
@@ -157,6 +166,7 @@ class ClientCredentialsFlow extends FlowBase {
                 .clientSecret(keyFile.getClientSecret())
                 .audience(this.audience)
                 .scope(this.scope)
+                .authMethod(TokenEndpointAuthMethod.CLIENT_SECRET_POST)
                 .build();
         TokenResult tr;
         if (!initialized) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
index 63248e86051..d1caa82e07c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
@@ -26,6 +26,9 @@ import java.net.URL;
 import java.time.Duration;
 import java.time.format.DateTimeParseException;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -34,9 +37,14 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
 import org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata;
 import org.apache.pulsar.client.impl.auth.oauth2.protocol.MetadataResolver;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.client.util.PulsarHttpAsyncSslEngineFactory;
+import org.apache.pulsar.common.util.PulsarSslConfiguration;
+import org.apache.pulsar.common.util.PulsarSslFactory;
 import org.asynchttpclient.AsyncHttpClient;
 import org.asynchttpclient.DefaultAsyncHttpClient;
 import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.SslEngineFactory;
 
 /**
  * An abstract OAuth 2.0 authorization flow.
@@ -47,34 +55,37 @@ abstract class FlowBase implements Flow {
     public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout";
     public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout";
     public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH = 
"trustCertsFilePath";
+    public static final String CONFIG_PARAM_CERT_FILE = "tlsCertFile";
+    public static final String CONFIG_PARAM_TLS_KEY_FILE = "tlsKeyFile";
+    public static final String CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION = 
"autoCertRefreshDuration";
     public static final String CONFIG_PARAM_WELL_KNOWN_METADATA_PATH = 
"wellKnownMetadataPath";
 
     protected static final Duration DEFAULT_CONNECT_TIMEOUT = 
Duration.ofSeconds(10);
     protected static final Duration DEFAULT_READ_TIMEOUT = 
Duration.ofSeconds(30);
+    protected static final Duration DEFAULT_AUTO_CERT_REFRESH_DURATION = 
Duration.ofSeconds(300);
 
     private static final long serialVersionUID = 1L;
 
     protected final URL issuerUrl;
-    private final Duration connectTimeout;
-    private final Duration readTimeout;
-    private final String trustCertsFilePath;
+    protected transient AsyncHttpClient httpClient;
     protected final String wellKnownMetadataPath;
-
+    protected transient PulsarSslFactory sslFactory;
+    protected transient ScheduledExecutorService sslRefreshScheduler;
     protected transient Metadata metadata;
-    private transient AsyncHttpClient httpClient;
 
     protected FlowBase(URL issuerUrl, Duration connectTimeout, Duration 
readTimeout, String trustCertsFilePath,
+                       String certFile, String keyFile, Duration 
autoCertRefreshDuration,
                        String wellKnownMetadataPath) {
         this.issuerUrl = issuerUrl;
-        this.connectTimeout = connectTimeout;
-        this.readTimeout = readTimeout;
-        this.trustCertsFilePath = trustCertsFilePath;
+        this.httpClient = defaultHttpClient(readTimeout, connectTimeout, 
trustCertsFilePath, certFile, keyFile);
+        long autoCertRefreshSeconds = 
getParameterDurationToSeconds(CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION,
+                autoCertRefreshDuration, DEFAULT_AUTO_CERT_REFRESH_DURATION);
+        scheduleSslContextRefreshIfEnabled(autoCertRefreshSeconds);
         this.wellKnownMetadataPath = wellKnownMetadataPath;
-        getHttpClient();
     }
 
     private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration 
connectTimeout,
-                                              String trustCertsFilePath) {
+                                              String trustCertsFilePath, 
String certFile, String keyFile) {
         DefaultAsyncHttpClientConfig.Builder confBuilder = new 
DefaultAsyncHttpClientConfig.Builder();
         confBuilder.setCookieStore(null);
         confBuilder.setUseProxyProperties(true);
@@ -85,7 +96,31 @@ abstract class FlowBase implements Flow {
         confBuilder.setReadTimeout(
                 getParameterDurationToMillis(CONFIG_PARAM_READ_TIMEOUT, 
readTimeout, DEFAULT_READ_TIMEOUT));
         confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", 
PulsarVersion.getVersion()));
-        if (StringUtils.isNotBlank(trustCertsFilePath)) {
+        boolean hasCertFile = StringUtils.isNotBlank(certFile);
+        boolean hasKeyFile = StringUtils.isNotBlank(keyFile);
+        if (hasCertFile != hasKeyFile) {
+            throw new IllegalArgumentException("Invalid TLS client certificate 
configuration: " + CONFIG_PARAM_CERT_FILE
+                    + " and " + CONFIG_PARAM_TLS_KEY_FILE + " must be provided 
together");
+        }
+        if (hasCertFile && hasKeyFile) {
+            try {
+                PulsarSslConfiguration sslConfiguration = 
PulsarSslConfiguration.builder()
+                        .tlsCertificateFilePath(certFile)
+                        .tlsKeyFilePath(keyFile)
+                        .tlsTrustCertsFilePath(trustCertsFilePath)
+                        .allowInsecureConnection(false)
+                        .serverMode(false)
+                        .isHttps(true)
+                        .build();
+                sslFactory = new 
org.apache.pulsar.common.util.DefaultPulsarSslFactory();
+                sslFactory.initialize(sslConfiguration);
+                sslFactory.createInternalSslContext();
+                SslEngineFactory sslEngineFactory = new 
PulsarHttpAsyncSslEngineFactory(sslFactory, null);
+                confBuilder.setSslEngineFactory(sslEngineFactory);
+            } catch (Exception e) {
+                throw new IllegalArgumentException("Invalid TLS client 
certificate configuration", e);
+            }
+        } else if (StringUtils.isNotBlank(trustCertsFilePath)) {
             try {
                 confBuilder.setSslContext(SslContextBuilder.forClient()
                         .trustManager(new File(trustCertsFilePath))
@@ -97,28 +132,47 @@ abstract class FlowBase implements Flow {
         return new DefaultAsyncHttpClient(confBuilder.build());
     }
 
-    protected synchronized AsyncHttpClient getHttpClient() {
-        if (httpClient == null) {
-            httpClient = defaultHttpClient(readTimeout, connectTimeout, 
trustCertsFilePath);
+    private void scheduleSslContextRefreshIfEnabled(long refreshSeconds) {
+        if (sslFactory == null || refreshSeconds <= 0 || sslRefreshScheduler 
!= null) {
+            return;
+        }
+        sslRefreshScheduler = Executors.newSingleThreadScheduledExecutor(
+                new 
ExecutorProvider.ExtendedThreadFactory("oauth2-tls-cert-refresher", true));
+        sslRefreshScheduler.scheduleWithFixedDelay(this::refreshSslContext,
+                refreshSeconds, refreshSeconds, TimeUnit.SECONDS);
+        log.info("Scheduled TLS certificate refresh, refreshSeconds {}", 
refreshSeconds);
+    }
+
+    private void refreshSslContext() {
+        if (this.sslFactory == null) {
+            return;
+        }
+        try {
+            this.sslFactory.update();
+            log.debug("Successfully refreshed SSL context");
+        } catch (Exception e) {
+            log.error("Failed to refresh SSL context", e);
         }
-        return httpClient;
     }
 
     private int getParameterDurationToMillis(String name, Duration value, 
Duration defaultValue) {
+        return (int) getParameterDuration(name, value, 
defaultValue).toMillis();
+    }
+
+    private long getParameterDurationToSeconds(String name, Duration value, 
Duration defaultValue) {
+        return getParameterDuration(name, value, defaultValue).getSeconds();
+    }
+
+    private Duration getParameterDuration(String name, Duration value, 
Duration defaultValue) {
         Duration duration;
         if (value == null) {
-            if (log.isDebugEnabled()) {
-                log.debug("Configuration for [{}] is using the default value: 
[{}]", name, defaultValue);
-            }
+            log.debug("Configuration is using the default value, name {}, 
defaultValue {}", name, defaultValue);
             duration = defaultValue;
         } else {
-            if (log.isDebugEnabled()) {
-                log.debug("Configuration for [{}] is: [{}]", name, value);
-            }
+            log.debug("Configuration, name {}, value {}", name, value);
             duration = value;
         }
-
-        return (int) duration.toMillis();
+        return duration;
     }
 
     public void initialize() throws PulsarClientException {
@@ -131,7 +185,7 @@ abstract class FlowBase implements Flow {
     }
 
     protected MetadataResolver createMetadataResolver() {
-        return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, 
getHttpClient(), wellKnownMetadataPath);
+        return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, httpClient, 
wellKnownMetadataPath);
     }
 
     static String parseParameterString(Map<String, String> params, String 
name) {
@@ -168,8 +222,14 @@ abstract class FlowBase implements Flow {
 
     @Override
     public void close() throws Exception {
+        if (sslRefreshScheduler != null) {
+            sslRefreshScheduler.shutdownNow();
+        }
         if (httpClient != null) {
             httpClient.close();
         }
+        if (sslFactory != null) {
+            sslFactory.close();
+        }
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlow.java
similarity index 53%
copy from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
copy to 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlow.java
index d841010add8..d61dadef83b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlow.java
@@ -18,145 +18,113 @@
  */
 package org.apache.pulsar.client.impl.auth.oauth2;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.URISyntaxException;
 import java.net.URL;
-import java.net.URLConnection;
-import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Map;
 import lombok.Builder;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchangeRequest;
-import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchanger;
 import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenClient;
+import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenEndpointAuthMethod;
 import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenExchangeException;
 import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
 
 /**
- * Implementation of OAuth 2.0 Client Credentials flow.
+ * Implementation of OAuth 2.0 Client TLS Authentication flow.
  *
- * @see <a href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2.0 
RFC 6749, section 4.4</a>
+ * @see <a href="https://datatracker.ietf.org/doc/html/rfc8705">RFC 8705 - 
OAuth 2.0 Mutual-TLS Client Authentication</a>
  */
 @Slf4j
-class ClientCredentialsFlow extends FlowBase {
+class TlsClientAuthFlow extends FlowBase {
     public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl";
+    public static final String CONFIG_PARAM_CLIENT_ID = "clientId";
     public static final String CONFIG_PARAM_AUDIENCE = "audience";
-    // Maps to the keyFileUrl
-    public static final String CONFIG_PARAM_KEY_FILE = "privateKey";
     public static final String CONFIG_PARAM_SCOPE = "scope";
+    public static final String CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION =
+            FlowBase.CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION;
+
+    private static final String DEFAULT_CLIENT_ID = "pulsar-client";
 
     private static final long serialVersionUID = 1L;
 
+    private final String clientId;
     private final String audience;
-    private final String privateKey;
     private final String scope;
 
-    private transient ClientCredentialsExchanger exchanger;
+    private transient TokenClient exchanger;
 
     private boolean initialized = false;
 
     @Builder
-    public ClientCredentialsFlow(URL issuerUrl, String audience, String 
privateKey, String scope,
-                                 Duration connectTimeout, Duration 
readTimeout, String trustCertsFilePath,
-                                 String wellKnownMetadataPath) {
-        super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath, 
wellKnownMetadataPath);
+    public TlsClientAuthFlow(URL issuerUrl, String clientId, String certFile, 
String keyFile, String audience,
+                             String scope, Duration connectTimeout, Duration 
readTimeout, String trustCertsFilePath,
+                             String wellKnownMetadataPath, Duration 
autoCertRefreshDuration) {
+        super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath, 
certFile, keyFile, autoCertRefreshDuration,
+                wellKnownMetadataPath);
+        this.clientId = StringUtils.defaultIfBlank(clientId, 
DEFAULT_CLIENT_ID);
         this.audience = audience;
-        this.privateKey = privateKey;
         this.scope = scope;
     }
 
-
     /**
-     * Constructs a {@link ClientCredentialsFlow} from configuration 
parameters.
+     * Constructs a {@link TlsClientAuthFlow} from configuration parameters.
      *
-     * @param params
-     * @return
+     * @param params Configuration parameters
+     * @return A new TlsClientAuthFlow instance
      */
-    public static ClientCredentialsFlow fromParameters(Map<String, String> 
params) {
+    public static TlsClientAuthFlow fromParameters(Map<String, String> params) 
{
         URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL);
-        String privateKeyUrl = parseParameterString(params, 
CONFIG_PARAM_KEY_FILE);
-        // These are optional parameters, so we only perform a get
+        // In mTLS-based providers, caller input for client_id can be optional.
+        // Keep sending client_id in token request for RFC compatibility by 
applying a default value.
+        String clientId = params.getOrDefault(CONFIG_PARAM_CLIENT_ID, 
DEFAULT_CLIENT_ID);
+        String certFile = parseParameterString(params, CONFIG_PARAM_CERT_FILE);
+        String keyFile = parseParameterString(params, 
CONFIG_PARAM_TLS_KEY_FILE);
+        // These are optional parameters, so we allow null values
         String scope = params.get(CONFIG_PARAM_SCOPE);
         String audience = params.get(CONFIG_PARAM_AUDIENCE);
         Duration connectTimeout = parseParameterDuration(params, 
CONFIG_PARAM_CONNECT_TIMEOUT);
         Duration readTimeout = parseParameterDuration(params, 
CONFIG_PARAM_READ_TIMEOUT);
         String trustCertsFilePath = 
params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH);
         String wellKnownMetadataPath = 
params.get(CONFIG_PARAM_WELL_KNOWN_METADATA_PATH);
+        Duration autoCertRefreshDuration = parseParameterDuration(params, 
CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION);
 
-        return ClientCredentialsFlow.builder()
+        return TlsClientAuthFlow.builder()
                 .issuerUrl(issuerUrl)
+                .clientId(clientId)
+                .certFile(certFile)
+                .keyFile(keyFile)
                 .audience(audience)
-                .privateKey(privateKeyUrl)
                 .scope(scope)
                 .connectTimeout(connectTimeout)
                 .readTimeout(readTimeout)
                 .trustCertsFilePath(trustCertsFilePath)
                 .wellKnownMetadataPath(wellKnownMetadataPath)
+                .autoCertRefreshDuration(autoCertRefreshDuration)
                 .build();
     }
 
-    /**
-     * Loads the private key from the given URL.
-     *
-     * @param privateKeyURL
-     * @return
-     * @throws IOException
-     */
-    private static KeyFile loadPrivateKey(String privateKeyURL) throws 
IOException {
-        try {
-            URLConnection urlConnection = new 
org.apache.pulsar.client.api.url.URL(privateKeyURL).openConnection();
-            try {
-                String protocol = urlConnection.getURL().getProtocol();
-                String contentType = urlConnection.getContentType();
-                if ("data".equals(protocol) && 
!"application/json".equals(contentType)) {
-                    throw new IllegalArgumentException(
-                            "Unsupported media type or encoding format: " + 
urlConnection.getContentType());
-                }
-                KeyFile privateKey;
-                try (Reader r = new InputStreamReader((InputStream) 
urlConnection.getContent(),
-                        StandardCharsets.UTF_8)) {
-                    privateKey = KeyFile.fromJson(r);
-                }
-                return privateKey;
-            } finally {
-                IOUtils.close(urlConnection);
-            }
-        } catch (URISyntaxException | InstantiationException | 
IllegalAccessException e) {
-            throw new IOException("Invalid privateKey format", e);
-        }
-    }
-
     @Override
     public void initialize() throws PulsarClientException {
         super.initialize();
         assert this.metadata != null;
 
         URL tokenUrl = this.metadata.getTokenEndpoint();
-        this.exchanger = new TokenClient(tokenUrl, getHttpClient());
+        this.exchanger = new TokenClient(tokenUrl, httpClient);
+
         initialized = true;
     }
 
     public TokenResult authenticate() throws PulsarClientException {
-        // read the private key from storage
-        KeyFile keyFile;
-        try {
-            keyFile = loadPrivateKey(this.privateKey);
-        } catch (IOException e) {
-            throw new PulsarClientException.AuthenticationException("Unable to 
read private key: " + e.getMessage());
-        }
-
-        // request an access token using client credentials
+        // request an access token using TLS client authentication
         ClientCredentialsExchangeRequest req = 
ClientCredentialsExchangeRequest.builder()
-                .clientId(keyFile.getClientId())
-                .clientSecret(keyFile.getClientSecret())
+                .clientId(this.clientId)
                 .audience(this.audience)
                 .scope(this.scope)
+                .authMethod(TokenEndpointAuthMethod.TLS_CLIENT_AUTH)
                 .build();
         TokenResult tr;
         if (!initialized) {
@@ -179,4 +147,9 @@ class ClientCredentialsFlow extends FlowBase {
             exchanger.close();
         }
     }
-}
+
+    @VisibleForTesting
+    String getClientId() {
+        return clientId;
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
index da026bce47e..de36da5ad5b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
@@ -42,4 +42,7 @@ public class ClientCredentialsExchangeRequest {
 
     @JsonProperty("scope")
     private String scope;
+
+    @JsonProperty("token_endpoint_auth_method")
+    private TokenEndpointAuthMethod authMethod;
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
index 6eee7847535..10a587ddbca 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -56,10 +56,15 @@ public class TokenClient implements 
ClientCredentialsExchanger {
      * @return Generate the final request body from a map.
      */
     String buildClientCredentialsBody(ClientCredentialsExchangeRequest req) {
+        TokenEndpointAuthMethod authMethod = req.getAuthMethod() == null
+                ? TokenEndpointAuthMethod.CLIENT_SECRET_POST
+                : req.getAuthMethod();
         Map<String, String> bodyMap = new TreeMap<>();
         bodyMap.put("grant_type", "client_credentials");
         bodyMap.put("client_id", req.getClientId());
-        bodyMap.put("client_secret", req.getClientSecret());
+        if (authMethod == TokenEndpointAuthMethod.CLIENT_SECRET_POST) {
+            bodyMap.put("client_secret", req.getClientSecret());
+        }
         // Only set audience and scope if they are non-empty.
         if (!StringUtils.isBlank(req.getAudience())) {
             bodyMap.put("audience", req.getAudience());
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenEndpointAuthMethod.java
similarity index 58%
copy from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
copy to 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenEndpointAuthMethod.java
index da026bce47e..104a64bd756 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenEndpointAuthMethod.java
@@ -18,28 +18,26 @@
  */
 package org.apache.pulsar.client.impl.auth.oauth2.protocol;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
-import lombok.Builder;
-import lombok.Data;
+public enum TokenEndpointAuthMethod {
+    CLIENT_SECRET_POST("client_secret_post"),
+    TLS_CLIENT_AUTH("tls_client_auth");
 
-/**
- * A token request based on the exchange of client credentials.
- *
- * @see <a href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2.0 RFC 6749, section 4.4</a>
- */
-@Data
-@Builder
-public class ClientCredentialsExchangeRequest {
-
-    @JsonProperty("client_id")
-    private String clientId;
+    private final String value;
 
-    @JsonProperty("client_secret")
-    private String clientSecret;
+    TokenEndpointAuthMethod(String value) {
+        this.value = value;
+    }
 
-    @JsonProperty("audience")
-    private String audience;
+    public String value() {
+        return value;
+    }
 
-    @JsonProperty("scope")
-    private String scope;
-}
+    public static TokenEndpointAuthMethod fromValue(String value) {
+        for (TokenEndpointAuthMethod method : values()) {
+            if (method.value.equalsIgnoreCase(value)) {
+                return method;
+            }
+        }
+        throw new IllegalArgumentException("Unsupported token endpoint auth method: " + value);
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java
index f76fee6e10d..ea675af9974 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java
@@ -18,11 +18,14 @@
  */
 package org.apache.pulsar.client.impl.auth.oauth2;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import java.io.IOException;
 import java.net.URL;
 import java.time.Duration;
 import org.apache.pulsar.client.api.Authentication;
+import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenEndpointAuthMethod;
 import org.testng.annotations.Test;
 
 public class AuthenticationFactoryOAuth2Test {
@@ -36,17 +39,56 @@ public class AuthenticationFactoryOAuth2Test {
         Duration connectTimeout = Duration.parse("PT11S");
         Duration readTimeout = Duration.ofSeconds(31);
         String trustCertsFilePath = null;
+        String tlsCertFile = "";
+        String tlsKeyFile = "";
         String wellKnownMetadataPath = "/.well-known/custom-path";
         try (Authentication authentication =
                      
AuthenticationFactoryOAuth2.clientCredentialsBuilder().issuerUrl(issuerUrl)
                              
.credentialsUrl(credentialsUrl).audience(audience).scope(scope)
                              
.connectTimeout(connectTimeout).readTimeout(readTimeout)
-                             .trustCertsFilePath(trustCertsFilePath)
-                             
.wellKnownMetadataPath(wellKnownMetadataPath).build()) {
+                             
.trustCertsFilePath(trustCertsFilePath).tlsCertFile(tlsCertFile)
+                             
.tlsKeyFile(tlsKeyFile).wellKnownMetadataPath(wellKnownMetadataPath).build()) {
             assertTrue(authentication instanceof AuthenticationOAuth2);
+            assertEquals(((AuthenticationOAuth2) 
authentication).flow.getClass(), ClientCredentialsFlow.class);
         }
     }
 
+    @Test
+    public void testBuilderWithTlsClientAuthFlow() throws Exception {
+        URL issuerUrl = new URL("http://localhost");
+        String clientId = "test-client";
+        String tlsCertFile = "/path/to/cert.pem";
+        String tlsKeyFile = "/path/to/key.pem";
+        String audience = "audience";
+        String scope = "scope";
+        Duration autoCertRefreshDuration = Duration.ofSeconds(123);
+        OAuth2MockHttpClient.withMockedSslFactory(() -> {
+            try (Authentication authentication =
+                         
AuthenticationFactoryOAuth2.clientCredentialsBuilder().issuerUrl(issuerUrl)
+                                 
.tokenEndpointAuthMethod(TokenEndpointAuthMethod.TLS_CLIENT_AUTH)
+                                 .clientId(clientId)
+                                 .tlsCertFile(tlsCertFile)
+                                 .tlsKeyFile(tlsKeyFile)
+                                 .audience(audience)
+                                 .scope(scope)
+                                 
.autoCertRefreshDuration(autoCertRefreshDuration)
+                                 .build()) {
+                assertTrue(authentication instanceof AuthenticationOAuth2);
+                assertEquals(((AuthenticationOAuth2) 
authentication).flow.getClass(), TlsClientAuthFlow.class);
+            }
+        });
+    }
+
+    @Test
+    public void testBuilderWithTlsClientAuthMissingCertOrKey() throws 
IOException {
+        URL issuerUrl = new URL("http://localhost");
+        assertThrows(IllegalArgumentException.class, () ->
+                AuthenticationFactoryOAuth2.clientCredentialsBuilder()
+                        .issuerUrl(issuerUrl)
+                        
.tokenEndpointAuthMethod(TokenEndpointAuthMethod.TLS_CLIENT_AUTH)
+                        .build());
+    }
+
     @Test
     public void testStandardAuthzServerBuilder() throws IOException {
         URL issuerUrl = new URL("http://localhost");
@@ -60,6 +102,7 @@ public class AuthenticationFactoryOAuth2Test {
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testClientCredentials() throws IOException {
         URL issuerUrl = new URL("http://localhost");
@@ -71,4 +114,4 @@ public class AuthenticationFactoryOAuth2Test {
         }
     }
 
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
index 46dbe41b48e..6b956957d30 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
@@ -41,7 +41,9 @@ import lombok.Cleanup;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
+import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenEndpointAuthMethod;
 import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -63,6 +65,13 @@ public class AuthenticationOAuth2Test {
         this.auth = new AuthenticationOAuth2(flow, this.clock, 1, null);
     }
 
+    @AfterMethod(alwaysRun = true)
+    public void after() throws Exception {
+        if (this.auth != null) {
+            this.auth.close();
+        }
+    }
+
     @Test
     public void testGetAuthMethodName() {
         assertEquals(this.auth.getAuthMethodName(), "token");
@@ -110,6 +119,42 @@ public class AuthenticationOAuth2Test {
         assertNotNull(this.auth.flow);
     }
 
+    @Test
+    public void testConfigureWithTlsClientAuth() throws Exception {
+        Map<String, String> params = new HashMap<>();
+        params.put("type", "client_credentials");
+        
params.put(AuthenticationOAuth2.CONFIG_PARAM_TOKEN_ENDPOINT_AUTH_METHOD,
+                TokenEndpointAuthMethod.TLS_CLIENT_AUTH.value());
+        params.put("clientId", "test-client");
+        params.put("tlsCertFile", "/path/to/cert.pem");
+        params.put("tlsKeyFile", "/path/to/key.pem");
+        params.put("issuerUrl", "http://localhost");
+        ObjectMapper mapper = new ObjectMapper();
+        String authParams = mapper.writeValueAsString(params);
+        OAuth2MockHttpClient.withMockedSslFactory(() -> {
+            this.auth.configure(authParams);
+            assertNotNull(this.auth.flow);
+            assertEquals(this.auth.flow.getClass(), TlsClientAuthFlow.class);
+        });
+    }
+
+    @Test
+    public void testConfigureCredentialsWithTlsValues() throws Exception {
+        Map<String, String> params = new HashMap<>();
+        params.put("type", "client_credentials");
+        params.put("privateKey", "data:base64,e30=");
+        params.put("tlsCertFile", "/path/to/cert.pem");
+        params.put("tlsKeyFile", "/path/to/key.pem");
+        params.put("issuerUrl", "http://localhost");
+        ObjectMapper mapper = new ObjectMapper();
+        String authParams = mapper.writeValueAsString(params);
+        OAuth2MockHttpClient.withMockedSslFactory(() -> {
+            this.auth.configure(authParams);
+            assertNotNull(this.auth.flow);
+            assertEquals(this.auth.flow.getClass(), 
ClientCredentialsFlow.class);
+        });
+    }
+
     // ----- configure() via default constructor -----
 
     @Test
@@ -360,4 +405,4 @@ public class AuthenticationOAuth2Test {
         verify(this.flow).close();
         assertThrows(PulsarClientException.AlreadyClosedException.class, () -> 
this.auth.getAuthData());
     }
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/OAuth2MockHttpClient.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/OAuth2MockHttpClient.java
new file mode 100644
index 00000000000..7a0bb0663e1
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/OAuth2MockHttpClient.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mockConstruction;
+import org.apache.pulsar.common.util.DefaultPulsarSslFactory;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.mockito.MockedConstruction;
+
+final class OAuth2MockHttpClient {
+
+    private OAuth2MockHttpClient() {
+    }
+
+    @FunctionalInterface
+    interface ThrowingRunnable {
+        void run() throws Exception;
+    }
+
+    static void withMockedSslFactory(ThrowingRunnable runnable) throws 
Exception {
+        try (MockedConstruction<DefaultPulsarSslFactory> ignoredSslFactory =
+                     mockConstruction(DefaultPulsarSslFactory.class, (mock, 
context) -> {
+                         doNothing().when(mock).initialize(any());
+                         doNothing().when(mock).createInternalSslContext();
+                     });
+             MockedConstruction<DefaultAsyncHttpClient> ignoredHttpClient =
+                     mockConstruction(DefaultAsyncHttpClient.class)) {
+            runnable.run();
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlowTest.java
similarity index 50%
copy from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
copy to 
pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlowTest.java
index da026bce47e..203a0f6f15c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlowTest.java
@@ -16,30 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+package org.apache.pulsar.client.impl.auth.oauth2;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
-import lombok.Builder;
-import lombok.Data;
+import static org.testng.Assert.assertEquals;
+import java.util.HashMap;
+import java.util.Map;
+import org.testng.annotations.Test;
 
-/**
- * A token request based on the exchange of client credentials.
- *
- * @see <a href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2.0 RFC 6749, section 4.4</a>
- */
-@Data
-@Builder
-public class ClientCredentialsExchangeRequest {
-
-    @JsonProperty("client_id")
-    private String clientId;
-
-    @JsonProperty("client_secret")
-    private String clientSecret;
-
-    @JsonProperty("audience")
-    private String audience;
+public class TlsClientAuthFlowTest {
 
-    @JsonProperty("scope")
-    private String scope;
-}
+    @Test
+    public void testFromParametersWithoutClientId() throws Exception {
+        Map<String, String> params = new HashMap<>();
+        params.put("tlsCertFile", "/path/to/cert.pem");
+        params.put("tlsKeyFile", "/path/to/key.pem");
+        params.put("issuerUrl", "http://localhost");
+        params.put("scope", "http://localhost");
+        OAuth2MockHttpClient.withMockedSslFactory(() -> {
+            TlsClientAuthFlow flow = TlsClientAuthFlow.fromParameters(params);
+            assertEquals(flow.getClientId(), "pulsar-client");
+            flow.close();
+        });
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
index 13142768207..0992eef0409 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl.auth.oauth2.protocol;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertNotNull;
@@ -48,8 +49,15 @@ public class TokenClientTest {
                 .clientId("test-client-id")
                 .clientSecret("test-client-secret")
                 .scope("test-scope")
+                .authMethod(TokenEndpointAuthMethod.CLIENT_SECRET_POST)
                 .build();
         String body = tokenClient.buildClientCredentialsBody(request);
+        assertThat(body)
+                .contains("grant_type=")
+                .contains("client_id=")
+                .contains("client_secret=")
+                .contains("audience=")
+                .contains("scope=");
         BoundRequestBuilder boundRequestBuilder = 
mock(BoundRequestBuilder.class);
         Response response = mock(Response.class);
         ListenableFuture<Response> listenableFuture = 
mock(ListenableFuture.class);
@@ -81,6 +89,10 @@ public class TokenClientTest {
                 .clientSecret("test-client-secret")
                 .build();
         String body = tokenClient.buildClientCredentialsBody(request);
+        assertThat(body)
+                .contains("grant_type=")
+                .contains("client_id=")
+                .contains("client_secret=");
         BoundRequestBuilder boundRequestBuilder = 
mock(BoundRequestBuilder.class);
         Response response = mock(Response.class);
         ListenableFuture<Response> listenableFuture = 
mock(ListenableFuture.class);
@@ -99,4 +111,78 @@ public class TokenClientTest {
         TokenResult tr = tokenClient.exchangeClientCredentials(request);
         assertNotNull(tr);
     }
-}
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void exchangeTlsClientAuthSuccessTest() throws
+            IOException, TokenExchangeException, ExecutionException, 
InterruptedException {
+        DefaultAsyncHttpClient defaultAsyncHttpClient = 
mock(DefaultAsyncHttpClient.class);
+        URL url = new URL("http://localhost");
+        TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient);
+        ClientCredentialsExchangeRequest request = 
ClientCredentialsExchangeRequest.builder()
+                .clientId("test-client-id")
+                .audience("test-audience")
+                .scope("test-scope")
+                .authMethod(TokenEndpointAuthMethod.TLS_CLIENT_AUTH)
+                .build();
+        String body = tokenClient.buildClientCredentialsBody(request);
+        assertThat(body)
+                .contains("grant_type=")
+                .contains("client_id=")
+                .contains("audience=")
+                .contains("scope=")
+                .doesNotContain("client_secret=");
+        BoundRequestBuilder boundRequestBuilder = 
mock(BoundRequestBuilder.class);
+        Response response = mock(Response.class);
+        ListenableFuture<Response> listenableFuture = 
mock(ListenableFuture.class);
+        
when(defaultAsyncHttpClient.preparePost(url.toString())).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setHeader("Accept", 
"application/json")).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setHeader("Content-Type",
+                
"application/x-www-form-urlencoded")).thenReturn(boundRequestBuilder);
+        
when(boundRequestBuilder.setBody(body)).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.execute()).thenReturn(listenableFuture);
+        when(listenableFuture.get()).thenReturn(response);
+        when(response.getStatusCode()).thenReturn(200);
+        TokenResult tokenResult = new TokenResult();
+        tokenResult.setAccessToken("test-access-token");
+        tokenResult.setIdToken("test-id");
+        when(response.getResponseBodyAsBytes()).thenReturn(new 
Gson().toJson(tokenResult).getBytes());
+        TokenResult tr = tokenClient.exchangeClientCredentials(request);
+        assertNotNull(tr);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void exchangeTlsClientAuthSuccessWithoutOptionalParamsTest() throws
+            IOException, TokenExchangeException, ExecutionException, 
InterruptedException {
+        DefaultAsyncHttpClient defaultAsyncHttpClient = 
mock(DefaultAsyncHttpClient.class);
+        URL url = new URL("http://localhost");
+        TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient);
+        ClientCredentialsExchangeRequest request = 
ClientCredentialsExchangeRequest.builder()
+                .clientId("test-client-id")
+                .authMethod(TokenEndpointAuthMethod.TLS_CLIENT_AUTH)
+                .build();
+        String body = tokenClient.buildClientCredentialsBody(request);
+        assertThat(body)
+                .contains("grant_type=")
+                .contains("client_id=")
+                .doesNotContain("client_secret=");
+        BoundRequestBuilder boundRequestBuilder = 
mock(BoundRequestBuilder.class);
+        Response response = mock(Response.class);
+        ListenableFuture<Response> listenableFuture = 
mock(ListenableFuture.class);
+        
when(defaultAsyncHttpClient.preparePost(url.toString())).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setHeader("Accept", 
"application/json")).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.setHeader("Content-Type",
+                
"application/x-www-form-urlencoded")).thenReturn(boundRequestBuilder);
+        
when(boundRequestBuilder.setBody(body)).thenReturn(boundRequestBuilder);
+        when(boundRequestBuilder.execute()).thenReturn(listenableFuture);
+        when(listenableFuture.get()).thenReturn(response);
+        when(response.getStatusCode()).thenReturn(200);
+        TokenResult tokenResult = new TokenResult();
+        tokenResult.setAccessToken("test-access-token");
+        tokenResult.setIdToken("test-id");
+        when(response.getResponseBodyAsBytes()).thenReturn(new 
Gson().toJson(tokenResult).getBytes());
+        TokenResult tr = tokenClient.exchangeClientCredentials(request);
+        assertNotNull(tr);
+    }
+}
\ No newline at end of file

Reply via email to