This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new fe6d02812cb [improve][client] Implement tls_client_auth for AuthenticationOAuth2 (#25538)
fe6d02812cb is described below
commit fe6d02812cb9abad2c756bf19eca68d8a66fd0e7
Author: Hideaki Oguni <[email protected]>
AuthorDate: Wed Apr 29 00:41:09 2026 +0900
[improve][client] Implement tls_client_auth for AuthenticationOAuth2 (#25538)
Co-authored-by: hoguni <[email protected]>
(cherry-pick from commit d6ca792aa1b09708673f5f632829adb932fcfa0f)
---
.../auth/oauth2/AuthenticationFactoryOAuth2.java | 113 +++++++++++++++++--
.../impl/auth/oauth2/AuthenticationOAuth2.java | 13 ++-
.../impl/auth/oauth2/ClientCredentialsFlow.java | 14 ++-
.../pulsar/client/impl/auth/oauth2/FlowBase.java | 108 ++++++++++++++-----
...CredentialsFlow.java => TlsClientAuthFlow.java} | 119 ++++++++-------------
.../protocol/ClientCredentialsExchangeRequest.java | 3 +
.../impl/auth/oauth2/protocol/TokenClient.java | 7 +-
...geRequest.java => TokenEndpointAuthMethod.java} | 40 ++++---
.../oauth2/AuthenticationFactoryOAuth2Test.java | 49 ++++++++-
.../impl/auth/oauth2/AuthenticationOAuth2Test.java | 47 +++++++-
.../impl/auth/oauth2/OAuth2MockHttpClient.java | 49 +++++++++
.../impl/auth/oauth2/TlsClientAuthFlowTest.java} | 44 ++++----
.../impl/auth/oauth2/protocol/TokenClientTest.java | 88 ++++++++++++++-
13 files changed, 533 insertions(+), 161 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
index b035b02437b..2b4ee9940b8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
@@ -21,8 +21,10 @@ package org.apache.pulsar.client.impl.auth.oauth2;
import java.net.URL;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import
org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
+import
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenEndpointAuthMethod;
/**
* Factory class that allows to create {@link Authentication} instances
@@ -92,18 +94,36 @@ public final class AuthenticationFactoryOAuth2 {
private URL issuerUrl;
private URL credentialsUrl;
+ private TokenEndpointAuthMethod tokenEndpointAuthMethod =
+ TokenEndpointAuthMethod.CLIENT_SECRET_POST;
+ private String clientId;
+ private String tlsCertFile;
+ private String tlsKeyFile;
private String audience;
private String scope;
private Duration connectTimeout;
private Duration readTimeout;
private String trustCertsFilePath;
private String wellKnownMetadataPath;
+ private Duration autoCertRefreshDuration;
private double earlyTokenRefreshPercent =
AuthenticationOAuth2.EARLY_TOKEN_REFRESH_PERCENT_DEFAULT;
private ScheduledExecutorService scheduler;
private ClientCredentialsBuilder() {
}
+ /**
+ * Optional token endpoint auth method.
+ * Defaults to {@code client_secret_post}.
+ *
+ * @param tokenEndpointAuthMethod the token endpoint auth method
+ * @return the builder
+ */
+ public ClientCredentialsBuilder
tokenEndpointAuthMethod(TokenEndpointAuthMethod tokenEndpointAuthMethod) {
+ this.tokenEndpointAuthMethod = tokenEndpointAuthMethod;
+ return this;
+ }
+
/**
* Required issuer URL.
*
@@ -126,6 +146,43 @@ public final class AuthenticationFactoryOAuth2 {
return this;
}
+ /**
+ * Optional path to the file for a client certificate.
+ * Required when {@code tokenEndpointAuthMethod} is {@code
tls_client_auth}
+ *
+ * @param tlsCertFile the path to the file for a client certificate
+ * @return the builder
+ */
+ public ClientCredentialsBuilder tlsCertFile(String tlsCertFile) {
+ this.tlsCertFile = tlsCertFile;
+ return this;
+ }
+
+ /**
+ * Optional path to the file for a client private key.
+ * Required when {@code tokenEndpointAuthMethod} is {@code
tls_client_auth}
+ *
+ * @param tlsKeyFile the path to the file for a client private key
+ * @return the builder
+ */
+ public ClientCredentialsBuilder tlsKeyFile(String tlsKeyFile) {
+ this.tlsKeyFile = tlsKeyFile;
+ return this;
+ }
+
+ /**
+ * Optional client identifier issued by the authorization server.
+ * Only used by {@code tls_client_auth}.
+ * Defaults to {@code pulsar-client} when not provided.
+ *
+ * @param clientId the client identifier
+ * @return the builder
+ */
+ public ClientCredentialsBuilder clientId(String clientId) {
+ this.clientId = clientId;
+ return this;
+ }
+
/**
* Optional audience identifier used by some Identity Providers, like
Auth0.
*
@@ -137,6 +194,17 @@ public final class AuthenticationFactoryOAuth2 {
return this;
}
+ /**
+ * Optional certificate refresh interval.
+ *
+ * @param autoCertRefreshDuration the Certificate refresh interval
+ * @return the builder
+ */
+ public ClientCredentialsBuilder autoCertRefreshDuration(Duration
autoCertRefreshDuration) {
+ this.autoCertRefreshDuration = autoCertRefreshDuration;
+ return this;
+ }
+
/**
* Optional scope expressed as a list of space-delimited,
case-sensitive strings.
* The strings are defined by the authorization server.
@@ -236,16 +304,41 @@ public final class AuthenticationFactoryOAuth2 {
* @return an Authentication object
*/
public Authentication build() {
- ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
- .issuerUrl(issuerUrl)
- .privateKey(credentialsUrl == null ? null :
credentialsUrl.toExternalForm())
- .audience(audience)
- .scope(scope)
- .connectTimeout(connectTimeout)
- .readTimeout(readTimeout)
- .trustCertsFilePath(trustCertsFilePath)
- .wellKnownMetadataPath(wellKnownMetadataPath)
- .build();
+ Flow flow;
+ if (tokenEndpointAuthMethod ==
TokenEndpointAuthMethod.CLIENT_SECRET_POST) {
+ flow = ClientCredentialsFlow.builder()
+ .issuerUrl(issuerUrl)
+ .privateKey(credentialsUrl == null ? null :
credentialsUrl.toExternalForm())
+ .audience(audience)
+ .scope(scope)
+ .connectTimeout(connectTimeout)
+ .readTimeout(readTimeout)
+ .trustCertsFilePath(trustCertsFilePath)
+ .certFile(tlsCertFile)
+ .keyFile(tlsKeyFile)
+ .autoCertRefreshDuration(autoCertRefreshDuration)
+ .wellKnownMetadataPath(wellKnownMetadataPath)
+ .build();
+ } else if (tokenEndpointAuthMethod ==
TokenEndpointAuthMethod.TLS_CLIENT_AUTH) {
+ if (StringUtils.isBlank(tlsCertFile) ||
StringUtils.isBlank(tlsKeyFile)) {
+ throw new IllegalArgumentException("Required configuration
parameters: tlsCertFile, tlsKeyFile");
+ }
+ flow = TlsClientAuthFlow.builder()
+ .issuerUrl(issuerUrl)
+ .clientId(clientId)
+ .certFile(tlsCertFile)
+ .keyFile(tlsKeyFile)
+ .audience(audience)
+ .scope(scope)
+ .connectTimeout(connectTimeout)
+ .readTimeout(readTimeout)
+ .trustCertsFilePath(trustCertsFilePath)
+ .wellKnownMetadataPath(wellKnownMetadataPath)
+ .autoCertRefreshDuration(autoCertRefreshDuration)
+ .build();
+ } else {
+ throw new IllegalArgumentException("Unsupported auth method: "
+ tokenEndpointAuthMethod);
+ }
return new AuthenticationOAuth2(flow, earlyTokenRefreshPercent,
scheduler);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
index a8a52909136..b760b16c532 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
@@ -38,6 +38,7 @@ import
org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.AuthenticationUtil;
+import
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenEndpointAuthMethod;
import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
@@ -73,6 +74,7 @@ import org.apache.pulsar.common.util.BackoffBuilder;
public class AuthenticationOAuth2 implements Authentication,
EncodedAuthenticationParameterSupport {
public static final String CONFIG_PARAM_TYPE = "type";
+ public static final String CONFIG_PARAM_TOKEN_ENDPOINT_AUTH_METHOD =
"tokenEndpointAuthMethod";
public static final String CONFIG_PARAM_EARLY_TOKEN_REFRESH_PERCENT =
"earlyTokenRefreshPercent";
public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials";
public static final int EARLY_TOKEN_REFRESH_PERCENT_DEFAULT = 1; //
feature disabled by default
@@ -158,7 +160,16 @@ public class AuthenticationOAuth2 implements
Authentication, EncodedAuthenticati
Map<String, String> params =
parseAuthParameters(encodedAuthParamString);
String type = params.getOrDefault(CONFIG_PARAM_TYPE,
TYPE_CLIENT_CREDENTIALS);
if (TYPE_CLIENT_CREDENTIALS.equals(type)) {
- this.flow = ClientCredentialsFlow.fromParameters(params);
+ TokenEndpointAuthMethod authMethod =
TokenEndpointAuthMethod.fromValue(
+
params.getOrDefault(CONFIG_PARAM_TOKEN_ENDPOINT_AUTH_METHOD,
+
TokenEndpointAuthMethod.CLIENT_SECRET_POST.value()));
+ if (authMethod == TokenEndpointAuthMethod.CLIENT_SECRET_POST) {
+ this.flow = ClientCredentialsFlow.fromParameters(params);
+ } else if (authMethod == TokenEndpointAuthMethod.TLS_CLIENT_AUTH) {
+ this.flow = TlsClientAuthFlow.fromParameters(params);
+ } else {
+ throw new IllegalArgumentException("Unsupported auth method: "
+ authMethod);
+ }
} else {
throw new IllegalArgumentException("Unsupported authentication
type: " + type);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
index d841010add8..6692e86848b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchangeRequest;
import
org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchanger;
import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenClient;
+import
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenEndpointAuthMethod;
import
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenExchangeException;
import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
@@ -64,8 +65,10 @@ class ClientCredentialsFlow extends FlowBase {
@Builder
public ClientCredentialsFlow(URL issuerUrl, String audience, String
privateKey, String scope,
Duration connectTimeout, Duration
readTimeout, String trustCertsFilePath,
+ String certFile, String keyFile, Duration
autoCertRefreshDuration,
String wellKnownMetadataPath) {
- super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath,
wellKnownMetadataPath);
+ super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath,
certFile, keyFile, autoCertRefreshDuration,
+ wellKnownMetadataPath);
this.audience = audience;
this.privateKey = privateKey;
this.scope = scope;
@@ -87,6 +90,9 @@ class ClientCredentialsFlow extends FlowBase {
Duration connectTimeout = parseParameterDuration(params,
CONFIG_PARAM_CONNECT_TIMEOUT);
Duration readTimeout = parseParameterDuration(params,
CONFIG_PARAM_READ_TIMEOUT);
String trustCertsFilePath =
params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH);
+ String certFile = params.get(CONFIG_PARAM_CERT_FILE);
+ String keyFile = params.get(CONFIG_PARAM_TLS_KEY_FILE);
+ Duration autoCertRefreshDuration = parseParameterDuration(params,
CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION);
String wellKnownMetadataPath =
params.get(CONFIG_PARAM_WELL_KNOWN_METADATA_PATH);
return ClientCredentialsFlow.builder()
@@ -97,6 +103,9 @@ class ClientCredentialsFlow extends FlowBase {
.connectTimeout(connectTimeout)
.readTimeout(readTimeout)
.trustCertsFilePath(trustCertsFilePath)
+ .certFile(certFile)
+ .keyFile(keyFile)
+ .autoCertRefreshDuration(autoCertRefreshDuration)
.wellKnownMetadataPath(wellKnownMetadataPath)
.build();
}
@@ -138,7 +147,7 @@ class ClientCredentialsFlow extends FlowBase {
assert this.metadata != null;
URL tokenUrl = this.metadata.getTokenEndpoint();
- this.exchanger = new TokenClient(tokenUrl, getHttpClient());
+ this.exchanger = new TokenClient(tokenUrl, httpClient);
initialized = true;
}
@@ -157,6 +166,7 @@ class ClientCredentialsFlow extends FlowBase {
.clientSecret(keyFile.getClientSecret())
.audience(this.audience)
.scope(this.scope)
+ .authMethod(TokenEndpointAuthMethod.CLIENT_SECRET_POST)
.build();
TokenResult tr;
if (!initialized) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
index 63248e86051..d1caa82e07c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
@@ -26,6 +26,9 @@ import java.net.URL;
import java.time.Duration;
import java.time.format.DateTimeParseException;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -34,9 +37,14 @@ import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
import org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata;
import org.apache.pulsar.client.impl.auth.oauth2.protocol.MetadataResolver;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.client.util.PulsarHttpAsyncSslEngineFactory;
+import org.apache.pulsar.common.util.PulsarSslConfiguration;
+import org.apache.pulsar.common.util.PulsarSslFactory;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.SslEngineFactory;
/**
* An abstract OAuth 2.0 authorization flow.
@@ -47,34 +55,37 @@ abstract class FlowBase implements Flow {
public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout";
public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout";
public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH =
"trustCertsFilePath";
+ public static final String CONFIG_PARAM_CERT_FILE = "tlsCertFile";
+ public static final String CONFIG_PARAM_TLS_KEY_FILE = "tlsKeyFile";
+ public static final String CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION =
"autoCertRefreshDuration";
public static final String CONFIG_PARAM_WELL_KNOWN_METADATA_PATH =
"wellKnownMetadataPath";
protected static final Duration DEFAULT_CONNECT_TIMEOUT =
Duration.ofSeconds(10);
protected static final Duration DEFAULT_READ_TIMEOUT =
Duration.ofSeconds(30);
+ protected static final Duration DEFAULT_AUTO_CERT_REFRESH_DURATION =
Duration.ofSeconds(300);
private static final long serialVersionUID = 1L;
protected final URL issuerUrl;
- private final Duration connectTimeout;
- private final Duration readTimeout;
- private final String trustCertsFilePath;
+ protected transient AsyncHttpClient httpClient;
protected final String wellKnownMetadataPath;
-
+ protected transient PulsarSslFactory sslFactory;
+ protected transient ScheduledExecutorService sslRefreshScheduler;
protected transient Metadata metadata;
- private transient AsyncHttpClient httpClient;
protected FlowBase(URL issuerUrl, Duration connectTimeout, Duration
readTimeout, String trustCertsFilePath,
+ String certFile, String keyFile, Duration
autoCertRefreshDuration,
String wellKnownMetadataPath) {
this.issuerUrl = issuerUrl;
- this.connectTimeout = connectTimeout;
- this.readTimeout = readTimeout;
- this.trustCertsFilePath = trustCertsFilePath;
+ this.httpClient = defaultHttpClient(readTimeout, connectTimeout,
trustCertsFilePath, certFile, keyFile);
+ long autoCertRefreshSeconds =
getParameterDurationToSeconds(CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION,
+ autoCertRefreshDuration, DEFAULT_AUTO_CERT_REFRESH_DURATION);
+ scheduleSslContextRefreshIfEnabled(autoCertRefreshSeconds);
this.wellKnownMetadataPath = wellKnownMetadataPath;
- getHttpClient();
}
private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration
connectTimeout,
- String trustCertsFilePath) {
+ String trustCertsFilePath,
String certFile, String keyFile) {
DefaultAsyncHttpClientConfig.Builder confBuilder = new
DefaultAsyncHttpClientConfig.Builder();
confBuilder.setCookieStore(null);
confBuilder.setUseProxyProperties(true);
@@ -85,7 +96,31 @@ abstract class FlowBase implements Flow {
confBuilder.setReadTimeout(
getParameterDurationToMillis(CONFIG_PARAM_READ_TIMEOUT,
readTimeout, DEFAULT_READ_TIMEOUT));
confBuilder.setUserAgent(String.format("Pulsar-Java-v%s",
PulsarVersion.getVersion()));
- if (StringUtils.isNotBlank(trustCertsFilePath)) {
+ boolean hasCertFile = StringUtils.isNotBlank(certFile);
+ boolean hasKeyFile = StringUtils.isNotBlank(keyFile);
+ if (hasCertFile != hasKeyFile) {
+ throw new IllegalArgumentException("Invalid TLS client certificate
configuration: " + CONFIG_PARAM_CERT_FILE
+ + " and " + CONFIG_PARAM_TLS_KEY_FILE + " must be provided
together");
+ }
+ if (hasCertFile && hasKeyFile) {
+ try {
+ PulsarSslConfiguration sslConfiguration =
PulsarSslConfiguration.builder()
+ .tlsCertificateFilePath(certFile)
+ .tlsKeyFilePath(keyFile)
+ .tlsTrustCertsFilePath(trustCertsFilePath)
+ .allowInsecureConnection(false)
+ .serverMode(false)
+ .isHttps(true)
+ .build();
+ sslFactory = new
org.apache.pulsar.common.util.DefaultPulsarSslFactory();
+ sslFactory.initialize(sslConfiguration);
+ sslFactory.createInternalSslContext();
+ SslEngineFactory sslEngineFactory = new
PulsarHttpAsyncSslEngineFactory(sslFactory, null);
+ confBuilder.setSslEngineFactory(sslEngineFactory);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Invalid TLS client
certificate configuration", e);
+ }
+ } else if (StringUtils.isNotBlank(trustCertsFilePath)) {
try {
confBuilder.setSslContext(SslContextBuilder.forClient()
.trustManager(new File(trustCertsFilePath))
@@ -97,28 +132,47 @@ abstract class FlowBase implements Flow {
return new DefaultAsyncHttpClient(confBuilder.build());
}
- protected synchronized AsyncHttpClient getHttpClient() {
- if (httpClient == null) {
- httpClient = defaultHttpClient(readTimeout, connectTimeout,
trustCertsFilePath);
+ private void scheduleSslContextRefreshIfEnabled(long refreshSeconds) {
+ if (sslFactory == null || refreshSeconds <= 0 || sslRefreshScheduler
!= null) {
+ return;
+ }
+ sslRefreshScheduler = Executors.newSingleThreadScheduledExecutor(
+ new
ExecutorProvider.ExtendedThreadFactory("oauth2-tls-cert-refresher", true));
+ sslRefreshScheduler.scheduleWithFixedDelay(this::refreshSslContext,
+ refreshSeconds, refreshSeconds, TimeUnit.SECONDS);
+ log.info("Scheduled TLS certificate refresh, refreshSeconds {}",
refreshSeconds);
+ }
+
+ private void refreshSslContext() {
+ if (this.sslFactory == null) {
+ return;
+ }
+ try {
+ this.sslFactory.update();
+ log.debug("Successfully refreshed SSL context");
+ } catch (Exception e) {
+ log.error("Failed to refresh SSL context", e);
}
- return httpClient;
}
private int getParameterDurationToMillis(String name, Duration value,
Duration defaultValue) {
+ return (int) getParameterDuration(name, value,
defaultValue).toMillis();
+ }
+
+ private long getParameterDurationToSeconds(String name, Duration value,
Duration defaultValue) {
+ return getParameterDuration(name, value, defaultValue).getSeconds();
+ }
+
+ private Duration getParameterDuration(String name, Duration value,
Duration defaultValue) {
Duration duration;
if (value == null) {
- if (log.isDebugEnabled()) {
- log.debug("Configuration for [{}] is using the default value:
[{}]", name, defaultValue);
- }
+ log.debug("Configuration is using the default value, name {},
defaultValue {}", name, defaultValue);
duration = defaultValue;
} else {
- if (log.isDebugEnabled()) {
- log.debug("Configuration for [{}] is: [{}]", name, value);
- }
+ log.debug("Configuration, name {}, value {}", name, value);
duration = value;
}
-
- return (int) duration.toMillis();
+ return duration;
}
public void initialize() throws PulsarClientException {
@@ -131,7 +185,7 @@ abstract class FlowBase implements Flow {
}
protected MetadataResolver createMetadataResolver() {
- return DefaultMetadataResolver.fromIssuerUrl(issuerUrl,
getHttpClient(), wellKnownMetadataPath);
+ return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, httpClient,
wellKnownMetadataPath);
}
static String parseParameterString(Map<String, String> params, String
name) {
@@ -168,8 +222,14 @@ abstract class FlowBase implements Flow {
@Override
public void close() throws Exception {
+ if (sslRefreshScheduler != null) {
+ sslRefreshScheduler.shutdownNow();
+ }
if (httpClient != null) {
httpClient.close();
}
+ if (sslFactory != null) {
+ sslFactory.close();
+ }
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlow.java
similarity index 53%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlow.java
index d841010add8..d61dadef83b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlow.java
@@ -18,145 +18,113 @@
*/
package org.apache.pulsar.client.impl.auth.oauth2;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.URISyntaxException;
import java.net.URL;
-import java.net.URLConnection;
-import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchangeRequest;
-import
org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchanger;
import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenClient;
+import
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenEndpointAuthMethod;
import
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenExchangeException;
import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
/**
- * Implementation of OAuth 2.0 Client Credentials flow.
+ * Implementation of OAuth 2.0 Client TLS Authentication flow.
*
- * @see <a href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2.0
RFC 6749, section 4.4</a>
+ * @see <a href="https://datatracker.ietf.org/doc/html/rfc8705">RFC 8705 -
OAuth 2.0 Mutual-TLS Client Authentication</a>
*/
@Slf4j
-class ClientCredentialsFlow extends FlowBase {
+class TlsClientAuthFlow extends FlowBase {
public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl";
+ public static final String CONFIG_PARAM_CLIENT_ID = "clientId";
public static final String CONFIG_PARAM_AUDIENCE = "audience";
- // Maps to the keyFileUrl
- public static final String CONFIG_PARAM_KEY_FILE = "privateKey";
public static final String CONFIG_PARAM_SCOPE = "scope";
+ public static final String CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION =
+ FlowBase.CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION;
+
+ private static final String DEFAULT_CLIENT_ID = "pulsar-client";
private static final long serialVersionUID = 1L;
+ private final String clientId;
private final String audience;
- private final String privateKey;
private final String scope;
- private transient ClientCredentialsExchanger exchanger;
+ private transient TokenClient exchanger;
private boolean initialized = false;
@Builder
- public ClientCredentialsFlow(URL issuerUrl, String audience, String
privateKey, String scope,
- Duration connectTimeout, Duration
readTimeout, String trustCertsFilePath,
- String wellKnownMetadataPath) {
- super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath,
wellKnownMetadataPath);
+ public TlsClientAuthFlow(URL issuerUrl, String clientId, String certFile,
String keyFile, String audience,
+ String scope, Duration connectTimeout, Duration
readTimeout, String trustCertsFilePath,
+ String wellKnownMetadataPath, Duration
autoCertRefreshDuration) {
+ super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath,
certFile, keyFile, autoCertRefreshDuration,
+ wellKnownMetadataPath);
+ this.clientId = StringUtils.defaultIfBlank(clientId,
DEFAULT_CLIENT_ID);
this.audience = audience;
- this.privateKey = privateKey;
this.scope = scope;
}
-
/**
- * Constructs a {@link ClientCredentialsFlow} from configuration
parameters.
+ * Constructs a {@link TlsClientAuthFlow} from configuration parameters.
*
- * @param params
- * @return
+ * @param params Configuration parameters
+ * @return A new TlsClientAuthFlow instance
*/
- public static ClientCredentialsFlow fromParameters(Map<String, String>
params) {
+ public static TlsClientAuthFlow fromParameters(Map<String, String> params)
{
URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL);
- String privateKeyUrl = parseParameterString(params,
CONFIG_PARAM_KEY_FILE);
- // These are optional parameters, so we only perform a get
+ // In mTLS-based providers, caller input for client_id can be optional.
+ // Keep sending client_id in token request for RFC compatibility by
applying a default value.
+ String clientId = params.getOrDefault(CONFIG_PARAM_CLIENT_ID,
DEFAULT_CLIENT_ID);
+ String certFile = parseParameterString(params, CONFIG_PARAM_CERT_FILE);
+ String keyFile = parseParameterString(params,
CONFIG_PARAM_TLS_KEY_FILE);
+ // These are optional parameters, so we allow null values
String scope = params.get(CONFIG_PARAM_SCOPE);
String audience = params.get(CONFIG_PARAM_AUDIENCE);
Duration connectTimeout = parseParameterDuration(params,
CONFIG_PARAM_CONNECT_TIMEOUT);
Duration readTimeout = parseParameterDuration(params,
CONFIG_PARAM_READ_TIMEOUT);
String trustCertsFilePath =
params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH);
String wellKnownMetadataPath =
params.get(CONFIG_PARAM_WELL_KNOWN_METADATA_PATH);
+ Duration autoCertRefreshDuration = parseParameterDuration(params,
CONFIG_PARAM_AUTO_CERT_REFRESH_DURATION);
- return ClientCredentialsFlow.builder()
+ return TlsClientAuthFlow.builder()
.issuerUrl(issuerUrl)
+ .clientId(clientId)
+ .certFile(certFile)
+ .keyFile(keyFile)
.audience(audience)
- .privateKey(privateKeyUrl)
.scope(scope)
.connectTimeout(connectTimeout)
.readTimeout(readTimeout)
.trustCertsFilePath(trustCertsFilePath)
.wellKnownMetadataPath(wellKnownMetadataPath)
+ .autoCertRefreshDuration(autoCertRefreshDuration)
.build();
}
- /**
- * Loads the private key from the given URL.
- *
- * @param privateKeyURL
- * @return
- * @throws IOException
- */
- private static KeyFile loadPrivateKey(String privateKeyURL) throws
IOException {
- try {
- URLConnection urlConnection = new
org.apache.pulsar.client.api.url.URL(privateKeyURL).openConnection();
- try {
- String protocol = urlConnection.getURL().getProtocol();
- String contentType = urlConnection.getContentType();
- if ("data".equals(protocol) &&
!"application/json".equals(contentType)) {
- throw new IllegalArgumentException(
- "Unsupported media type or encoding format: " +
urlConnection.getContentType());
- }
- KeyFile privateKey;
- try (Reader r = new InputStreamReader((InputStream)
urlConnection.getContent(),
- StandardCharsets.UTF_8)) {
- privateKey = KeyFile.fromJson(r);
- }
- return privateKey;
- } finally {
- IOUtils.close(urlConnection);
- }
- } catch (URISyntaxException | InstantiationException |
IllegalAccessException e) {
- throw new IOException("Invalid privateKey format", e);
- }
- }
-
@Override
public void initialize() throws PulsarClientException {
super.initialize();
assert this.metadata != null;
URL tokenUrl = this.metadata.getTokenEndpoint();
- this.exchanger = new TokenClient(tokenUrl, getHttpClient());
+ this.exchanger = new TokenClient(tokenUrl, httpClient);
+
initialized = true;
}
public TokenResult authenticate() throws PulsarClientException {
- // read the private key from storage
- KeyFile keyFile;
- try {
- keyFile = loadPrivateKey(this.privateKey);
- } catch (IOException e) {
- throw new PulsarClientException.AuthenticationException("Unable to
read private key: " + e.getMessage());
- }
-
- // request an access token using client credentials
+ // request an access token using TLS client authentication
ClientCredentialsExchangeRequest req =
ClientCredentialsExchangeRequest.builder()
- .clientId(keyFile.getClientId())
- .clientSecret(keyFile.getClientSecret())
+ .clientId(this.clientId)
.audience(this.audience)
.scope(this.scope)
+ .authMethod(TokenEndpointAuthMethod.TLS_CLIENT_AUTH)
.build();
TokenResult tr;
if (!initialized) {
@@ -179,4 +147,9 @@ class ClientCredentialsFlow extends FlowBase {
exchanger.close();
}
}
-}
+
+ @VisibleForTesting
+ String getClientId() {
+ return clientId;
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
index da026bce47e..de36da5ad5b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
@@ -42,4 +42,7 @@ public class ClientCredentialsExchangeRequest {
@JsonProperty("scope")
private String scope;
+
+ @JsonProperty("token_endpoint_auth_method")
+ private TokenEndpointAuthMethod authMethod;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
index 6eee7847535..10a587ddbca 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -56,10 +56,15 @@ public class TokenClient implements
ClientCredentialsExchanger {
* @return Generate the final request body from a map.
*/
String buildClientCredentialsBody(ClientCredentialsExchangeRequest req) {
+ TokenEndpointAuthMethod authMethod = req.getAuthMethod() == null
+ ? TokenEndpointAuthMethod.CLIENT_SECRET_POST
+ : req.getAuthMethod();
Map<String, String> bodyMap = new TreeMap<>();
bodyMap.put("grant_type", "client_credentials");
bodyMap.put("client_id", req.getClientId());
- bodyMap.put("client_secret", req.getClientSecret());
+ if (authMethod == TokenEndpointAuthMethod.CLIENT_SECRET_POST) {
+ bodyMap.put("client_secret", req.getClientSecret());
+ }
// Only set audience and scope if they are non-empty.
if (!StringUtils.isBlank(req.getAudience())) {
bodyMap.put("audience", req.getAudience());
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenEndpointAuthMethod.java
similarity index 58%
copy from
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
copy to
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenEndpointAuthMethod.java
index da026bce47e..104a64bd756 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenEndpointAuthMethod.java
@@ -18,28 +18,26 @@
*/
package org.apache.pulsar.client.impl.auth.oauth2.protocol;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import lombok.Builder;
-import lombok.Data;
+public enum TokenEndpointAuthMethod {
+ CLIENT_SECRET_POST("client_secret_post"),
+ TLS_CLIENT_AUTH("tls_client_auth");
-/**
- * A token request based on the exchange of client credentials.
- *
- * @see <a href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2.0
RFC 6749, section 4.4</a>
- */
-@Data
-@Builder
-public class ClientCredentialsExchangeRequest {
-
- @JsonProperty("client_id")
- private String clientId;
+ private final String value;
- @JsonProperty("client_secret")
- private String clientSecret;
+ TokenEndpointAuthMethod(String value) {
+ this.value = value;
+ }
- @JsonProperty("audience")
- private String audience;
+ public String value() {
+ return value;
+ }
- @JsonProperty("scope")
- private String scope;
-}
+ public static TokenEndpointAuthMethod fromValue(String value) {
+ for (TokenEndpointAuthMethod method : values()) {
+ if (method.value.equalsIgnoreCase(value)) {
+ return method;
+ }
+ }
+ throw new IllegalArgumentException("Unsupported token endpoint auth
method: " + value);
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java
index f76fee6e10d..ea675af9974 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java
@@ -18,11 +18,14 @@
*/
package org.apache.pulsar.client.impl.auth.oauth2;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.net.URL;
import java.time.Duration;
import org.apache.pulsar.client.api.Authentication;
+import
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenEndpointAuthMethod;
import org.testng.annotations.Test;
public class AuthenticationFactoryOAuth2Test {
@@ -36,17 +39,56 @@ public class AuthenticationFactoryOAuth2Test {
Duration connectTimeout = Duration.parse("PT11S");
Duration readTimeout = Duration.ofSeconds(31);
String trustCertsFilePath = null;
+ String tlsCertFile = "";
+ String tlsKeyFile = "";
String wellKnownMetadataPath = "/.well-known/custom-path";
try (Authentication authentication =
AuthenticationFactoryOAuth2.clientCredentialsBuilder().issuerUrl(issuerUrl)
.credentialsUrl(credentialsUrl).audience(audience).scope(scope)
.connectTimeout(connectTimeout).readTimeout(readTimeout)
- .trustCertsFilePath(trustCertsFilePath)
-
.wellKnownMetadataPath(wellKnownMetadataPath).build()) {
+
.trustCertsFilePath(trustCertsFilePath).tlsCertFile(tlsCertFile)
+
.tlsKeyFile(tlsKeyFile).wellKnownMetadataPath(wellKnownMetadataPath).build()) {
assertTrue(authentication instanceof AuthenticationOAuth2);
+ assertEquals(((AuthenticationOAuth2)
authentication).flow.getClass(), ClientCredentialsFlow.class);
}
}
+ @Test
+ public void testBuilderWithTlsClientAuthFlow() throws Exception {
+ URL issuerUrl = new URL("http://localhost");
+ String clientId = "test-client";
+ String tlsCertFile = "/path/to/cert.pem";
+ String tlsKeyFile = "/path/to/key.pem";
+ String audience = "audience";
+ String scope = "scope";
+ Duration autoCertRefreshDuration = Duration.ofSeconds(123);
+ OAuth2MockHttpClient.withMockedSslFactory(() -> {
+ try (Authentication authentication =
+
AuthenticationFactoryOAuth2.clientCredentialsBuilder().issuerUrl(issuerUrl)
+
.tokenEndpointAuthMethod(TokenEndpointAuthMethod.TLS_CLIENT_AUTH)
+ .clientId(clientId)
+ .tlsCertFile(tlsCertFile)
+ .tlsKeyFile(tlsKeyFile)
+ .audience(audience)
+ .scope(scope)
+
.autoCertRefreshDuration(autoCertRefreshDuration)
+ .build()) {
+ assertTrue(authentication instanceof AuthenticationOAuth2);
+ assertEquals(((AuthenticationOAuth2)
authentication).flow.getClass(), TlsClientAuthFlow.class);
+ }
+ });
+ }
+
+ @Test
+ public void testBuilderWithTlsClientAuthMissingCertOrKey() throws
IOException {
+ URL issuerUrl = new URL("http://localhost");
+ assertThrows(IllegalArgumentException.class, () ->
+ AuthenticationFactoryOAuth2.clientCredentialsBuilder()
+ .issuerUrl(issuerUrl)
+
.tokenEndpointAuthMethod(TokenEndpointAuthMethod.TLS_CLIENT_AUTH)
+ .build());
+ }
+
@Test
public void testStandardAuthzServerBuilder() throws IOException {
URL issuerUrl = new URL("http://localhost");
@@ -60,6 +102,7 @@ public class AuthenticationFactoryOAuth2Test {
}
}
+ @SuppressWarnings("deprecation")
@Test
public void testClientCredentials() throws IOException {
URL issuerUrl = new URL("http://localhost");
@@ -71,4 +114,4 @@ public class AuthenticationFactoryOAuth2Test {
}
}
-}
+}
\ No newline at end of file
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
index 46dbe41b48e..6b956957d30 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
@@ -41,7 +41,9 @@ import lombok.Cleanup;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
+import
org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenEndpointAuthMethod;
import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -63,6 +65,13 @@ public class AuthenticationOAuth2Test {
this.auth = new AuthenticationOAuth2(flow, this.clock, 1, null);
}
+ @AfterMethod(alwaysRun = true)
+ public void after() throws Exception {
+ if (this.auth != null) {
+ this.auth.close();
+ }
+ }
+
@Test
public void testGetAuthMethodName() {
assertEquals(this.auth.getAuthMethodName(), "token");
@@ -110,6 +119,42 @@ public class AuthenticationOAuth2Test {
assertNotNull(this.auth.flow);
}
+ @Test
+ public void testConfigureWithTlsClientAuth() throws Exception {
+ Map<String, String> params = new HashMap<>();
+ params.put("type", "client_credentials");
+
params.put(AuthenticationOAuth2.CONFIG_PARAM_TOKEN_ENDPOINT_AUTH_METHOD,
+ TokenEndpointAuthMethod.TLS_CLIENT_AUTH.value());
+ params.put("clientId", "test-client");
+ params.put("tlsCertFile", "/path/to/cert.pem");
+ params.put("tlsKeyFile", "/path/to/key.pem");
+ params.put("issuerUrl", "http://localhost");
+ ObjectMapper mapper = new ObjectMapper();
+ String authParams = mapper.writeValueAsString(params);
+ OAuth2MockHttpClient.withMockedSslFactory(() -> {
+ this.auth.configure(authParams);
+ assertNotNull(this.auth.flow);
+ assertEquals(this.auth.flow.getClass(), TlsClientAuthFlow.class);
+ });
+ }
+
+ @Test
+ public void testConfigureCredentialsWithTlsValues() throws Exception {
+ Map<String, String> params = new HashMap<>();
+ params.put("type", "client_credentials");
+ params.put("privateKey", "data:base64,e30=");
+ params.put("tlsCertFile", "/path/to/cert.pem");
+ params.put("tlsKeyFile", "/path/to/key.pem");
+ params.put("issuerUrl", "http://localhost");
+ ObjectMapper mapper = new ObjectMapper();
+ String authParams = mapper.writeValueAsString(params);
+ OAuth2MockHttpClient.withMockedSslFactory(() -> {
+ this.auth.configure(authParams);
+ assertNotNull(this.auth.flow);
+ assertEquals(this.auth.flow.getClass(),
ClientCredentialsFlow.class);
+ });
+ }
+
// ----- configure() via default constructor -----
@Test
@@ -360,4 +405,4 @@ public class AuthenticationOAuth2Test {
verify(this.flow).close();
assertThrows(PulsarClientException.AlreadyClosedException.class, () ->
this.auth.getAuthData());
}
-}
+}
\ No newline at end of file
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/OAuth2MockHttpClient.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/OAuth2MockHttpClient.java
new file mode 100644
index 00000000000..7a0bb0663e1
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/OAuth2MockHttpClient.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mockConstruction;
+import org.apache.pulsar.common.util.DefaultPulsarSslFactory;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.mockito.MockedConstruction;
+
+final class OAuth2MockHttpClient {
+
+ private OAuth2MockHttpClient() {
+ }
+
+ @FunctionalInterface
+ interface ThrowingRunnable {
+ void run() throws Exception;
+ }
+
+ static void withMockedSslFactory(ThrowingRunnable runnable) throws
Exception {
+ try (MockedConstruction<DefaultPulsarSslFactory> ignoredSslFactory =
+ mockConstruction(DefaultPulsarSslFactory.class, (mock,
context) -> {
+ doNothing().when(mock).initialize(any());
+ doNothing().when(mock).createInternalSslContext();
+ });
+ MockedConstruction<DefaultAsyncHttpClient> ignoredHttpClient =
+ mockConstruction(DefaultAsyncHttpClient.class)) {
+ runnable.run();
+ }
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlowTest.java
similarity index 50%
copy from
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
copy to
pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlowTest.java
index da026bce47e..203a0f6f15c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/TlsClientAuthFlowTest.java
@@ -16,30 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+package org.apache.pulsar.client.impl.auth.oauth2;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import lombok.Builder;
-import lombok.Data;
+import static org.testng.Assert.assertEquals;
+import java.util.HashMap;
+import java.util.Map;
+import org.testng.annotations.Test;
-/**
- * A token request based on the exchange of client credentials.
- *
- * @see <a href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2.0
RFC 6749, section 4.4</a>
- */
-@Data
-@Builder
-public class ClientCredentialsExchangeRequest {
-
- @JsonProperty("client_id")
- private String clientId;
-
- @JsonProperty("client_secret")
- private String clientSecret;
-
- @JsonProperty("audience")
- private String audience;
+public class TlsClientAuthFlowTest {
- @JsonProperty("scope")
- private String scope;
-}
+ @Test
+ public void testFromParametersWithoutClientId() throws Exception {
+ Map<String, String> params = new HashMap<>();
+ params.put("tlsCertFile", "/path/to/cert.pem");
+ params.put("tlsKeyFile", "/path/to/key.pem");
+ params.put("issuerUrl", "http://localhost");
+ params.put("scope", "http://localhost");
+ OAuth2MockHttpClient.withMockedSslFactory(() -> {
+ TlsClientAuthFlow flow = TlsClientAuthFlow.fromParameters(params);
+ assertEquals(flow.getClientId(), "pulsar-client");
+ flow.close();
+ });
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
index 13142768207..0992eef0409 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNotNull;
@@ -48,8 +49,15 @@ public class TokenClientTest {
.clientId("test-client-id")
.clientSecret("test-client-secret")
.scope("test-scope")
+ .authMethod(TokenEndpointAuthMethod.CLIENT_SECRET_POST)
.build();
String body = tokenClient.buildClientCredentialsBody(request);
+ assertThat(body)
+ .contains("grant_type=")
+ .contains("client_id=")
+ .contains("client_secret=")
+ .contains("audience=")
+ .contains("scope=");
BoundRequestBuilder boundRequestBuilder =
mock(BoundRequestBuilder.class);
Response response = mock(Response.class);
ListenableFuture<Response> listenableFuture =
mock(ListenableFuture.class);
@@ -81,6 +89,10 @@ public class TokenClientTest {
.clientSecret("test-client-secret")
.build();
String body = tokenClient.buildClientCredentialsBody(request);
+ assertThat(body)
+ .contains("grant_type=")
+ .contains("client_id=")
+ .contains("client_secret=");
BoundRequestBuilder boundRequestBuilder =
mock(BoundRequestBuilder.class);
Response response = mock(Response.class);
ListenableFuture<Response> listenableFuture =
mock(ListenableFuture.class);
@@ -99,4 +111,78 @@ public class TokenClientTest {
TokenResult tr = tokenClient.exchangeClientCredentials(request);
assertNotNull(tr);
}
-}
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void exchangeTlsClientAuthSuccessTest() throws
+ IOException, TokenExchangeException, ExecutionException,
InterruptedException {
+ DefaultAsyncHttpClient defaultAsyncHttpClient =
mock(DefaultAsyncHttpClient.class);
+ URL url = new URL("http://localhost");
+ TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient);
+ ClientCredentialsExchangeRequest request =
ClientCredentialsExchangeRequest.builder()
+ .clientId("test-client-id")
+ .audience("test-audience")
+ .scope("test-scope")
+ .authMethod(TokenEndpointAuthMethod.TLS_CLIENT_AUTH)
+ .build();
+ String body = tokenClient.buildClientCredentialsBody(request);
+ assertThat(body)
+ .contains("grant_type=")
+ .contains("client_id=")
+ .contains("audience=")
+ .contains("scope=")
+ .doesNotContain("client_secret=");
+ BoundRequestBuilder boundRequestBuilder =
mock(BoundRequestBuilder.class);
+ Response response = mock(Response.class);
+ ListenableFuture<Response> listenableFuture =
mock(ListenableFuture.class);
+
when(defaultAsyncHttpClient.preparePost(url.toString())).thenReturn(boundRequestBuilder);
+ when(boundRequestBuilder.setHeader("Accept",
"application/json")).thenReturn(boundRequestBuilder);
+ when(boundRequestBuilder.setHeader("Content-Type",
+
"application/x-www-form-urlencoded")).thenReturn(boundRequestBuilder);
+
when(boundRequestBuilder.setBody(body)).thenReturn(boundRequestBuilder);
+ when(boundRequestBuilder.execute()).thenReturn(listenableFuture);
+ when(listenableFuture.get()).thenReturn(response);
+ when(response.getStatusCode()).thenReturn(200);
+ TokenResult tokenResult = new TokenResult();
+ tokenResult.setAccessToken("test-access-token");
+ tokenResult.setIdToken("test-id");
+ when(response.getResponseBodyAsBytes()).thenReturn(new
Gson().toJson(tokenResult).getBytes());
+ TokenResult tr = tokenClient.exchangeClientCredentials(request);
+ assertNotNull(tr);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void exchangeTlsClientAuthSuccessWithoutOptionalParamsTest() throws
+ IOException, TokenExchangeException, ExecutionException,
InterruptedException {
+ DefaultAsyncHttpClient defaultAsyncHttpClient =
mock(DefaultAsyncHttpClient.class);
+ URL url = new URL("http://localhost");
+ TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient);
+ ClientCredentialsExchangeRequest request =
ClientCredentialsExchangeRequest.builder()
+ .clientId("test-client-id")
+ .authMethod(TokenEndpointAuthMethod.TLS_CLIENT_AUTH)
+ .build();
+ String body = tokenClient.buildClientCredentialsBody(request);
+ assertThat(body)
+ .contains("grant_type=")
+ .contains("client_id=")
+ .doesNotContain("client_secret=");
+ BoundRequestBuilder boundRequestBuilder =
mock(BoundRequestBuilder.class);
+ Response response = mock(Response.class);
+ ListenableFuture<Response> listenableFuture =
mock(ListenableFuture.class);
+
when(defaultAsyncHttpClient.preparePost(url.toString())).thenReturn(boundRequestBuilder);
+ when(boundRequestBuilder.setHeader("Accept",
"application/json")).thenReturn(boundRequestBuilder);
+ when(boundRequestBuilder.setHeader("Content-Type",
+
"application/x-www-form-urlencoded")).thenReturn(boundRequestBuilder);
+
when(boundRequestBuilder.setBody(body)).thenReturn(boundRequestBuilder);
+ when(boundRequestBuilder.execute()).thenReturn(listenableFuture);
+ when(listenableFuture.get()).thenReturn(response);
+ when(response.getStatusCode()).thenReturn(200);
+ TokenResult tokenResult = new TokenResult();
+ tokenResult.setAccessToken("test-access-token");
+ tokenResult.setIdToken("test-id");
+ when(response.getResponseBodyAsBytes()).thenReturn(new
Gson().toJson(tokenResult).getBytes());
+ TokenResult tr = tokenClient.exchangeClientCredentials(request);
+ assertNotNull(tr);
+ }
+}
\ No newline at end of file