This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 27bb3865eb3 [improve][client] Enable configurable preemptive OAuth2 
token refresh (#25363)
27bb3865eb3 is described below

commit 27bb3865eb38a6451180f4682393117b8f9be92f
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Mar 21 18:01:56 2026 +0200

    [improve][client] Enable configurable preemptive OAuth2 token refresh 
(#25363)
    
    Co-authored-by: Michael Marshall <[email protected]>
    Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
 .../auth/oauth2/AuthenticationFactoryOAuth2.java   |  51 ++++-
 .../impl/auth/oauth2/AuthenticationOAuth2.java     | 252 +++++++++++++++++++--
 .../AuthenticationOAuth2StandardAuthzServer.java   |  26 +--
 .../impl/auth/oauth2/ClientCredentialsFlow.java    |   2 +
 .../impl/auth/oauth2/AuthenticationOAuth2Test.java | 186 ++++++++++++++-
 5 files changed, 469 insertions(+), 48 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
index 7c89c6cde6d..b035b02437b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
@@ -19,14 +19,17 @@
 package org.apache.pulsar.client.impl.auth.oauth2;
 
 import java.net.URL;
-import java.time.Clock;
 import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.pulsar.client.api.Authentication;
 import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
 
 /**
  * Factory class that allows to create {@link Authentication} instances
  * for OAuth 2.0 authentication methods.
+ *
+ * <p>Use {@link #clientCredentialsBuilder()} to build an {@link 
Authentication} object
+ * for the client credentials flow, with optional early token refresh support.
  */
 public final class AuthenticationFactoryOAuth2 {
 
@@ -37,7 +40,9 @@ public final class AuthenticationFactoryOAuth2 {
      * @param credentialsUrl the credentials URL
      * @param audience       An optional field. The audience identifier used 
by some Identity Providers, like Auth0.
      * @return an Authentication object
+     * @deprecated use {@link #clientCredentialsBuilder()} instead.
      */
+    @Deprecated
     public static Authentication clientCredentials(URL issuerUrl, URL 
credentialsUrl, String audience) {
         return clientCredentials(issuerUrl, credentialsUrl, audience, null);
     }
@@ -55,7 +60,9 @@ public final class AuthenticationFactoryOAuth2 {
      *                       and each string adds an additional access range 
to the requested scope.
      *                       From here: 
https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
      * @return an Authentication object
+     * @deprecated use {@link #clientCredentialsBuilder()} instead.
      */
+    @Deprecated
     public static Authentication clientCredentials(URL issuerUrl, URL 
credentialsUrl, String audience, String scope) {
         return 
clientCredentialsBuilder().issuerUrl(issuerUrl).credentialsUrl(credentialsUrl).audience(audience)
                 .scope(scope).build();
@@ -91,6 +98,8 @@ public final class AuthenticationFactoryOAuth2 {
         private Duration readTimeout;
         private String trustCertsFilePath;
         private String wellKnownMetadataPath;
+        private double earlyTokenRefreshPercent = 
AuthenticationOAuth2.EARLY_TOKEN_REFRESH_PERCENT_DEFAULT;
+        private ScheduledExecutorService scheduler;
 
         private ClientCredentialsBuilder() {
         }
@@ -120,7 +129,7 @@ public final class AuthenticationFactoryOAuth2 {
         /**
          * Optional audience identifier used by some Identity Providers, like 
Auth0.
          *
-         * @param audience the audiance
+         * @param audience the audience
          * @return the builder
          */
         public ClientCredentialsBuilder audience(String audience) {
@@ -188,7 +197,41 @@ public final class AuthenticationFactoryOAuth2 {
         }
 
         /**
-         * Authenticate with client credentials.
+         * The fraction of the token's {@code expires_in} time at which the 
client starts attempting
+         * a background refresh. Must be greater than 0. Values &ge; 1 disable 
early refresh (the default).
+         *
+         * <p>For example, {@code 0.8} means the client will attempt to 
refresh after 80% of the
+         * token lifetime has elapsed, leaving a 20% buffer to tolerate a 
temporary OAuth server
+         * outage while the existing token is still valid. During an outage 
the client keeps retrying
+         * in the background with exponential backoff, continuing to serve 
requests with the current
+         * token until it actually expires.
+         *
+         * @param earlyTokenRefreshPercent fractional value in (0, 1) to 
enable, or &ge; 1 to disable
+         * @return the builder
+         */
+        public ClientCredentialsBuilder earlyTokenRefreshPercent(double 
earlyTokenRefreshPercent) {
+            if (earlyTokenRefreshPercent <= 0) {
+                throw new IllegalArgumentException("earlyTokenRefreshPercent 
must be greater than 0.");
+            }
+            this.earlyTokenRefreshPercent = earlyTokenRefreshPercent;
+            return this;
+        }
+
+        /**
+         * Optional scheduler for background token refresh tasks. If not set 
and early refresh is
+         * enabled, a shared internal daemon-thread scheduler is used 
automatically.
+         * {@link AuthenticationOAuth2} will never shut down a caller-supplied 
scheduler.
+         *
+         * @param scheduler the scheduler to use for background token refresh
+         * @return the builder
+         */
+        public ClientCredentialsBuilder scheduler(ScheduledExecutorService 
scheduler) {
+            this.scheduler = scheduler;
+            return this;
+        }
+
+        /**
+         * Builds the {@link Authentication} object.
          *
          * @return an Authentication object
          */
@@ -203,7 +246,7 @@ public final class AuthenticationFactoryOAuth2 {
                     .trustCertsFilePath(trustCertsFilePath)
                     .wellKnownMetadataPath(wellKnownMetadataPath)
                     .build();
-            return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
+            return new AuthenticationOAuth2(flow, earlyTokenRefreshPercent, 
scheduler);
         }
 
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
index 694e4681d97..a8a52909136 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
@@ -18,44 +18,134 @@
  */
 package org.apache.pulsar.client.impl.auth.oauth2;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
 import java.time.Clock;
 import java.time.Instant;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.JavaVersion;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.AuthenticationUtil;
 import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
 
 /**
  * Pulsar client authentication provider based on OAuth 2.0.
+ *
+ * The first call to {@link #getAuthData()} will result in a blocking network 
call to retrieve the OAuth2.0 token from
+ * the Identity Provider. After that, there are two behaviors, depending on 
{@link #earlyTokenRefreshPercent}:
+ *
+ * 1. If {@link #earlyTokenRefreshPercent} is less than 1, this authentication 
class will schedule a runnable to refresh
+ * the token in n seconds where n is the result of multiplying {@link 
#earlyTokenRefreshPercent} and the {@code expires_in}
+ * value returned by the Identity Provider. If the call to the Identity 
Provider fails, this class will retry attempting
+ * to refresh the token using an exponential backoff. If the token is not 
refreshed before it expires, the Pulsar client
+ * will make one final blocking call to the Identity Provider. If that call 
fails, this class will pass the failure to
+ * the Pulsar client. This proactive approach to token management is good for 
use cases that want to avoid latency
+ * spikes from calls to the Identity Provider and that want to be able to 
withstand short Identity Provider outages. The
+ * tradeoff is that this class consumes slightly more resources.
+ *
+ * 2. If {@link #earlyTokenRefreshPercent} is greater than or equal to 1, this 
class will not retrieve a new token until
+ * the {@link #getAuthData()} method is called while the cached token is 
expired. If the call to the Identity Provider
+ * fails, this class will pass the failure to the Pulsar client. This lazy 
approach is good for use cases that are not
+ * latency sensitive and that will not use the token frequently.
+ *
+ * {@link #earlyTokenRefreshPercent} must be greater than 0. It defaults to 1, 
which means that early token refresh is
+ * disabled by default.
+ *
+ * The current implementation of this class can block the calling thread.
+ *
+ * This class is intended to be called from multiple threads, and is therefore 
designed to be thread-safe.
  */
 @Slf4j
 public class AuthenticationOAuth2 implements Authentication, 
EncodedAuthenticationParameterSupport {
 
     public static final String CONFIG_PARAM_TYPE = "type";
+    public static final String CONFIG_PARAM_EARLY_TOKEN_REFRESH_PERCENT = 
"earlyTokenRefreshPercent";
     public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials";
+    public static final int EARLY_TOKEN_REFRESH_PERCENT_DEFAULT = 1; // 
feature disabled by default
     public static final String AUTH_METHOD_NAME = "token";
-    public static final double EXPIRY_ADJUSTMENT = 0.9;
     private static final long serialVersionUID = 1L;
 
+    // Shared executor used when the caller does not supply one. Uses daemon 
threads and scales down to
+    // 0 threads when no instances are actively refreshing tokens.
+    private static final ScheduledExecutorService INTERNAL_SCHEDULER = 
createInternalScheduler();
+
+    private static ScheduledExecutorService createInternalScheduler() {
+        // corePoolSize=0 allows the pool to scale down to zero idle threads 
(Java 9+).
+        // On Java 8 a corePoolSize of 0 means tasks may never execute, so 
fall back to 1.
+        int corePoolSize = 
SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9) ? 0 : 1;
+        return Executors.newScheduledThreadPool(corePoolSize,
+                new DefaultThreadFactory("oauth2-token-refresher", true));
+    }
+
+    private transient ScheduledExecutorService scheduler;
+    double earlyTokenRefreshPercent;
+
     final Clock clock;
-    Flow flow;
-    transient CachedToken cachedToken;
+    volatile Flow flow;
+    private transient volatile CachedToken cachedToken;
+
+    // Only ever updated inside synchronized methods of this instance.
+    private boolean isClosed = false;
 
+    // Only ever updated on the single scheduler thread. Do not need to be 
volatile.
+    private transient Backoff backoff;
+    private transient ScheduledFuture<?> nextRefreshAttempt;
+
+    // No args constructor used when creating class with reflection
     public AuthenticationOAuth2() {
-        this.clock = Clock.systemDefaultZone();
+        this(Clock.systemDefaultZone(), EARLY_TOKEN_REFRESH_PERCENT_DEFAULT, 
null);
     }
 
     AuthenticationOAuth2(Flow flow, Clock clock) {
+        this(flow, clock, EARLY_TOKEN_REFRESH_PERCENT_DEFAULT, null);
+    }
+
+    AuthenticationOAuth2(Flow flow,
+                         double earlyTokenRefreshPercent,
+                         ScheduledExecutorService scheduler) {
+        this(flow, Clock.systemDefaultZone(), earlyTokenRefreshPercent, 
scheduler);
+    }
+
+    AuthenticationOAuth2(Flow flow,
+                         Clock clock,
+                         double earlyTokenRefreshPercent,
+                         ScheduledExecutorService scheduler) {
+        this(clock, earlyTokenRefreshPercent, scheduler);
         this.flow = flow;
+    }
+
+    /**
+     * @param clock - clock to use when determining token expiration.
+     * @param earlyTokenRefreshPercent - see javadoc for {@link 
AuthenticationOAuth2}. Must be greater than 0.
+     * @param scheduler - The scheduler to use for background token refreshes. 
If {@code null} and
+     *                  {@link #earlyTokenRefreshPercent} is less than 1, the 
shared internal daemon-thread
+     *                  scheduler is used. If the caller supplies a scheduler, 
this class will not shut it down.
+     */
+    private AuthenticationOAuth2(Clock clock, double earlyTokenRefreshPercent, 
ScheduledExecutorService scheduler) {
+        if (earlyTokenRefreshPercent <= 0) {
+            throw new IllegalArgumentException("EarlyTokenRefreshPercent must 
be greater than 0.");
+        }
+        this.earlyTokenRefreshPercent = earlyTokenRefreshPercent;
         this.clock = clock;
+        if (scheduler == null && earlyTokenRefreshPercent < 1) {
+            this.scheduler = INTERNAL_SCHEDULER;
+        } else {
+            this.scheduler = scheduler;
+        }
     }
 
     @Override
@@ -65,6 +155,16 @@ public class AuthenticationOAuth2 implements 
Authentication, EncodedAuthenticati
 
     @Override
     public void configure(String encodedAuthParamString) {
+        Map<String, String> params = 
parseAuthParameters(encodedAuthParamString);
+        String type = params.getOrDefault(CONFIG_PARAM_TYPE, 
TYPE_CLIENT_CREDENTIALS);
+        if (TYPE_CLIENT_CREDENTIALS.equals(type)) {
+            this.flow = ClientCredentialsFlow.fromParameters(params);
+        } else {
+            throw new IllegalArgumentException("Unsupported authentication 
type: " + type);
+        }
+    }
+
+    protected Map<String, String> parseAuthParameters(String 
encodedAuthParamString) {
         if (StringUtils.isBlank(encodedAuthParamString)) {
             throw new IllegalArgumentException("No authentication parameters 
were provided");
         }
@@ -75,13 +175,44 @@ public class AuthenticationOAuth2 implements 
Authentication, EncodedAuthenticati
             throw new IllegalArgumentException("Malformed authentication 
parameters", e);
         }
 
-        String type = params.getOrDefault(CONFIG_PARAM_TYPE, 
TYPE_CLIENT_CREDENTIALS);
-        switch(type) {
-            case TYPE_CLIENT_CREDENTIALS:
-                this.flow = ClientCredentialsFlow.fromParameters(params);
-                break;
-            default:
-                throw new IllegalArgumentException("Unsupported authentication 
type: " + type);
+        String earlyRefreshPercentStr = 
params.get(CONFIG_PARAM_EARLY_TOKEN_REFRESH_PERCENT);
+        if (earlyRefreshPercentStr != null) {
+            double percent = parseEarlyRefreshPercent(earlyRefreshPercentStr);
+            this.earlyTokenRefreshPercent = percent;
+            if (percent < 1 && this.scheduler == null) {
+                this.scheduler = INTERNAL_SCHEDULER;
+            }
+        }
+        return params;
+    }
+
+    /**
+     * Parses the {@code earlyTokenRefreshPercent} configuration value.
+     *
+     * <p>If the string contains a decimal point it is interpreted as a 
fractional value greater than 0
+     * and used directly (e.g. {@code "0.8"} → 0.8). Otherwise the string is 
treated as an integer
+     * percentage and divided by 100 (e.g. {@code "80"} → 0.8, {@code "100"} → 
1.0).
+     *
+     * @param value the raw string from the configuration map
+     * @return the resolved fractional percent, must be &gt; 0
+     * @throws IllegalArgumentException if the value cannot be parsed or is ≤ 0
+     */
+    static double parseEarlyRefreshPercent(String value) {
+        try {
+            double percent;
+            if (value.contains(".")) {
+                percent = Double.parseDouble(value);
+            } else {
+                percent = Integer.parseInt(value) / 100.0;
+            }
+            if (percent <= 0) {
+                throw new IllegalArgumentException(
+                        CONFIG_PARAM_EARLY_TOKEN_REFRESH_PERCENT + " must be 
greater than 0, got: " + value);
+            }
+            return percent;
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException(
+                    "Malformed configuration parameter: " + 
CONFIG_PARAM_EARLY_TOKEN_REFRESH_PERCENT, e);
         }
     }
 
@@ -96,21 +227,109 @@ public class AuthenticationOAuth2 implements 
Authentication, EncodedAuthenticati
         flow.initialize();
     }
 
+    /**
+     * The first time that this method is called, it retrieves a token. All 
subsequent
+     * calls should get a cached value. However, if there is an issue with the 
Identity
+     * Provider, there is a chance that the background thread responsible for 
keeping
+     * the token fresh will not have refreshed it in time; this call then blocks to fetch a new one.
+     * @return The authentication data identifying this client that will be 
sent to the broker
+     * @throws PulsarClientException if this instance is closed or the token cannot be retrieved
+     */
     @Override
     public synchronized AuthenticationDataProvider getAuthData() throws 
PulsarClientException {
+        if (isClosed) {
+            throw new 
PulsarClientException.AlreadyClosedException("Authentication already closed.");
+        }
         if (this.cachedToken == null || this.cachedToken.isExpired()) {
-            TokenResult tr = this.flow.authenticate();
-            this.cachedToken = new CachedToken(tr);
+            this.authenticate();
         }
         return this.cachedToken.getAuthData();
     }
 
+    /**
+     * Retrieve the token (synchronously), and then schedule refresh runnable.
+     */
+    private void authenticate() throws PulsarClientException {
+        if (log.isDebugEnabled()) {
+            log.debug("Attempting to retrieve OAuth2 token now.");
+        }
+        TokenResult tr = this.flow.authenticate();
+        this.cachedToken = new CachedToken(tr);
+        handleSuccessfulTokenRefresh();
+    }
+
+    /**
+     * When we successfully get a token, we need to schedule the next attempt 
to refresh it.
+     * This is done completely based on the "expires_in" value returned by the 
identity provider.
+     * The code is run on the single scheduler thread in order to ensure that 
the backoff and the nextRefreshAttempt are
+     * updated safely.
+     */
+    private void handleSuccessfulTokenRefresh() {
+        if (scheduler != null && earlyTokenRefreshPercent < 1) {
+            scheduler.execute(() -> {
+                  backoff = buildBackoff(cachedToken.latest.getExpiresIn());
+                  long expiresInMillis = 
TimeUnit.SECONDS.toMillis(cachedToken.latest.getExpiresIn());
+                  scheduleRefresh((long) (expiresInMillis * 
earlyTokenRefreshPercent));
+            });
+        }
+    }
+
+    /**
+     * Attempt to refresh the token. If successful, schedule the next refresh 
task according to the
+     * {@link #earlyTokenRefreshPercent}. If failed, schedule another attempt 
to refresh the token according to the
+     * {@link #backoff} policy.
+     */
+    private void refreshToken() {
+        try {
+            this.authenticate();
+        } catch (PulsarClientException | RuntimeException e) {
+            long delayMillis = backoff.next();
+            log.error("Error refreshing token. Will retry in {} millis.", 
delayMillis, e);
+            scheduleRefresh(delayMillis);
+        }
+    }
+
+    /**
+     * Schedule the task to refresh the token.
+     * NOTE: this method must be run on the {@link #scheduler} thread in order 
to ensure {@link #nextRefreshAttempt}
+     * is accessed and updated safely.
+     * @param delayMillis the time, in milliseconds, to wait before starting 
to attempt to refresh the token.
+     */
+    private void scheduleRefresh(long delayMillis) {
+        nextRefreshAttempt = scheduler.schedule(this::refreshToken, 
delayMillis, TimeUnit.MILLISECONDS);
+    }
+
+    private Backoff buildBackoff(int expiresInSeconds) {
+        return new BackoffBuilder()
+                .setInitialTime(1, TimeUnit.SECONDS)
+                .setMax(10, TimeUnit.MINUTES)
+                // Make a final token refresh attempt 2 seconds before the 
token actually expires, if necessary.
+                .setMandatoryStop(Math.max(0, expiresInSeconds - 2), 
TimeUnit.SECONDS)
+                .create();
+    }
+
     @Override
-    public void close() throws IOException {
+    public synchronized void close() throws IOException {
         try {
-            flow.close();
+            isClosed = true;
+            if (flow != null) {
+                flow.close();
+            }
         } catch (Exception e) {
             throw new IOException(e);
+        } finally {
+            if (scheduler != null) {
+                // Cancel all subsequent refresh attempts by canceling the 
next token refresh attempt. By running
+                // this command on the single scheduler thread, we remove the 
chance for a race condition that could
+                // allow a currently executing refresh attempt to schedule 
another refresh attempt.
+                // We never shut down the scheduler here because either it is 
the shared INTERNAL_SCHEDULER,
+                // or it was provided by the caller who manages its own 
lifecycle.
+                scheduler.execute(() -> {
+                    if (nextRefreshAttempt != null) {
+                        nextRefreshAttempt.cancel(false);
+                    }
+                });
+            }
         }
     }
 
@@ -122,8 +341,7 @@ public class AuthenticationOAuth2 implements 
Authentication, EncodedAuthenticati
 
         public CachedToken(TokenResult latest) {
             this.latest = latest;
-            int adjustedExpiresIn = (int) (latest.getExpiresIn() * 
EXPIRY_ADJUSTMENT);
-            this.expiresAt = 
AuthenticationOAuth2.this.clock.instant().plusSeconds(adjustedExpiresIn);
+            this.expiresAt = 
AuthenticationOAuth2.this.clock.instant().plusSeconds(latest.getExpiresIn());
             this.authData = new 
AuthenticationDataOAuth2(latest.getAccessToken());
         }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2StandardAuthzServer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2StandardAuthzServer.java
index c61d6d7b097..4afd0d15ea4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2StandardAuthzServer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2StandardAuthzServer.java
@@ -18,11 +18,8 @@
  */
 package org.apache.pulsar.client.impl.auth.oauth2;
 
-import java.io.IOException;
 import java.time.Clock;
 import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.impl.AuthenticationUtil;
 import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
 
 /**
@@ -43,28 +40,11 @@ public class AuthenticationOAuth2StandardAuthzServer 
extends AuthenticationOAuth
     }
 
     @Override
-    public void configure(String encodedAuthParamString) {
-        if (StringUtils.isBlank(encodedAuthParamString)) {
-            throw new IllegalArgumentException("No authentication parameters 
were provided");
-        }
-        Map<String, String> params;
-        try {
-            params = 
AuthenticationUtil.configureFromJsonString(encodedAuthParamString);
-        } catch (IOException e) {
-            throw new IllegalArgumentException("Malformed authentication 
parameters", e);
-        }
-
+    protected Map<String, String> parseAuthParameters(String 
encodedAuthParamString) {
+        Map<String, String> params = 
super.parseAuthParameters(encodedAuthParamString);
         // Always set the OAuth 2.0 standard metadata path
         params.put(FlowBase.CONFIG_PARAM_WELL_KNOWN_METADATA_PATH,
                 DefaultMetadataResolver.OAUTH_WELL_KNOWN_METADATA_PATH);
-
-        String type = params.getOrDefault(CONFIG_PARAM_TYPE, 
TYPE_CLIENT_CREDENTIALS);
-        switch(type) {
-            case TYPE_CLIENT_CREDENTIALS:
-                this.flow = ClientCredentialsFlow.fromParameters(params);
-                break;
-            default:
-                throw new IllegalArgumentException("Unsupported authentication 
type: " + type);
-        }
+        return params;
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
index f3c85ca5cdd..d841010add8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -47,6 +47,7 @@ import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
 class ClientCredentialsFlow extends FlowBase {
     public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl";
     public static final String CONFIG_PARAM_AUDIENCE = "audience";
+    // Maps to the keyFileUrl
     public static final String CONFIG_PARAM_KEY_FILE = "privateKey";
     public static final String CONFIG_PARAM_SCOPE = "scope";
 
@@ -70,6 +71,7 @@ class ClientCredentialsFlow extends FlowBase {
         this.scope = scope;
     }
 
+
     /**
      * Constructs a {@link ClientCredentialsFlow} from configuration 
parameters.
      *
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
index d430d8f0e40..46dbe41b48e 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
@@ -18,12 +18,15 @@
  */
 package org.apache.pulsar.client.impl.auth.oauth2;
 
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertThrows;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.net.MalformedURLException;
 import java.net.URI;
@@ -33,7 +36,10 @@ import java.time.Instant;
 import java.time.ZoneOffset;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.Cleanup;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
 import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
 import org.testng.annotations.BeforeMethod;
@@ -54,7 +60,7 @@ public class AuthenticationOAuth2Test {
     public void before() {
         this.clock = new MockClock(Instant.EPOCH, ZoneOffset.UTC);
         this.flow = mock(Flow.class);
-        this.auth = new AuthenticationOAuth2(flow, this.clock);
+        this.auth = new AuthenticationOAuth2(flow, this.clock, 1, null);
     }
 
     @Test
@@ -104,6 +110,120 @@ public class AuthenticationOAuth2Test {
         assertNotNull(this.auth.flow);
     }
 
+    // ----- configure() via default constructor -----
+
+    @Test
+    public void testConfigureViaDefaultConstructorSetsFlow() throws Exception {
+        AuthenticationOAuth2 auth = new AuthenticationOAuth2();
+        auth.configure(minimalCredentialsJson());
+        assertNotNull(auth.flow);
+    }
+
+    @Test
+    public void 
testConfigureViaDefaultConstructorDefaultEarlyRefreshIsDisabled() throws 
Exception {
+        AuthenticationOAuth2 auth = new AuthenticationOAuth2();
+        auth.configure(minimalCredentialsJson());
+        assertEquals(auth.earlyTokenRefreshPercent, (double) 
AuthenticationOAuth2.EARLY_TOKEN_REFRESH_PERCENT_DEFAULT);
+    }
+
+    @Test
+    public void testConfigureViaDefaultConstructorEarlyRefreshDecimal() throws 
Exception {
+        AuthenticationOAuth2 auth = new AuthenticationOAuth2();
+        auth.configure(credentialsJsonWithEarlyRefresh("0.8"));
+        assertEquals(auth.earlyTokenRefreshPercent, 0.8);
+    }
+
+    @Test
+    public void testConfigureViaDefaultConstructorEarlyRefreshInteger() throws 
Exception {
+        AuthenticationOAuth2 auth = new AuthenticationOAuth2();
+        auth.configure(credentialsJsonWithEarlyRefresh("80"));
+        assertEquals(auth.earlyTokenRefreshPercent, 0.8);
+    }
+
+    @Test
+    public void 
testConfigureViaDefaultConstructorEarlyRefreshIntegerDisabled() throws 
Exception {
+        // Integer 100 → 1.0, meaning disabled
+        AuthenticationOAuth2 auth = new AuthenticationOAuth2();
+        auth.configure(credentialsJsonWithEarlyRefresh("100"));
+        assertEquals(auth.earlyTokenRefreshPercent, 1.0);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConfigureViaDefaultConstructorEarlyRefreshZero() throws 
Exception {
+        AuthenticationOAuth2 auth = new AuthenticationOAuth2();
+        auth.configure(credentialsJsonWithEarlyRefresh("0"));
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConfigureViaDefaultConstructorEarlyRefreshInvalid() throws 
Exception {
+        AuthenticationOAuth2 auth = new AuthenticationOAuth2();
+        auth.configure(credentialsJsonWithEarlyRefresh("not-a-number"));
+    }
+
+    @Test
+    public void 
testConfigureViaDefaultConstructorEarlyRefreshTriggersBackgroundRefresh() 
throws Exception {
+        // Verify that when early refresh is enabled via configure(), the 
background scheduler
+        // is activated and actually calls authenticate() before token expiry.
+        AuthenticationOAuth2 auth = new AuthenticationOAuth2();
+        auth.configure(credentialsJsonWithEarlyRefresh("10")); // 10% → 
refresh at 100ms of a 1s token
+        // Replace the flow with a mock after configure() has set it up
+        auth.flow = this.flow;
+        TokenResult tr = 
TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(1).build();
+        doReturn(tr).when(this.flow).authenticate();
+
+        auth.getAuthData();
+        // Give the background scheduler time to trigger at least one 
additional refresh
+        Thread.sleep(500);
+        auth.close();
+        verify(this.flow, atLeast(2)).authenticate();
+    }
+
+    // ----- parseEarlyRefreshPercent -----
+
+    @Test
+    public void testParseEarlyRefreshPercentDecimal() {
+        assertEquals(AuthenticationOAuth2.parseEarlyRefreshPercent("0.8"), 
0.8);
+        assertEquals(AuthenticationOAuth2.parseEarlyRefreshPercent("0.5"), 
0.5);
+        assertEquals(AuthenticationOAuth2.parseEarlyRefreshPercent("1.0"), 
1.0);
+    }
+
+    @Test
+    public void testParseEarlyRefreshPercentInteger() {
+        assertEquals(AuthenticationOAuth2.parseEarlyRefreshPercent("80"), 0.8);
+        assertEquals(AuthenticationOAuth2.parseEarlyRefreshPercent("50"), 0.5);
+        assertEquals(AuthenticationOAuth2.parseEarlyRefreshPercent("100"), 
1.0);
+        assertEquals(AuthenticationOAuth2.parseEarlyRefreshPercent("1"), 0.01);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testParseEarlyRefreshPercentZeroInteger() {
+        AuthenticationOAuth2.parseEarlyRefreshPercent("0");
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testParseEarlyRefreshPercentInvalid() {
+        AuthenticationOAuth2.parseEarlyRefreshPercent("not-a-number");
+    }
+
+    // ----- helpers -----
+
+    private static String minimalCredentialsJson() throws Exception {
+        Map<String, String> params = new HashMap<>();
+        params.put("type", "client_credentials");
+        params.put("privateKey", "data:base64,e30=");
+        params.put("issuerUrl", "http://localhost";);
+        return new ObjectMapper().writeValueAsString(params);
+    }
+
+    private static String credentialsJsonWithEarlyRefresh(String 
earlyRefreshValue) throws Exception {
+        Map<String, String> params = new HashMap<>();
+        params.put("type", "client_credentials");
+        params.put("privateKey", "data:base64,e30=");
+        params.put("issuerUrl", "http://localhost";);
+        
params.put(AuthenticationOAuth2.CONFIG_PARAM_EARLY_TOKEN_REFRESH_PERCENT, 
earlyRefreshValue);
+        return new ObjectMapper().writeValueAsString(params);
+    }
+
     @Test
     public void testStart() throws Exception {
         this.auth.start();
@@ -111,7 +231,7 @@ public class AuthenticationOAuth2Test {
     }
 
     @Test
-    public void testGetAuthData() throws Exception {
+    public void testGetAuthDataNoEarlyRefresh() throws Exception {
         AuthenticationDataProvider data;
         TokenResult tr = 
TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(TEST_EXPIRES_IN).build();
         doReturn(tr).when(this.flow).authenticate();
@@ -124,13 +244,70 @@ public class AuthenticationOAuth2Test {
         verify(this.flow, times(1)).authenticate();
         assertEquals(data.getCommandData(), tr.getAccessToken());
 
-        // cache miss
-        clock.advance(Duration.ofSeconds(TEST_EXPIRES_IN));
+        // cache miss (have to move past expiration b/c we refresh when 
token is expired now)
+        // NOTE: this works because the token uses the mocked clock.
+        clock.advance(Duration.ofSeconds(TEST_EXPIRES_IN + 1));
         data = this.auth.getAuthData();
         verify(this.flow, times(2)).authenticate();
         assertEquals(data.getCommandData(), tr.getAccessToken());
     }
 
+    // This test skips the early refresh logic and just ensures that if the 
class were to somehow fail
+    // to refresh the token before expiration, the caller will get one final 
attempt at calling authenticate
+    @Test
+    public void testGetAuthDataWithEarlyRefresh() throws Exception {
+        @Cleanup AuthenticationOAuth2 auth = new AuthenticationOAuth2(flow, 
this.clock, 0.8, null);
+        AuthenticationDataProvider data;
+        TokenResult tr = 
TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(TEST_EXPIRES_IN).build();
+        doReturn(tr).when(this.flow).authenticate();
+        data = auth.getAuthData();
+        verify(this.flow, times(1)).authenticate();
+        assertEquals(data.getCommandData(), tr.getAccessToken());
+
+        // cache hit
+        data = auth.getAuthData();
+        verify(this.flow, times(1)).authenticate();
+        assertEquals(data.getCommandData(), tr.getAccessToken());
+
+        // cache miss (have to move past expiration b/c we refresh when 
token is expired now)
+        clock.advance(Duration.ofSeconds(TEST_EXPIRES_IN + 1));
+        data = auth.getAuthData();
+        verify(this.flow, times(2)).authenticate();
+        assertEquals(data.getCommandData(), tr.getAccessToken());
+    }
+
+    // This test ensures that the early token refresh actually calls the 
authenticate method in the background.
+    @Test
+    public void testEarlyTokenRefreshCallsAuthenticate() throws Exception {
+        @Cleanup AuthenticationOAuth2 auth = new AuthenticationOAuth2(flow, 
this.clock, 0.1, null);
+        TokenResult tr = 
TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(1).build();
+        doReturn(tr).when(this.flow).authenticate();
+        // Initialize the flow
+        auth.getAuthData();
+        // Give the auth token refresh a chance to run multiple times
+        Thread.sleep(1000);
+        auth.close();
+        verify(this.flow, atLeast(2)).authenticate();
+        verify(this.flow).close();
+    }
+
+    // This test ensures the provided scheduler is used when one is passed in
+    @Test
+    public void 
testEarlyTokenRefreshCallsAuthenticateWithParameterizedScheduler() throws 
Exception {
+        ScheduledExecutorService scheduler = 
mock(ScheduledExecutorService.class);
+        @Cleanup AuthenticationOAuth2 auth = new AuthenticationOAuth2(flow, 
this.clock, 0.1, scheduler);
+        TokenResult tr = 
TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(1).build();
+        doReturn(tr).when(this.flow).authenticate();
+        // Initialize the flow and trigger scheduling
+        auth.getAuthData();
+        verify(scheduler, times(1)).execute(any(Runnable.class));
+        // Close and verify that the passed in scheduler isn't shutdown
+        auth.close();
+        verify(this.flow).close();
+        verify(scheduler, times(0)).shutdownNow();
+        verify(scheduler, times(2)).execute(any(Runnable.class));
+    }
+
     @Test
     public void testMetadataResolver() throws MalformedURLException {
         URL url = DefaultMetadataResolver.getWellKnownMetadataUrl(
@@ -181,5 +358,6 @@ public class AuthenticationOAuth2Test {
     public void testClose() throws Exception {
         this.auth.close();
         verify(this.flow).close();
+        assertThrows(PulsarClientException.AlreadyClosedException.class, () -> 
this.auth.getAuthData());
     }
 }


Reply via email to