This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new db95281134a [fix][proxy] Close client connection immediately when
credentials expire and forwardAuthorizationCredentials is disabled (#25179)
db95281134a is described below
commit db95281134a7d026343d7f57429f03a540ea91cd
Author: Zixuan Liu <[email protected]>
AuthorDate: Sat Jan 24 11:20:40 2026 +0800
[fix][proxy] Close client connection immediately when credentials expire
and forwardAuthorizationCredentials is disabled (#25179)
(cherry picked from commit 334847073e2c926eac5c486a5cfda1d1dd42634a)
---
.../pulsar/proxy/server/ProxyConnection.java | 55 +-
.../proxy/server/ProxyAuthenticationTest.java | 562 +++++++++++++--------
.../pulsar/proxy/server/ProxyRefreshAuthTest.java | 48 +-
3 files changed, 411 insertions(+), 254 deletions(-)
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 354d145a954..54df98abb8c 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -146,7 +146,12 @@ public class ProxyConnection extends PulsarHandler {
Closing,
- Closed,
+ Closed;
+
+ boolean isAuthenticatedState() {
+ return this == ProxyLookupRequests
+ || this == ProxyConnectionToBroker;
+ }
}
ConnectionPool getConnectionPool() {
@@ -397,15 +402,7 @@ public class ProxyConnection extends PulsarHandler {
state = State.ProxyLookupRequests;
lookupProxyHandler = service.newLookupProxyHandler(this);
- if (service.getConfiguration().isAuthenticationEnabled()
- &&
service.getConfiguration().getAuthenticationRefreshCheckSeconds() > 0) {
- authRefreshTask = ctx.executor().scheduleAtFixedRate(
- Runnables.catchingAndLoggingThrowables(
-
this::refreshAuthenticationCredentialsAndCloseIfTooExpired),
-
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
-
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
- TimeUnit.SECONDS);
- }
+ startAuthRefreshTaskIfNotStarted();
final ByteBuf msg =
Commands.newConnected(protocolVersionToAdvertise, false);
writeAndFlush(msg);
}
@@ -421,6 +418,10 @@ public class ProxyConnection extends PulsarHandler {
final ByteBuf msg =
Commands.newConnected(connected.getProtocolVersion(), maxMessageSize,
connected.hasFeatureFlags() &&
connected.getFeatureFlags().isSupportsTopicWatchers());
writeAndFlush(msg);
+ // Start auth refresh task only if we are not forwarding
authorization credentials
+ if
(!service.getConfiguration().isForwardAuthorizationCredentials()) {
+ startAuthRefreshTaskIfNotStarted();
+ }
} else {
LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+ "Closing connection to broker '{}'.",
@@ -502,16 +503,44 @@ public class ProxyConnection extends PulsarHandler {
}
}
+ private void startAuthRefreshTaskIfNotStarted() {
+ if (service.getConfiguration().isAuthenticationEnabled()
+ &&
service.getConfiguration().getAuthenticationRefreshCheckSeconds() > 0
+ && authRefreshTask == null) {
+ authRefreshTask = ctx.executor().scheduleAtFixedRate(
+ Runnables.catchingAndLoggingThrowables(
+
this::refreshAuthenticationCredentialsAndCloseIfTooExpired),
+
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
+
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
+ TimeUnit.SECONDS);
+ }
+ }
+
private void refreshAuthenticationCredentialsAndCloseIfTooExpired() {
assert ctx.executor().inEventLoop();
- if (state != State.ProxyLookupRequests) {
- // Happens when an exception is thrown that causes this connection
to close.
+
+ // Only check expiration in authenticated states
+ if (!state.isAuthenticatedState()) {
return;
- } else if (!authState.isExpired()) {
+ }
+
+ if (!authState.isExpired()) {
// Credentials are still valid. Nothing to do at this point
return;
}
+ // If we are not forwarding authorization credentials to the broker,
the broker cannot
+ // refresh the client's credentials. In this case, we must close the
connection immediately
+ // when credentials expire.
+ if (!service.getConfiguration().isForwardAuthorizationCredentials()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Closing connection because client credentials
have expired and "
+ + "forwardAuthorizationCredentials is disabled (broker
cannot refresh)", remoteAddress);
+ }
+ ctx.close();
+ return;
+ }
+
if (System.nanoTime() - authChallengeSentTime
>
TimeUnit.SECONDS.toNanos(service.getConfiguration().getAuthenticationRefreshCheckSeconds()))
{
LOG.warn("[{}] Closing connection after timeout on refreshing auth
credentials", remoteAddress);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 3207c2c3d6a..b255500c463 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -18,13 +18,15 @@
*/
package org.apache.pulsar.proxy.server;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.spy;
-
import com.google.common.collect.Sets;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-
import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -32,14 +34,16 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-
+import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
-
+import javax.net.ssl.SSLSession;
import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -48,8 +52,11 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -58,214 +65,347 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class ProxyAuthenticationTest extends ProducerConsumerBase {
- private static final Logger log =
LoggerFactory.getLogger(ProxyAuthenticationTest.class);
-
- public static class BasicAuthenticationData implements
AuthenticationDataProvider {
- private final String authParam;
-
- public BasicAuthenticationData(String authParam) {
- this.authParam = authParam;
- }
-
- public boolean hasDataFromCommand() {
- return true;
- }
-
- public String getCommandData() {
- return authParam;
- }
-
- public boolean hasDataForHttp() {
- return true;
- }
-
- @Override
- public Set<Entry<String, String>> getHttpHeaders() {
- Map<String, String> headers = new HashMap<>();
- headers.put("BasicAuthentication", authParam);
- return headers.entrySet();
- }
- }
-
- public static class BasicAuthentication implements Authentication {
-
- private String authParam;
-
- @Override
- public void close() throws IOException {
- // noop
- }
-
- @Override
- public String getAuthMethodName() {
- return "BasicAuthentication";
- }
-
- @Override
- public AuthenticationDataProvider getAuthData() throws
PulsarClientException {
- try {
- return new BasicAuthenticationData(authParam);
- } catch (Exception e) {
- throw new PulsarClientException(e);
- }
- }
-
- @Override
- public void configure(Map<String, String> authParams) {
- this.authParam = String.format("{\"entityType\":
\"%s\", \"expiryTime\": \"%s\"}",
- authParams.get("entityType"),
authParams.get("expiryTime"));
- }
-
- @Override
- public void start() throws PulsarClientException {
- // noop
- }
- }
-
- public static class BasicAuthenticationProvider implements
AuthenticationProvider {
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public void initialize(ServiceConfiguration config) throws
IOException {
- }
-
- @Override
- public String getAuthMethodName() {
- return "BasicAuthentication";
- }
-
- @Override
- public CompletableFuture<String>
authenticateAsync(AuthenticationDataSource authData) {
- String commandData = null;
- if (authData.hasDataFromCommand()) {
- commandData = authData.getCommandData();
- } else if (authData.hasDataFromHttp()) {
- commandData =
authData.getHttpHeader("BasicAuthentication");
- }
-
- JsonObject element =
JsonParser.parseString(commandData).getAsJsonObject();
- log.info("Have log of {}", element);
- long expiryTimeInMillis =
Long.parseLong(element.get("expiryTime").getAsString());
- long currentTimeInMillis = System.currentTimeMillis();
- if (expiryTimeInMillis < currentTimeInMillis) {
- log.warn("Auth failed due to timeout");
- return CompletableFuture
- .failedFuture(new
AuthenticationException("Authentication data has been expired"));
- }
- final String result =
element.get("entityType").getAsString();
- // Run in another thread to attempt to test the async
logic
- return CompletableFuture.supplyAsync(() -> result);
- }
- }
-
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- conf.setAuthenticationEnabled(true);
- conf.setAuthorizationEnabled(true);
-
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
- // Expires after an hour
- conf.setBrokerClientAuthenticationParameters(
- "entityType:admin,expiryTime:" +
(System.currentTimeMillis() + 3600 * 1000));
-
- Set<String> superUserRoles = new HashSet<>();
- superUserRoles.add("admin");
- conf.setSuperUserRoles(superUserRoles);
-
- Set<String> providers = new HashSet<>();
- providers.add(BasicAuthenticationProvider.class.getName());
- conf.setAuthenticationProviders(providers);
-
- conf.setClusterName("test");
- Set<String> proxyRoles = new HashSet<>();
- proxyRoles.add("proxy");
- conf.setProxyRoles(proxyRoles);
+ private static final Logger log =
LoggerFactory.getLogger(ProxyAuthenticationTest.class);
+ private static final String CLUSTER_NAME = "test";
+
+ public static class BasicAuthenticationData implements
AuthenticationDataProvider {
+ private final String authParam;
+
+ public BasicAuthenticationData(String authParam) {
+ this.authParam = authParam;
+ }
+
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ public String getCommandData() {
+ return authParam;
+ }
+
+ public boolean hasDataForHttp() {
+ return true;
+ }
+
+ @Override
+ public Set<Entry<String, String>> getHttpHeaders() {
+ Map<String, String> headers = new HashMap<>();
+ headers.put("BasicAuthentication", authParam);
+ headers.put("X-Pulsar-Auth-Method-Name", "BasicAuthentication");
+ return headers.entrySet();
+ }
+ }
+
+ public static class BasicAuthentication implements Authentication {
+
+ private String authParam;
+
+ @Override
+ public void close() throws IOException {
+ // noop
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "BasicAuthentication";
+ }
+
+ @Override
+ public AuthenticationDataProvider getAuthData() throws
PulsarClientException {
+ try {
+ return new BasicAuthenticationData(authParam);
+ } catch (Exception e) {
+ throw new PulsarClientException(e);
+ }
+ }
+
+ @Override
+ public void configure(Map<String, String> authParams) {
+ this.authParam = String.format("{\"entityType\": \"%s\",
\"expiryTime\": \"%s\"}",
+ authParams.get("entityType"),
authParams.get("expiryTime"));
+ }
+
+ @Override
+ public void start() throws PulsarClientException {
+ // noop
+ }
+ }
+
+ public static class BasicAuthenticationState implements
AuthenticationState {
+ private final long expiryTimeInMillis;
+ private final String authRole;
+ private final AuthenticationDataSource authenticationDataSource;
+
+ private static boolean isExpired(long expiryTimeInMillis) {
+ return System.currentTimeMillis() > expiryTimeInMillis;
+ }
+
+ private static String[] parseAuthData(String commandData) {
+ JsonObject element =
JsonParser.parseString(commandData).getAsJsonObject();
+ long expiryTimeInMillis =
Long.parseLong(element.get("expiryTime").getAsString());
+ if (isExpired(expiryTimeInMillis)) {
+ throw new IllegalArgumentException("Credentials have expired");
+ }
+ String role = element.get("entityType").getAsString();
+ return new String[]{role, String.valueOf(expiryTimeInMillis)};
+ }
+
+ public BasicAuthenticationState(AuthenticationDataSource authData) {
+ this(authData.hasDataFromCommand() ? authData.getCommandData()
+ : authData.getHttpHeader("BasicAuthentication"));
+ }
+
+ public BasicAuthenticationState(AuthData authData) {
+ this(new String(authData.getBytes(), StandardCharsets.UTF_8));
+ }
+
+ private BasicAuthenticationState(String commandData) {
+ String[] parsed = parseAuthData(commandData);
+ this.authRole = parsed[0];
+ this.expiryTimeInMillis = Long.parseLong(parsed[1]);
+ this.authenticationDataSource = new
AuthenticationDataCommand(commandData, null, null);
+ }
+
+ @Override
+ public String getAuthRole() {
+ return authRole;
+ }
+
+ @Override
+ public AuthData authenticate(AuthData authData) throws
AuthenticationException {
+ return null; // Authentication complete
+ }
+
+ @Override
+ public CompletableFuture<AuthData> authenticateAsync(AuthData
authData) {
+ return CompletableFuture.completedFuture(null); // Authentication
complete
+ }
+
+ @Override
+ public AuthenticationDataSource getAuthDataSource() {
+ return authenticationDataSource;
+ }
+
+ @Override
+ public boolean isComplete() {
+ return authRole != null;
+ }
+
+ @Override
+ public boolean isExpired() {
+ return isExpired(expiryTimeInMillis);
+ }
+ }
+
+ public static class BasicAuthenticationProvider implements
AuthenticationProvider {
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void initialize(ServiceConfiguration config) throws IOException
{
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "BasicAuthentication";
+ }
+
+ @Override
+ public AuthenticationState newAuthState(AuthData authData,
SocketAddress remoteAddress, SSLSession sslSession) {
+ return new BasicAuthenticationState(authData);
+ }
+
+ @Override
+ public CompletableFuture<String>
authenticateAsync(AuthenticationDataSource authData) {
+ BasicAuthenticationState basicAuthenticationState = new
BasicAuthenticationState(authData);
+ return
CompletableFuture.supplyAsync(basicAuthenticationState::getAuthRole);
+ }
+ }
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthorizationEnabled(true);
+
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+ // Expires after an hour
+ conf.setBrokerClientAuthenticationParameters(
+ "entityType:admin,expiryTime:" + (System.currentTimeMillis() +
3600 * 1000));
+
+ Set<String> superUserRoles = new HashSet<>();
+ superUserRoles.add("admin");
+ conf.setSuperUserRoles(superUserRoles);
+
+ Set<String> providers = new HashSet<>();
+ providers.add(BasicAuthenticationProvider.class.getName());
+ conf.setAuthenticationProviders(providers);
+
+ conf.setClusterName(CLUSTER_NAME);
+ Set<String> proxyRoles = new HashSet<>();
+ proxyRoles.add("proxy");
+ conf.setProxyRoles(proxyRoles);
conf.setAuthenticateOriginalAuthData(true);
- super.init();
-
- updateAdminClient();
- producerBaseSetup();
- }
-
- @Override
- @AfterMethod(alwaysRun = true)
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
- @Test
- void testAuthentication() throws Exception {
- log.info("-- Starting {} test --", methodName);
-
- // Step 1: Create Admin Client
- updateAdminClient();
- // create a client which connects to proxy and pass authData
- String namespaceName = "my-property/my-ns";
- String topicName = "persistent://my-property/my-ns/my-topic1";
- String subscriptionName = "my-subscriber-name";
- // expires after 60 seconds
- String clientAuthParams = "entityType:client,expiryTime:" +
(System.currentTimeMillis() + 60 * 1000);
- // expires after 60 seconds
- String proxyAuthParams = "entityType:proxy,expiryTime:" +
(System.currentTimeMillis() + 60 * 1000);
-
- admin.namespaces().grantPermissionOnNamespace(namespaceName,
"proxy",
- Sets.newHashSet(AuthAction.consume,
AuthAction.produce));
- admin.namespaces().grantPermissionOnNamespace(namespaceName,
"client",
- Sets.newHashSet(AuthAction.consume,
AuthAction.produce));
-
- // Step 2: Try to use proxy Client as a normal Client - expect
exception
- ProxyConfiguration proxyConfig = new ProxyConfiguration();
- proxyConfig.setAuthenticationEnabled(true);
- proxyConfig.setServicePort(Optional.of(0));
- proxyConfig.setBrokerProxyAllowedTargetPorts("*");
- proxyConfig.setWebServicePort(Optional.of(0));
- proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
-
-
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
-
proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
-
- Set<String> providers = new HashSet<>();
- providers.add(BasicAuthenticationProvider.class.getName());
- proxyConfig.setAuthenticationProviders(providers);
- proxyConfig.setForwardAuthorizationCredentials(true);
+ super.init();
+
+ updateAdminClient();
+ producerBaseSetup();
+ }
+
+ @Override
+ @AfterMethod(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ void testAuthentication() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ // Step 1: Create Admin Client
+ updateAdminClient();
+ // create a client which connects to proxy and pass authData
+ String namespaceName = "my-property/my-ns";
+ String topicName = "persistent://my-property/my-ns/my-topic1";
+ String subscriptionName = "my-subscriber-name";
+ // expires after 60 seconds
+ String clientAuthParams = "entityType:client,expiryTime:" +
(System.currentTimeMillis() + 60 * 1000);
+ // expires after 60 seconds
+ String proxyAuthParams = "entityType:proxy,expiryTime:" +
(System.currentTimeMillis() + 60 * 1000);
+
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
+ Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
+ Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+ // Step 2: Try to use proxy Client as a normal Client - expect
exception
+ ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ proxyConfig.setAuthenticationEnabled(true);
+ proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+ proxyConfig.setWebServicePort(Optional.of(0));
+ proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+ proxyConfig.setClusterName(CLUSTER_NAME);
+
+
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+ proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+
+ Set<String> providers = new HashSet<>();
+ providers.add(BasicAuthenticationProvider.class.getName());
+ proxyConfig.setAuthenticationProviders(providers);
+ proxyConfig.setForwardAuthorizationCredentials(true);
AuthenticationService authenticationService = new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
- @Cleanup
- final Authentication proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
-
proxyConfig.getBrokerClientAuthenticationParameters());
- proxyClientAuthentication.start();
- @Cleanup
- ProxyService proxyService = new ProxyService(proxyConfig,
authenticationService, proxyClientAuthentication);
-
- proxyService.start();
- final String proxyServiceUrl = proxyService.getServiceUrl();
-
- // Step 3: Pass correct client params and use multiple
connections
- @Cleanup
- PulsarClient proxyClient = createPulsarClient(proxyServiceUrl,
clientAuthParams, 3);
- proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
- proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
- proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
-
- // Step 4: Ensure that all client contexts share the same auth
provider
- Assert.assertTrue(proxyService.getClientCnxs().size() >= 3,
"expect at least 3 clients");
- proxyService.getClientCnxs().stream().forEach((cnx) -> {
- Assert.assertSame(cnx.authenticationProvider,
proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication"));
- });
- }
-
- private void updateAdminClient() throws PulsarClientException {
- // Expires after an hour
- String adminAuthParams = "entityType:admin,expiryTime:" +
(System.currentTimeMillis() + 3600 * 1000);
- admin =
spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
-
.authentication(BasicAuthentication.class.getName(), adminAuthParams).build());
- }
-
- private PulsarClient createPulsarClient(String proxyServiceUrl, String
authParams, int numberOfConnections) throws PulsarClientException {
- return PulsarClient.builder().serviceUrl(proxyServiceUrl)
-
.authentication(BasicAuthentication.class.getName(),
authParams).connectionsPerBroker(numberOfConnections).build();
- }
+ @Cleanup
+ final Authentication proxyClientAuthentication =
+
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+ @Cleanup
+ ProxyService proxyService = new ProxyService(proxyConfig,
authenticationService, proxyClientAuthentication);
+
+ proxyService.start();
+ final String proxyServiceUrl = proxyService.getServiceUrl();
+
+ // Step 3: Pass correct client params and use multiple connections
+ @Cleanup
+ PulsarClient proxyClient = createPulsarClient(proxyServiceUrl,
clientAuthParams, 3);
+ proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
+ proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
+ proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
+
+ // Step 4: Ensure that all client contexts share the same auth provider
+ Assert.assertTrue(proxyService.getClientCnxs().size() >= 3, "expect at
least 3 clients");
+ proxyService.getClientCnxs().stream().forEach((cnx) -> {
+ Assert.assertSame(cnx.authenticationProvider,
+
proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication"));
+ });
+ }
+
+ private void updateAdminClient() throws PulsarClientException {
+ // Expires after an hour
+ String adminAuthParams = "entityType:admin,expiryTime:" +
(System.currentTimeMillis() + 3600 * 1000);
+ admin.close();
+ admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+ .authentication(BasicAuthentication.class.getName(),
adminAuthParams).build());
+ }
+
+ private PulsarClient createPulsarClient(String proxyServiceUrl, String
authParams, int numberOfConnections)
+ throws PulsarClientException {
+ return PulsarClient.builder().serviceUrl(proxyServiceUrl)
+ .authentication(BasicAuthentication.class.getName(),
authParams)
+ .connectionsPerBroker(numberOfConnections).build();
+ }
+
+ @Test
+ void testClientDisconnectWhenCredentialsExpireWithoutForwardAuth() throws
Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ String namespaceName = "my-property/my-ns";
+ String topicName = "persistent://my-property/my-ns/my-topic1";
+
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
+ Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
+ Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+ // Important: When forwardAuthorizationCredentials=false, broker
should not authenticate original auth data
+ // because the proxy doesn't forward it. Set
authenticateOriginalAuthData=false to match this behavior.
+ conf.setAuthenticateOriginalAuthData(false);
+
+ ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ proxyConfig.setAuthenticationEnabled(true);
+ proxyConfig.setAuthenticationRefreshCheckSeconds(2); // Check every 2
seconds
+ proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+ proxyConfig.setWebServicePort(Optional.of(0));
+ proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+ proxyConfig.setClusterName(CLUSTER_NAME);
+
+ // Proxy auth with long expiry
+ String proxyAuthParams = "entityType:proxy,expiryTime:" +
(System.currentTimeMillis() + 3600 * 1000);
+
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+ proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+
+ Set<String> providers = new HashSet<>();
+ providers.add(BasicAuthenticationProvider.class.getName());
+ proxyConfig.setAuthenticationProviders(providers);
+ proxyConfig.setForwardAuthorizationCredentials(false);
+
+ @Cleanup
+ AuthenticationService authenticationService = new
AuthenticationService(
+ PulsarConfigurationLoader.convertFrom(proxyConfig));
+ @Cleanup
+ final Authentication proxyClientAuthentication =
+
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+ @Cleanup
+ ProxyService proxyService = new ProxyService(proxyConfig,
authenticationService, proxyClientAuthentication);
+ proxyService.start();
+ final String proxyServiceUrl = proxyService.getServiceUrl();
+
+ // Create client with credentials that will expire in 3 seconds
+ long clientExpireTime = System.currentTimeMillis() + 3 * 1000;
+ String clientAuthParams = "entityType:client,expiryTime:" +
clientExpireTime;
+
+ @Cleanup
+ PulsarClient proxyClient = createPulsarClient(proxyServiceUrl,
clientAuthParams, 1);
+
+ @Cleanup
+ var producer =
+
proxyClient.newProducer(Schema.BYTES).topic(topicName).sendTimeout(5,
TimeUnit.SECONDS).create();
+ producer.send("test message".getBytes());
+
+ Awaitility.await().untilAsserted(() -> {
+ assertThatThrownBy(() -> producer.send("test message after
expiry".getBytes()))
+
.isExactlyInstanceOf(PulsarClientException.TimeoutException.class);
+ });
+
+ if (producer instanceof ProducerImpl<byte[]> producerImpl) {
+ long lastDisconnectedTimestamp =
producerImpl.getLastDisconnectedTimestamp();
+
assertThat(lastDisconnectedTimestamp).isGreaterThan(clientExpireTime);
+ }
+ }
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
index b058e4af830..56a6c64395c 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -177,37 +177,25 @@ public class ProxyRefreshAuthTest extends
ProducerConsumerBase {
PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
pulsarClient.getPartitionsForTopic(topic).get();
- Set<CompletableFuture<ClientCnx>> connections =
pulsarClientImpl.getCnxPool().getConnections();
- Awaitility.await().during(5, SECONDS).untilAsserted(() -> {
- pulsarClient.getPartitionsForTopic(topic).get();
- assertTrue(connections.stream().allMatch(n -> {
- try {
- ClientCnx clientCnx = n.get();
- long timestamp = clientCnx.getLastDisconnectedTimestamp();
- return timestamp == 0;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }));
- });
+ // Verify initial connection state
+ Set<CompletableFuture<ClientCnx>> connections =
pulsarClientImpl.getCnxPool().getConnections();
- // Force all connections from proxy to broker to close and therefore
require the proxy to re-authenticate with
- // the broker. (The client doesn't lose this connection.)
- restartBroker();
-
- // Rerun assertion to ensure that it still works
- Awaitility.await().during(5, SECONDS).untilAsserted(() -> {
- pulsarClient.getPartitionsForTopic(topic).get();
- assertTrue(connections.stream().allMatch(n -> {
- try {
- ClientCnx clientCnx = n.get();
- long timestamp = clientCnx.getLastDisconnectedTimestamp();
- return timestamp == 0;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }));
- });
+ Awaitility.await()
+ .during(5, SECONDS)
+ .untilAsserted(() -> {
+ for (CompletableFuture<ClientCnx> cf : connections) {
+ try {
+ ClientCnx clientCnx = cf.get();
+ long timestamp =
clientCnx.getLastDisconnectedTimestamp();
+ // If forwardAuthData is false, the broker cannot
see the client's authentication data.
+ // As a result, the broker cannot perform any
refresh operations on the client's auth data.
+ // Only the proxy has visibility of the client's
connection state.
+ assertTrue(forwardAuthData ? timestamp == 0 :
timestamp > 0);
+ } catch (Exception e) {
+ throw new AssertionError("Failed to get connection
state", e);
+ }
+ }
+ });
}
}