This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 27bb3865eb3 [improve][client] Enable configurable preemptive OAuth2
token refresh (#25363)
27bb3865eb3 is described below
commit 27bb3865eb38a6451180f4682393117b8f9be92f
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Mar 21 18:01:56 2026 +0200
[improve][client] Enable configurable preemptive OAuth2 token refresh
(#25363)
Co-authored-by: Michael Marshall <[email protected]>
Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
.../auth/oauth2/AuthenticationFactoryOAuth2.java | 51 ++++-
.../impl/auth/oauth2/AuthenticationOAuth2.java | 252 +++++++++++++++++++--
.../AuthenticationOAuth2StandardAuthzServer.java | 26 +--
.../impl/auth/oauth2/ClientCredentialsFlow.java | 2 +
.../impl/auth/oauth2/AuthenticationOAuth2Test.java | 186 ++++++++++++++-
5 files changed, 469 insertions(+), 48 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
index 7c89c6cde6d..b035b02437b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
@@ -19,14 +19,17 @@
package org.apache.pulsar.client.impl.auth.oauth2;
import java.net.URL;
-import java.time.Clock;
import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.pulsar.client.api.Authentication;
import
org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
/**
* Factory class that allows to create {@link Authentication} instances
* for OAuth 2.0 authentication methods.
+ *
+ * <p>Use {@link #clientCredentialsBuilder()} to build an {@link
Authentication} object
+ * for the client credentials flow, with optional early token refresh support.
*/
public final class AuthenticationFactoryOAuth2 {
@@ -37,7 +40,9 @@ public final class AuthenticationFactoryOAuth2 {
* @param credentialsUrl the credentials URL
* @param audience An optional field. The audience identifier used
by some Identity Providers, like Auth0.
* @return an Authentication object
+ * @deprecated use {@link #clientCredentialsBuilder()}, instead.
*/
+ @Deprecated
public static Authentication clientCredentials(URL issuerUrl, URL
credentialsUrl, String audience) {
return clientCredentials(issuerUrl, credentialsUrl, audience, null);
}
@@ -55,7 +60,9 @@ public final class AuthenticationFactoryOAuth2 {
* and each string adds an additional access range
to the requested scope.
* From here:
https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
* @return an Authentication object
+ * @deprecated use {@link #clientCredentialsBuilder()}, instead.
*/
+ @Deprecated
public static Authentication clientCredentials(URL issuerUrl, URL
credentialsUrl, String audience, String scope) {
return
clientCredentialsBuilder().issuerUrl(issuerUrl).credentialsUrl(credentialsUrl).audience(audience)
.scope(scope).build();
@@ -91,6 +98,8 @@ public final class AuthenticationFactoryOAuth2 {
private Duration readTimeout;
private String trustCertsFilePath;
private String wellKnownMetadataPath;
+ private double earlyTokenRefreshPercent =
AuthenticationOAuth2.EARLY_TOKEN_REFRESH_PERCENT_DEFAULT;
+ private ScheduledExecutorService scheduler;
private ClientCredentialsBuilder() {
}
@@ -120,7 +129,7 @@ public final class AuthenticationFactoryOAuth2 {
/**
* Optional audience identifier used by some Identity Providers, like
Auth0.
*
- * @param audience the audiance
+ * @param audience the audience
* @return the builder
*/
public ClientCredentialsBuilder audience(String audience) {
@@ -188,7 +197,41 @@ public final class AuthenticationFactoryOAuth2 {
}
/**
- * Authenticate with client credentials.
+ * The fraction of the token's {@code expires_in} time at which the
client starts attempting
+ * a background refresh. Must be greater than 0. Values ≥ 1 disable
early refresh (the default).
+ *
+ * <p>For example, {@code 0.8} means the client will attempt to
refresh after 80% of the
+ * token lifetime has elapsed, leaving a 20% buffer to tolerate a
temporary OAuth server
+ * outage while the existing token is still valid. During an outage
the client keeps retrying
+ * in the background with exponential backoff, continuing to serve
requests with the current
+ * token until it actually expires.
+ *
+ * @param earlyTokenRefreshPercent fractional value in (0, 1) to
enable, or ≥ 1 to disable
+ * @return the builder
+ */
+ public ClientCredentialsBuilder earlyTokenRefreshPercent(double
earlyTokenRefreshPercent) {
+ if (earlyTokenRefreshPercent <= 0) {
+ throw new IllegalArgumentException("earlyTokenRefreshPercent
must be greater than 0.");
+ }
+ this.earlyTokenRefreshPercent = earlyTokenRefreshPercent;
+ return this;
+ }
+
+ /**
+ * Optional scheduler for background token refresh tasks. If not set
and early refresh is
+ * enabled, a shared internal daemon-thread scheduler is used
automatically.
+ * {@link AuthenticationOAuth2} will never shut down a caller-supplied
scheduler.
+ *
+ * @param scheduler the scheduler to use for background token refresh
+ * @return the builder
+ */
+ public ClientCredentialsBuilder scheduler(ScheduledExecutorService
scheduler) {
+ this.scheduler = scheduler;
+ return this;
+ }
+
+ /**
+ * Builds the {@link Authentication} object.
*
* @return an Authentication object
*/
@@ -203,7 +246,7 @@ public final class AuthenticationFactoryOAuth2 {
.trustCertsFilePath(trustCertsFilePath)
.wellKnownMetadataPath(wellKnownMetadataPath)
.build();
- return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
+ return new AuthenticationOAuth2(flow, earlyTokenRefreshPercent,
scheduler);
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
index 694e4681d97..a8a52909136 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
@@ -18,44 +18,134 @@
*/
package org.apache.pulsar.client.impl.auth.oauth2;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.JavaVersion;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.AuthenticationUtil;
import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
/**
* Pulsar client authentication provider based on OAuth 2.0.
+ *
+ * The first call to {@link #getAuthData()} will result in a blocking network
call to retrieve the OAuth2.0 token from
+ * the Identity Provider. After that, there are two behaviors, depending on
{@link #earlyTokenRefreshPercent}:
+ *
+ * 1. If {@link #earlyTokenRefreshPercent} is less than 1, this authentication
class will schedule a runnable to refresh
+ * the token in n seconds where n is the result of multiplying {@link
#earlyTokenRefreshPercent} and the `expires_in`
+ * value returned by the Identity Provider. If the call to the Identity
Provider fails, this class will retry attempting
+ * to refresh the token using an exponential backoff. If the token is not
refreshed before it expires, the Pulsar client
+ * will make one final blocking call to the Identity Provider. If that call
fails, this class will pass the failure to
+ * the Pulsar client. This proactive approach to token management is good for
use cases that want to avoid latency
+ * spikes from calls to the Identity Provider and that want to be able to
withstand short Identity Provider outages. The
+ * tradeoff is that this class consumes slightly more resources.
+ *
+ * 2. If {@link #earlyTokenRefreshPercent} is greater than or equal to 1, this
class will not retrieve a new token until
+ * the {@link #getAuthData()} method is called while the cached token is
expired. If the call to the Identity Provider
+ * fails, this class will pass the failure to the Pulsar client. This lazy
approach is good for use cases that are not
+ * latency sensitive and that will not use the token frequently.
+ *
+ * {@link #earlyTokenRefreshPercent} must be greater than 0. It defaults to 1,
which means that early token refresh is
+ * disabled by default.
+ *
+ * The current implementation of this class can block the calling thread.
+ *
+ * This class is intended to be called from multiple threads, and is therefore
designed to be thread-safe.
*/
@Slf4j
public class AuthenticationOAuth2 implements Authentication,
EncodedAuthenticationParameterSupport {
public static final String CONFIG_PARAM_TYPE = "type";
+ public static final String CONFIG_PARAM_EARLY_TOKEN_REFRESH_PERCENT =
"earlyTokenRefreshPercent";
public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials";
+ public static final int EARLY_TOKEN_REFRESH_PERCENT_DEFAULT = 1; //
feature disabled by default
public static final String AUTH_METHOD_NAME = "token";
- public static final double EXPIRY_ADJUSTMENT = 0.9;
private static final long serialVersionUID = 1L;
+ // Shared executor used when the caller does not supply one. Uses daemon
threads and scales down to
+ // 0 threads when no instances are actively refreshing tokens.
+ private static final ScheduledExecutorService INTERNAL_SCHEDULER =
createInternalScheduler();
+
+ private static ScheduledExecutorService createInternalScheduler() {
+ // corePoolSize=0 allows the pool to scale down to zero idle threads
(Java 9+).
+ // On Java 8 a corePoolSize of 0 means tasks may never execute, so
fall back to 1.
+ int corePoolSize =
SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9) ? 0 : 1;
+ return Executors.newScheduledThreadPool(corePoolSize,
+ new DefaultThreadFactory("oauth2-token-refresher", true));
+ }
+
+ private transient ScheduledExecutorService scheduler;
+ double earlyTokenRefreshPercent;
+
final Clock clock;
- Flow flow;
- transient CachedToken cachedToken;
+ volatile Flow flow;
+ private transient volatile CachedToken cachedToken;
+
+ // Only ever updated in synchronized block on class.
+ private boolean isClosed = false;
+ // Only ever updated on the single scheduler thread. Do not need to be
volatile.
+ private transient Backoff backoff;
+ private transient ScheduledFuture<?> nextRefreshAttempt;
+
+ // No args constructor used when creating class with reflection
public AuthenticationOAuth2() {
- this.clock = Clock.systemDefaultZone();
+ this(Clock.systemDefaultZone(), EARLY_TOKEN_REFRESH_PERCENT_DEFAULT,
null);
}
AuthenticationOAuth2(Flow flow, Clock clock) {
+ this(flow, clock, EARLY_TOKEN_REFRESH_PERCENT_DEFAULT, null);
+ }
+
+ AuthenticationOAuth2(Flow flow,
+ double earlyTokenRefreshPercent,
+ ScheduledExecutorService scheduler) {
+ this(flow, Clock.systemDefaultZone(), earlyTokenRefreshPercent,
scheduler);
+ }
+
+ AuthenticationOAuth2(Flow flow,
+ Clock clock,
+ double earlyTokenRefreshPercent,
+ ScheduledExecutorService scheduler) {
+ this(clock, earlyTokenRefreshPercent, scheduler);
this.flow = flow;
+ }
+
+ /**
+ * @param clock - clock to use when determining token expiration.
+ * @param earlyTokenRefreshPercent - see javadoc for {@link
AuthenticationOAuth2}. Must be greater than 0.
+ * @param scheduler - The scheduler to use for background token refreshes.
If {@code null} and
+ * {@link #earlyTokenRefreshPercent} is less than 1, the
shared internal daemon-thread
+ * scheduler is used. If the caller supplies a scheduler,
this class will not shut it down.
+ */
+ private AuthenticationOAuth2(Clock clock, double earlyTokenRefreshPercent,
ScheduledExecutorService scheduler) {
+ if (earlyTokenRefreshPercent <= 0) {
+ throw new IllegalArgumentException("EarlyTokenRefreshPercent must
be greater than 0.");
+ }
+ this.earlyTokenRefreshPercent = earlyTokenRefreshPercent;
this.clock = clock;
+ if (scheduler == null && earlyTokenRefreshPercent < 1) {
+ this.scheduler = INTERNAL_SCHEDULER;
+ } else {
+ this.scheduler = scheduler;
+ }
}
@Override
@@ -65,6 +155,16 @@ public class AuthenticationOAuth2 implements
Authentication, EncodedAuthenticati
@Override
public void configure(String encodedAuthParamString) {
+ Map<String, String> params =
parseAuthParameters(encodedAuthParamString);
+ String type = params.getOrDefault(CONFIG_PARAM_TYPE,
TYPE_CLIENT_CREDENTIALS);
+ if (TYPE_CLIENT_CREDENTIALS.equals(type)) {
+ this.flow = ClientCredentialsFlow.fromParameters(params);
+ } else {
+ throw new IllegalArgumentException("Unsupported authentication
type: " + type);
+ }
+ }
+
+ protected Map<String, String> parseAuthParameters(String
encodedAuthParamString) {
if (StringUtils.isBlank(encodedAuthParamString)) {
throw new IllegalArgumentException("No authentication parameters
were provided");
}
@@ -75,13 +175,44 @@ public class AuthenticationOAuth2 implements
Authentication, EncodedAuthenticati
throw new IllegalArgumentException("Malformed authentication
parameters", e);
}
- String type = params.getOrDefault(CONFIG_PARAM_TYPE,
TYPE_CLIENT_CREDENTIALS);
- switch(type) {
- case TYPE_CLIENT_CREDENTIALS:
- this.flow = ClientCredentialsFlow.fromParameters(params);
- break;
- default:
- throw new IllegalArgumentException("Unsupported authentication
type: " + type);
+ String earlyRefreshPercentStr =
params.get(CONFIG_PARAM_EARLY_TOKEN_REFRESH_PERCENT);
+ if (earlyRefreshPercentStr != null) {
+ double percent = parseEarlyRefreshPercent(earlyRefreshPercentStr);
+ this.earlyTokenRefreshPercent = percent;
+ if (percent < 1 && this.scheduler == null) {
+ this.scheduler = INTERNAL_SCHEDULER;
+ }
+ }
+ return params;
+ }
+
+ /**
+ * Parses the {@code earlyRefreshPercent} configuration value.
+ *
+ * <p>If the string contains a decimal point it is interpreted as a
fractional value in [0, 1]
+ * and used directly (e.g. {@code "0.8"} → 0.8). Otherwise the string is
treated as an integer
+ * percentage and divided by 100 (e.g. {@code "80"} → 0.8, {@code "100"} →
1.0).
+ *
+ * @param value the raw string from the configuration map
+ * @return the resolved fractional percent, must be > 0
+ * @throws IllegalArgumentException if the value cannot be parsed or is ≤ 0
+ */
+ static double parseEarlyRefreshPercent(String value) {
+ try {
+ double percent;
+ if (value.contains(".")) {
+ percent = Double.parseDouble(value);
+ } else {
+ percent = Integer.parseInt(value) / 100.0;
+ }
+ if (percent <= 0) {
+ throw new IllegalArgumentException(
+ CONFIG_PARAM_EARLY_TOKEN_REFRESH_PERCENT + " must be
greater than 0, got: " + value);
+ }
+ return percent;
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(
+ "Malformed configuration parameter: " +
CONFIG_PARAM_EARLY_TOKEN_REFRESH_PERCENT, e);
}
}
@@ -96,21 +227,109 @@ public class AuthenticationOAuth2 implements
Authentication, EncodedAuthenticati
flow.initialize();
}
+ /**
+ * The first time that this method is called, it retrieves a token. All
subsequent
+ * calls should get a cached value. However, if there is an issue with the
Identity
+ * Provider, there is a chance that the background thread responsible for
keeping
+ * the refresh token hot will
+ * @return The authentication data identifying this client that will be
sent to the broker
+ * @throws PulsarClientException
+ */
@Override
public synchronized AuthenticationDataProvider getAuthData() throws
PulsarClientException {
+ if (isClosed) {
+ throw new
PulsarClientException.AlreadyClosedException("Authentication already closed.");
+ }
if (this.cachedToken == null || this.cachedToken.isExpired()) {
- TokenResult tr = this.flow.authenticate();
- this.cachedToken = new CachedToken(tr);
+ this.authenticate();
}
return this.cachedToken.getAuthData();
}
+ /**
+ * Retrieve the token (synchronously), and then schedule refresh runnable.
+ */
+ private void authenticate() throws PulsarClientException {
+ if (log.isDebugEnabled()) {
+ log.debug("Attempting to retrieve OAuth2 token now.");
+ }
+ TokenResult tr = this.flow.authenticate();
+ this.cachedToken = new CachedToken(tr);
+ handleSuccessfulTokenRefresh();
+ }
+
+ /**
+ * When we successfully get a token, we need to schedule the next attempt
to refresh it.
+ * This is done completely based on the "expires_in" value returned by the
identity provider.
+ * The code is run on the single scheduler thread in order to ensure that
the backoff and the nextRefreshAttempt are
+ * updated safely.
+ */
+ private void handleSuccessfulTokenRefresh() {
+ if (scheduler != null && earlyTokenRefreshPercent < 1) {
+ scheduler.execute(() -> {
+ backoff = buildBackoff(cachedToken.latest.getExpiresIn());
+ long expiresInMillis =
TimeUnit.SECONDS.toMillis(cachedToken.latest.getExpiresIn());
+ scheduleRefresh((long) (expiresInMillis *
earlyTokenRefreshPercent));
+ });
+ }
+ }
+
+ /**
+ * Attempt to refresh the token. If successful, schedule the next refresh
task according to the
+ * {@link #earlyTokenRefreshPercent}. If failed, schedule another attempt
to refresh the token according to the
+ * {@link #backoff} policy.
+ */
+ private void refreshToken() {
+ try {
+ this.authenticate();
+ } catch (PulsarClientException | RuntimeException e) {
+ long delayMillis = backoff.next();
+ log.error("Error refreshing token. Will retry in {} millis.",
delayMillis, e);
+ scheduleRefresh(delayMillis);
+ }
+ }
+
+ /**
+ * Schedule the task to refresh the token.
+ * NOTE: this method must be run on the {@link #scheduler} thread in order
to ensure {@link #nextRefreshAttempt}
+ * is accessed and updated safely.
+ * @param delayMillis the time, in milliseconds, to wait before starting
to attempt to refresh the token.
+ */
+ private void scheduleRefresh(long delayMillis) {
+ nextRefreshAttempt = scheduler.schedule(this::refreshToken,
delayMillis, TimeUnit.MILLISECONDS);
+ }
+
+ private Backoff buildBackoff(int expiresInSeconds) {
+ return new BackoffBuilder()
+ .setInitialTime(1, TimeUnit.SECONDS)
+ .setMax(10, TimeUnit.MINUTES)
+ // Attempt a final token refresh attempt 2 seconds before the
token actually expires, if necessary.
+ .setMandatoryStop(Math.max(0, expiresInSeconds - 2),
TimeUnit.SECONDS)
+ .create();
+ }
+
@Override
- public void close() throws IOException {
+ public synchronized void close() throws IOException {
try {
- flow.close();
+ isClosed = true;
+ if (flow != null) {
+ flow.close();
+ }
} catch (Exception e) {
throw new IOException(e);
+ } finally {
+ if (scheduler != null) {
+ // Cancel all subsequent refresh attempts by canceling the
next token refresh attempt. By running
+ // this command on the single scheduler thread, we remove the
chance for a race condition that could
+ // allow a currently executing refresh attempt to schedule
another refresh attempt.
+ // We never shut down the scheduler here because either it is
the shared INTERNAL_SCHEDULER,
+ // or it was provided by the caller who manages its own
lifecycle.
+ scheduler.execute(() -> {
+ if (nextRefreshAttempt != null) {
+ nextRefreshAttempt.cancel(false);
+ }
+ });
+ }
}
}
@@ -122,8 +341,7 @@ public class AuthenticationOAuth2 implements
Authentication, EncodedAuthenticati
public CachedToken(TokenResult latest) {
this.latest = latest;
- int adjustedExpiresIn = (int) (latest.getExpiresIn() *
EXPIRY_ADJUSTMENT);
- this.expiresAt =
AuthenticationOAuth2.this.clock.instant().plusSeconds(adjustedExpiresIn);
+ this.expiresAt =
AuthenticationOAuth2.this.clock.instant().plusSeconds(latest.getExpiresIn());
this.authData = new
AuthenticationDataOAuth2(latest.getAccessToken());
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2StandardAuthzServer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2StandardAuthzServer.java
index c61d6d7b097..4afd0d15ea4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2StandardAuthzServer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2StandardAuthzServer.java
@@ -18,11 +18,8 @@
*/
package org.apache.pulsar.client.impl.auth.oauth2;
-import java.io.IOException;
import java.time.Clock;
import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.impl.AuthenticationUtil;
import
org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
/**
@@ -43,28 +40,11 @@ public class AuthenticationOAuth2StandardAuthzServer
extends AuthenticationOAuth
}
@Override
- public void configure(String encodedAuthParamString) {
- if (StringUtils.isBlank(encodedAuthParamString)) {
- throw new IllegalArgumentException("No authentication parameters
were provided");
- }
- Map<String, String> params;
- try {
- params =
AuthenticationUtil.configureFromJsonString(encodedAuthParamString);
- } catch (IOException e) {
- throw new IllegalArgumentException("Malformed authentication
parameters", e);
- }
-
+ protected Map<String, String> parseAuthParameters(String
encodedAuthParamString) {
+ Map<String, String> params =
super.parseAuthParameters(encodedAuthParamString);
// Always set the OAuth 2.0 standard metadata path
params.put(FlowBase.CONFIG_PARAM_WELL_KNOWN_METADATA_PATH,
DefaultMetadataResolver.OAUTH_WELL_KNOWN_METADATA_PATH);
-
- String type = params.getOrDefault(CONFIG_PARAM_TYPE,
TYPE_CLIENT_CREDENTIALS);
- switch(type) {
- case TYPE_CLIENT_CREDENTIALS:
- this.flow = ClientCredentialsFlow.fromParameters(params);
- break;
- default:
- throw new IllegalArgumentException("Unsupported authentication
type: " + type);
- }
+ return params;
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
index f3c85ca5cdd..d841010add8 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -47,6 +47,7 @@ import
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
class ClientCredentialsFlow extends FlowBase {
public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl";
public static final String CONFIG_PARAM_AUDIENCE = "audience";
+ // Maps to the keyFileUrl
public static final String CONFIG_PARAM_KEY_FILE = "privateKey";
public static final String CONFIG_PARAM_SCOPE = "scope";
@@ -70,6 +71,7 @@ class ClientCredentialsFlow extends FlowBase {
this.scope = scope;
}
+
/**
* Constructs a {@link ClientCredentialsFlow} from configuration
parameters.
*
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
index d430d8f0e40..46dbe41b48e 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
@@ -18,12 +18,15 @@
*/
package org.apache.pulsar.client.impl.auth.oauth2;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertThrows;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.MalformedURLException;
import java.net.URI;
@@ -33,7 +36,10 @@ import java.time.Instant;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.Cleanup;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
import org.testng.annotations.BeforeMethod;
@@ -54,7 +60,7 @@ public class AuthenticationOAuth2Test {
public void before() {
this.clock = new MockClock(Instant.EPOCH, ZoneOffset.UTC);
this.flow = mock(Flow.class);
- this.auth = new AuthenticationOAuth2(flow, this.clock);
+ this.auth = new AuthenticationOAuth2(flow, this.clock, 1, null);
}
@Test
@@ -104,6 +110,120 @@ public class AuthenticationOAuth2Test {
assertNotNull(this.auth.flow);
}
+ // ----- configure() via default constructor -----
+
+ @Test
+ public void testConfigureViaDefaultConstructorSetsFlow() throws Exception {
+ AuthenticationOAuth2 auth = new AuthenticationOAuth2();
+ auth.configure(minimalCredentialsJson());
+ assertNotNull(auth.flow);
+ }
+
+ @Test
+ public void
testConfigureViaDefaultConstructorDefaultEarlyRefreshIsDisabled() throws
Exception {
+ AuthenticationOAuth2 auth = new AuthenticationOAuth2();
+ auth.configure(minimalCredentialsJson());
+ assertEquals(auth.earlyTokenRefreshPercent, (double)
AuthenticationOAuth2.EARLY_TOKEN_REFRESH_PERCENT_DEFAULT);
+ }
+
+ @Test
+ public void testConfigureViaDefaultConstructorEarlyRefreshDecimal() throws
Exception {
+ AuthenticationOAuth2 auth = new AuthenticationOAuth2();
+ auth.configure(credentialsJsonWithEarlyRefresh("0.8"));
+ assertEquals(auth.earlyTokenRefreshPercent, 0.8);
+ }
+
+ @Test
+ public void testConfigureViaDefaultConstructorEarlyRefreshInteger() throws
Exception {
+ AuthenticationOAuth2 auth = new AuthenticationOAuth2();
+ auth.configure(credentialsJsonWithEarlyRefresh("80"));
+ assertEquals(auth.earlyTokenRefreshPercent, 0.8);
+ }
+
+ @Test
+ public void
testConfigureViaDefaultConstructorEarlyRefreshIntegerDisabled() throws
Exception {
+ // Integer 100 → 1.0, meaning disabled
+ AuthenticationOAuth2 auth = new AuthenticationOAuth2();
+ auth.configure(credentialsJsonWithEarlyRefresh("100"));
+ assertEquals(auth.earlyTokenRefreshPercent, 1.0);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConfigureViaDefaultConstructorEarlyRefreshZero() throws
Exception {
+ AuthenticationOAuth2 auth = new AuthenticationOAuth2();
+ auth.configure(credentialsJsonWithEarlyRefresh("0"));
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConfigureViaDefaultConstructorEarlyRefreshInvalid() throws
Exception {
+ AuthenticationOAuth2 auth = new AuthenticationOAuth2();
+ auth.configure(credentialsJsonWithEarlyRefresh("not-a-number"));
+ }
+
+ @Test
+ public void
testConfigureViaDefaultConstructorEarlyRefreshTriggersBackgroundRefresh()
throws Exception {
+ // Verify that when early refresh is enabled via configure(), the
background scheduler
+ // is activated and actually calls authenticate() before token expiry.
+ AuthenticationOAuth2 auth = new AuthenticationOAuth2();
+ auth.configure(credentialsJsonWithEarlyRefresh("10")); // 10% →
refresh at 100ms of a 1s token
+ // Replace the flow with a mock after configure() has set it up
+ auth.flow = this.flow;
+ TokenResult tr =
TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(1).build();
+ doReturn(tr).when(this.flow).authenticate();
+
+ auth.getAuthData();
+ // Give the background scheduler time to trigger at least one
additional refresh
+ Thread.sleep(500);
+ auth.close();
+ verify(this.flow, atLeast(2)).authenticate();
+ }
+
+ // ----- parseEarlyRefreshPercent -----
+
+ @Test
+ public void testParseEarlyRefreshPercentDecimal() {
+ assertEquals(AuthenticationOAuth2.parseEarlyRefreshPercent("0.8"),
0.8);
+ assertEquals(AuthenticationOAuth2.parseEarlyRefreshPercent("0.5"),
0.5);
+ assertEquals(AuthenticationOAuth2.parseEarlyRefreshPercent("1.0"),
1.0);
+ }
+
+ @Test
+ public void testParseEarlyRefreshPercentInteger() {
+ assertEquals(AuthenticationOAuth2.parseEarlyRefreshPercent("80"), 0.8);
+ assertEquals(AuthenticationOAuth2.parseEarlyRefreshPercent("50"), 0.5);
+ assertEquals(AuthenticationOAuth2.parseEarlyRefreshPercent("100"),
1.0);
+ assertEquals(AuthenticationOAuth2.parseEarlyRefreshPercent("1"), 0.01);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testParseEarlyRefreshPercentZeroInteger() {
+ AuthenticationOAuth2.parseEarlyRefreshPercent("0");
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testParseEarlyRefreshPercentInvalid() {
+ AuthenticationOAuth2.parseEarlyRefreshPercent("not-a-number");
+ }
+
+ // ----- helpers -----
+
+ private static String minimalCredentialsJson() throws Exception {
+ Map<String, String> params = new HashMap<>();
+ params.put("type", "client_credentials");
+ params.put("privateKey", "data:base64,e30=");
+ params.put("issuerUrl", "http://localhost");
+ return new ObjectMapper().writeValueAsString(params);
+ }
+
+ private static String credentialsJsonWithEarlyRefresh(String
earlyRefreshValue) throws Exception {
+ Map<String, String> params = new HashMap<>();
+ params.put("type", "client_credentials");
+ params.put("privateKey", "data:base64,e30=");
+ params.put("issuerUrl", "http://localhost");
+
params.put(AuthenticationOAuth2.CONFIG_PARAM_EARLY_TOKEN_REFRESH_PERCENT,
earlyRefreshValue);
+ return new ObjectMapper().writeValueAsString(params);
+ }
+
@Test
public void testStart() throws Exception {
this.auth.start();
@@ -111,7 +231,7 @@ public class AuthenticationOAuth2Test {
}
@Test
- public void testGetAuthData() throws Exception {
+ public void testGetAuthDataNoEarlyRefresh() throws Exception {
AuthenticationDataProvider data;
TokenResult tr =
TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(TEST_EXPIRES_IN).build();
doReturn(tr).when(this.flow).authenticate();
@@ -124,13 +244,70 @@ public class AuthenticationOAuth2Test {
verify(this.flow, times(1)).authenticate();
assertEquals(data.getCommandData(), tr.getAccessToken());
- // cache miss
- clock.advance(Duration.ofSeconds(TEST_EXPIRES_IN));
+ // cache miss (have to move passed expiration b/c we refresh when
token is expired now)
+ // NOTE: this works because the token uses the mocked clock.
+ clock.advance(Duration.ofSeconds(TEST_EXPIRES_IN + 1));
data = this.auth.getAuthData();
verify(this.flow, times(2)).authenticate();
assertEquals(data.getCommandData(), tr.getAccessToken());
}
+ // This test skips the early refresh logic and just ensures that if the
class were to somehow fail
+ // to refresh the token before expiration, the caller will get one final
attempt at calling authenticate
+ @Test
+ public void testGetAuthDataWithEarlyRefresh() throws Exception {
+ @Cleanup AuthenticationOAuth2 auth = new AuthenticationOAuth2(flow,
this.clock, 0.8, null);
+ AuthenticationDataProvider data;
+ TokenResult tr =
TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(TEST_EXPIRES_IN).build();
+ doReturn(tr).when(this.flow).authenticate();
+ data = auth.getAuthData();
+ verify(this.flow, times(1)).authenticate();
+ assertEquals(data.getCommandData(), tr.getAccessToken());
+
+ // cache hit
+ data = auth.getAuthData();
+ verify(this.flow, times(1)).authenticate();
+ assertEquals(data.getCommandData(), tr.getAccessToken());
+
+ // cache miss (have to move passed expiration b/c we refresh when
token is expired now)
+ clock.advance(Duration.ofSeconds(TEST_EXPIRES_IN + 1));
+ data = auth.getAuthData();
+ verify(this.flow, times(2)).authenticate();
+ assertEquals(data.getCommandData(), tr.getAccessToken());
+ }
+
+ // This test ensures that the early token refresh actually calls the
authenticate method in the background.
+ @Test
+ public void testEarlyTokenRefreshCallsAuthenticate() throws Exception {
+ @Cleanup AuthenticationOAuth2 auth = new AuthenticationOAuth2(flow,
this.clock, 0.1, null);
+ TokenResult tr =
TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(1).build();
+ doReturn(tr).when(this.flow).authenticate();
+ // Initialize the flow
+ auth.getAuthData();
+ // Give the auth token refresh a chance to run multiple times
+ Thread.sleep(1000);
+ auth.close();
+ verify(this.flow, atLeast(2)).authenticate();
+ verify(this.flow).close();
+ }
+
+ // This test ensures scheduler is used when passed in
+ @Test
+ public void
testEarlyTokenRefreshCallsAuthenticateWithParameterizedScheduler() throws
Exception {
+ ScheduledExecutorService scheduler =
mock(ScheduledExecutorService.class);
+ @Cleanup AuthenticationOAuth2 auth = new AuthenticationOAuth2(flow,
this.clock, 0.1, scheduler);
+ TokenResult tr =
TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(1).build();
+ doReturn(tr).when(this.flow).authenticate();
+ // Initialize the flow and trigger scheduling
+ auth.getAuthData();
+ verify(scheduler, times(1)).execute(any(Runnable.class));
+ // Close and verify that the passed in scheduler isn't shutdown
+ auth.close();
+ verify(this.flow).close();
+ verify(scheduler, times(0)).shutdownNow();
+ verify(scheduler, times(2)).execute(any(Runnable.class));
+ }
+
@Test
public void testMetadataResolver() throws MalformedURLException {
URL url = DefaultMetadataResolver.getWellKnownMetadataUrl(
@@ -181,5 +358,6 @@ public class AuthenticationOAuth2Test {
public void testClose() throws Exception {
this.auth.close();
verify(this.flow).close();
+ assertThrows(PulsarClientException.AlreadyClosedException.class, () ->
this.auth.getAuthData());
}
}