This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 69914616c4b [fix][broker] PIP-460: Authorize scalable topic binary commands (#25635)
69914616c4b is described below
commit 69914616c4b306017f08ce39aaea1a8901fd7648
Author: Lari Hotari <[email protected]>
AuthorDate: Fri May 1 00:49:16 2026 +0300
[fix][broker] PIP-460: Authorize scalable topic binary commands (#25635)
---
.../apache/pulsar/broker/service/ServerCnx.java | 109 ++++++--
.../service/ScalableTopicBinaryAuthZTest.java | 309 +++++++++++++++++++++
.../pulsar/broker/service/ServerCnxTest.java | 86 ++++++
.../broker/service/utils/ClientChannelHelper.java | 20 ++
4 files changed, 494 insertions(+), 30 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 944148c5dc2..871231f371a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -790,23 +790,44 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
- // Create a DagWatchSession that will send the initial layout and
watch for changes
- var session = new
org.apache.pulsar.broker.service.scalable.DagWatchSession(
- sessionId, topicName, this, resources, service);
- dagWatchSessions.put(sessionId, session);
-
- session.start()
- .thenAcceptAsync(session::pushUpdate, ctx.executor())
- .exceptionally(ex -> {
- Throwable cause = ex.getCause() != null ? ex.getCause() :
ex;
- log.warn().attr("topic", topicName).exception(cause)
- .log("ScalableTopicLookup failed");
- dagWatchSessions.remove(sessionId);
- session.close();
- ctx.executor().execute(() ->
+ isTopicOperationAllowed(topicName, TopicOperation.LOOKUP,
authenticationData, originalAuthData)
+ .thenAccept(isAuthorized -> {
+ if (!isAuthorized) {
+ final String msg = "Client is not authorized to
ScalableTopicLookup";
+ log.warn()
+ .attr("principal", getPrincipal())
+ .attr("topic", topicName)
+ .log(msg);
ctx.writeAndFlush(Commands.newScalableTopicError(sessionId,
- ServerError.TopicNotFound, cause.getMessage()))
- );
+ ServerError.AuthorizationError, msg));
+ return;
+ }
+ // Create a DagWatchSession that will send the initial
layout and watch for changes
+ var session = new
org.apache.pulsar.broker.service.scalable.DagWatchSession(
+ sessionId, topicName, this, resources, service);
+ dagWatchSessions.put(sessionId, session);
+
+ session.start()
+ .thenAcceptAsync(session::pushUpdate,
ctx.executor())
+ .exceptionally(ex -> {
+ Throwable cause = ex.getCause() != null ?
ex.getCause() : ex;
+ log.warn().attr("topic",
topicName).exception(cause)
+ .log("ScalableTopicLookup failed");
+ dagWatchSessions.remove(sessionId);
+ session.close();
+ ctx.executor().execute(() ->
+
ctx.writeAndFlush(Commands.newScalableTopicError(sessionId,
+ ServerError.TopicNotFound,
cause.getMessage()))
+ );
+ return null;
+ });
+ })
+ .exceptionally(ex -> {
+ logAuthException(remoteAddress, "scalable-topic-lookup",
getPrincipal(),
+ Optional.of(topicName), ex);
+ ctx.writeAndFlush(Commands.newScalableTopicError(sessionId,
+ ServerError.AuthorizationError,
+ "Exception occurred while trying to authorize
ScalableTopicLookup"));
return null;
});
}
@@ -814,6 +835,11 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
@Override
protected void handleCommandScalableTopicClose(
CommandScalableTopicClose commandScalableTopicClose) {
+ // No per-call authorization: the session is keyed in this connection's
+ // dagWatchSessions map (per-ServerCnx), authentication is enforced at connect,
+ // and the originating ScalableTopicLookup was authorized when the session was
+ // created. A close for an unknown sessionId is an idempotent no-op. Same
+ // pattern as handleCloseProducer / handleCloseConsumer.
checkArgument(state == State.Connected);
final long sessionId = commandScalableTopicClose.getSessionId();
@@ -880,23 +906,46 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
- scalableTopicService.registerConsumer(topicName, subscription,
consumerName, consumerId, this)
- .whenCompleteAsync((assignment, ex) -> {
- if (ex != null) {
- Throwable cause = ex.getCause() != null ?
ex.getCause() : ex;
- log.warn().attr("topic",
topicName).attr("subscription", subscription)
- .attr("consumerName",
consumerName).exception(cause)
- .log("ScalableTopicSubscribe failed");
+ isTopicOperationAllowed(topicName, subscription,
TopicOperation.CONSUME)
+ .thenAccept(isAuthorized -> {
+ if (!isAuthorized) {
+ final String msg = "Client is not authorized to
ScalableTopicSubscribe";
+ log.warn()
+ .attr("principal", getPrincipal())
+ .attr("topic", topicName)
+ .attr("subscription", subscription)
+ .log(msg);
getCommandSender().sendScalableTopicSubscribeError(requestId,
- ServerError.UnknownError, cause.getMessage());
+ ServerError.AuthorizationError, msg);
return;
}
- // Record the registration so we can call
onConsumerDisconnect on channelInactive.
- scalableConsumerRegistrations.put(consumerId,
- new ScalableConsumerRegistrationRef(topicName,
subscription, consumerName));
-
getCommandSender().sendScalableTopicSubscribeResponse(requestId,
-
org.apache.pulsar.broker.service.scalable.ConsumerSession.toProto(assignment));
- }, ctx.executor());
+ scalableTopicService.registerConsumer(topicName,
subscription, consumerName,
+ consumerId, this)
+ .whenCompleteAsync((assignment, ex) -> {
+ if (ex != null) {
+ Throwable cause = ex.getCause() != null ?
ex.getCause() : ex;
+ log.warn().attr("topic",
topicName).attr("subscription", subscription)
+ .attr("consumerName",
consumerName).exception(cause)
+ .log("ScalableTopicSubscribe
failed");
+
getCommandSender().sendScalableTopicSubscribeError(requestId,
+ ServerError.UnknownError,
cause.getMessage());
+ return;
+ }
+ // Record the registration so we can call
onConsumerDisconnect on channelInactive.
+ scalableConsumerRegistrations.put(consumerId,
+ new
ScalableConsumerRegistrationRef(topicName, subscription, consumerName));
+
getCommandSender().sendScalableTopicSubscribeResponse(requestId,
+
org.apache.pulsar.broker.service.scalable.ConsumerSession.toProto(assignment));
+ }, ctx.executor());
+ })
+ .exceptionally(ex -> {
+ logAuthException(remoteAddress,
"scalable-topic-subscribe", getPrincipal(),
+ Optional.of(topicName), ex);
+
getCommandSender().sendScalableTopicSubscribeError(requestId,
+ ServerError.AuthorizationError,
+ "Exception occurred while trying to authorize
ScalableTopicSubscribe");
+ return null;
+ });
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ScalableTopicBinaryAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ScalableTopicBinaryAuthZTest.java
new file mode 100644
index 00000000000..bc0c680ea26
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ScalableTopicBinaryAuthZTest.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
+import org.apache.pulsar.broker.auth.MockAuthorizationProvider;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.scalable.ScalableTopicService;
+import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.common.api.proto.CommandConnected;
+import
org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribeResponse;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicUpdate;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
+import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.protocol.Commands;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Authorization tests for the PIP-460/PIP-466 scalable-topic binary protocol commands.
+ *
+ * <p>Verifies that:
+ * <ul>
+ * <li>{@link Commands#newScalableTopicLookup} requires {@link TopicOperation#LOOKUP} on the
+ * parent topic.</li>
+ * <li>{@link Commands#newScalableTopicSubscribe} requires {@link TopicOperation#CONSUME} with
+ * the subscription name passed via {@link AuthenticationDataSubscription}.</li>
+ * <li>{@link Commands#newScalableTopicClose} performs no per-call authorization (the session is
+ * per-connection and the originating lookup was already authorized).</li>
+ * <li>The {@code scalableTopicsEnabled} feature flag short-circuits authorization so a disabled
+ * feature returns {@link ServerError#NotAllowedError} without consulting the authorization
+ * service.</li>
+ * </ul>
+ *
+ * <p>Tests drive raw protocol bytes through an {@link EmbeddedChannel} so the assertions exercise
+ * the wire-level handler paths in {@link ServerCnx} without depending on a client-side
+ * implementation of the new commands.
+ */
+@Test(groups = "broker")
+public class ScalableTopicBinaryAuthZTest {
+
+ private static final String AUTHORIZED_ROLE = "pass.pass";
+ private static final String UNAUTHORIZED_ROLE = "pass.fail";
+ private static final String TOPIC =
"persistent://public/default/test-scalable-topic";
+ private static final String SUBSCRIPTION = "test-scalable-sub";
+
+ private EmbeddedChannel channel;
+ private ServiceConfiguration svcConfig;
+ private ServerCnx serverCnx;
+ private PulsarTestContext pulsarTestContext;
+ private PulsarService pulsar;
+ private BrokerService brokerService;
+ private ClientChannelHelper clientChannelHelper;
+ private AuthorizationService authorizationService;
+
+ @BeforeMethod(alwaysRun = true)
+ public void setup() throws Exception {
+ svcConfig = new ServiceConfiguration();
+ svcConfig.setBrokerShutdownTimeoutMs(0L);
+ svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+ svcConfig.setKeepAliveIntervalSeconds(60);
+ svcConfig.setBacklogQuotaCheckEnabled(false);
+ svcConfig.setClusterName("use");
+
+ pulsarTestContext = PulsarTestContext.builderForNonStartableContext()
+ .config(svcConfig)
+ .spyByDefault()
+ .build();
+ pulsar = pulsarTestContext.getPulsarService();
+ brokerService = pulsarTestContext.getBrokerService();
+
+ NamespaceService namespaceService = pulsar.getNamespaceService();
+
when(namespaceService.getBundleAsync(any())).thenReturn(CompletableFuture.completedFuture(null));
+
+ clientChannelHelper = new ClientChannelHelper();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void teardown() throws Exception {
+ if (serverCnx != null) {
+ serverCnx.close();
+ }
+ if (channel != null) {
+ channel.close();
+ }
+ if (pulsarTestContext != null) {
+ pulsarTestContext.close();
+ pulsarTestContext = null;
+ }
+ }
+
+ @Test
+ public void testScalableTopicLookupRequiresLookupPermission() throws
Exception {
+ configureAuthAndConnect(UNAUTHORIZED_ROLE);
+
+ long sessionId = 1L;
+ channel.writeInbound(Commands.newScalableTopicLookup(sessionId,
TOPIC));
+
+ Object response = readResponse();
+ assertTrue(response instanceof CommandScalableTopicUpdate,
+ "Expected CommandScalableTopicUpdate, got " + response);
+ CommandScalableTopicUpdate update = (CommandScalableTopicUpdate)
response;
+ assertEquals(update.getSessionId(), sessionId);
+ assertTrue(update.hasError());
+ assertEquals(update.getError(), ServerError.AuthorizationError);
+
+ TopicName topicName = TopicName.get(TOPIC);
+ verify(authorizationService, times(1))
+ .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
UNAUTHORIZED_ROLE,
+ serverCnx.getAuthData());
+ }
+
+ @Test
+ public void testScalableTopicLookupAllowsAuthorizedRole() throws Exception
{
+ configureAuthAndConnect(AUTHORIZED_ROLE);
+
+ long sessionId = 2L;
+ channel.writeInbound(Commands.newScalableTopicLookup(sessionId,
TOPIC));
+
+ // The session may eventually emit a TopicNotFound error from the metadata-store watch (the
+ // topic does not exist in the test harness), but it must NOT emit AuthorizationError.
+ Object response = readResponse();
+ assertTrue(response instanceof CommandScalableTopicUpdate,
+ "Expected CommandScalableTopicUpdate, got " + response);
+ CommandScalableTopicUpdate update = (CommandScalableTopicUpdate)
response;
+ if (update.hasError()) {
+ assertEquals(update.getError(), ServerError.TopicNotFound,
+ "Authorized role must not be rejected with
AuthorizationError");
+ }
+
+ TopicName topicName = TopicName.get(TOPIC);
+ verify(authorizationService, times(1))
+ .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
AUTHORIZED_ROLE,
+ serverCnx.getAuthData());
+ }
+
+ @Test
+ public void testScalableTopicSubscribeRequiresConsumePermission() throws
Exception {
+ configureAuthAndConnect(UNAUTHORIZED_ROLE);
+ // Pre-stub the scalable-topic service so authorization (not service availability) is what
+ // determines the response. The mock is never invoked because authorization fails first.
+
when(brokerService.getScalableTopicService()).thenReturn(mock(ScalableTopicService.class));
+
+ long requestId = 10L;
+ channel.writeInbound(Commands.newScalableTopicSubscribe(requestId,
TOPIC, SUBSCRIPTION,
+ "test-consumer", 1L, ScalableConsumerType.STREAM));
+
+ Object response = readResponse();
+ assertTrue(response instanceof CommandScalableTopicSubscribeResponse,
+ "Expected CommandScalableTopicSubscribeResponse, got " +
response);
+ CommandScalableTopicSubscribeResponse subResp =
(CommandScalableTopicSubscribeResponse) response;
+ assertEquals(subResp.getRequestId(), requestId);
+ assertTrue(subResp.hasError());
+ assertEquals(subResp.getError(), ServerError.AuthorizationError);
+
+ TopicName topicName = TopicName.get(TOPIC);
+ verify(authorizationService, times(1)).allowTopicOperationAsync(
+ eq(topicName), eq(TopicOperation.CONSUME),
eq(UNAUTHORIZED_ROLE),
+ argThat(arg -> arg instanceof AuthenticationDataSubscription
+ &&
SUBSCRIPTION.equals(((AuthenticationDataSubscription) arg).getSubscription())));
+ }
+
+ @Test
+ public void testScalableTopicCloseDoesNotCallAuthorization() throws
Exception {
+ configureAuthAndConnect(UNAUTHORIZED_ROLE);
+
+ // Send a Close for an unknown sessionId. The handler must treat this as a no-op:
+ // no response, no AuthorizationError, and the AuthorizationService must not be consulted.
+ channel.writeInbound(Commands.newScalableTopicClose(99999L));
+ channel.runPendingTasks();
+
+ assertTrue(channel.outboundMessages().isEmpty(),
+ "ScalableTopicClose for an unknown session must not emit any
response");
+ verify(authorizationService, never()).allowTopicOperationAsync(any(),
any(), any(), any());
+ }
+
+ @Test
+ public void testScalableTopicLookupSkipsAuthzWhenFeatureDisabled() throws
Exception {
+ svcConfig.setScalableTopicsEnabled(false);
+ configureAuthAndConnect(AUTHORIZED_ROLE);
+
+ long sessionId = 3L;
+ channel.writeInbound(Commands.newScalableTopicLookup(sessionId,
TOPIC));
+
+ Object response = readResponse();
+ assertTrue(response instanceof CommandScalableTopicUpdate);
+ CommandScalableTopicUpdate update = (CommandScalableTopicUpdate)
response;
+ assertEquals(update.getSessionId(), sessionId);
+ assertTrue(update.hasError());
+ assertEquals(update.getError(), ServerError.NotAllowedError,
+ "Disabled feature must short-circuit before authorization");
+
+ verify(authorizationService, never()).allowTopicOperationAsync(any(),
any(), any(), any());
+ }
+
+ @Test
+ public void testScalableTopicSubscribeSkipsAuthzWhenFeatureDisabled()
throws Exception {
+ svcConfig.setScalableTopicsEnabled(false);
+ configureAuthAndConnect(AUTHORIZED_ROLE);
+
+ long requestId = 11L;
+ channel.writeInbound(Commands.newScalableTopicSubscribe(requestId,
TOPIC, SUBSCRIPTION,
+ "test-consumer", 1L, ScalableConsumerType.STREAM));
+
+ Object response = readResponse();
+ assertTrue(response instanceof CommandScalableTopicSubscribeResponse);
+ CommandScalableTopicSubscribeResponse subResp =
(CommandScalableTopicSubscribeResponse) response;
+ assertEquals(subResp.getRequestId(), requestId);
+ assertTrue(subResp.hasError());
+ assertEquals(subResp.getError(), ServerError.NotAllowedError);
+
+ verify(authorizationService, never()).allowTopicOperationAsync(any(),
any(), any(), any());
+ }
+
+ private void configureAuthAndConnect(String role) throws Exception {
+ AuthenticationService authenticationService =
mock(AuthenticationService.class);
+ AuthenticationProvider authenticationProvider = new
MockAuthenticationProvider();
+ String authMethodName = authenticationProvider.getAuthMethodName();
+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+
+
svcConfig.setAuthorizationProvider(MockAuthorizationProvider.class.getName());
+ authorizationService =
spyWithClassAndConstructorArgsRecordingInvocations(
+ AuthorizationService.class, svcConfig,
pulsarTestContext.getPulsarResources());
+
when(brokerService.getAuthorizationService()).thenReturn(authorizationService);
+ svcConfig.setAuthorizationEnabled(true);
+
+ resetChannel();
+ ByteBuf connect = Commands.newConnect(authMethodName, role, "test");
+ channel.writeInbound(connect);
+
+ Object response = readResponse();
+ assertTrue(response instanceof CommandConnected, "Expected
CommandConnected, got " + response);
+ }
+
+ private void resetChannel() throws Exception {
+ if (channel != null && channel.isActive()) {
+ serverCnx.close();
+ channel.close().get();
+ }
+ serverCnx = new ServerCnx(pulsar);
+ serverCnx.setAuthRole("");
+ channel = new EmbeddedChannel(
+ new LengthFieldBasedFrameDecoder(5 * 1024 * 1024, 0, 4, 0, 4),
+ (ChannelHandler) serverCnx);
+ }
+
+ private Object readResponse() throws Exception {
+ long sleepMs = 10;
+ long iterations = TimeUnit.SECONDS.toMillis(10) / sleepMs;
+ for (int i = 0; i < iterations; i++) {
+ channel.runPendingTasks();
+ if (!channel.outboundMessages().isEmpty()) {
+ Object outbound = channel.outboundMessages().remove();
+ Object cmd = clientChannelHelper.getCommand(outbound);
+ if (cmd != null) {
+ return cmd;
+ }
+ }
+ Thread.sleep(sleepMs);
+ }
+ throw new AssertionError("Timed out waiting for response");
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index e37556a41a4..7de4c198f30 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -138,6 +138,8 @@ import
org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.api.proto.CommandPing;
import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
+import
org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribeResponse;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicUpdate;
import org.apache.pulsar.common.api.proto.CommandSendError;
import org.apache.pulsar.common.api.proto.CommandSendReceipt;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -147,6 +149,7 @@ import org.apache.pulsar.common.api.proto.CommandSuccess;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.Subscription;
import org.apache.pulsar.common.api.proto.TxnAction;
@@ -1505,6 +1508,89 @@ public class ServerCnxTest {
}));
}
+ /**
+ * PIP-460/PIP-466: ScalableTopicLookup must require {@link TopicOperation#LOOKUP} on the parent
+ * topic, and ScalableTopicSubscribe must require {@link TopicOperation#CONSUME} with the
+ * subscription. ScalableTopicClose carries no authorization (the session is per-connection).
+ */
+ @Test
+ public void testScalableTopicCommandsRequireTopicAuthorization() throws
Exception {
+ AuthenticationService authenticationService =
mock(AuthenticationService.class);
+ AuthenticationProvider authenticationProvider = new
MockAuthenticationProvider();
+ String authMethodName = authenticationProvider.getAuthMethodName();
+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+
+
svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider");
+ AuthorizationService authorizationService =
+
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class,
svcConfig,
+ pulsarTestContext.getPulsarResources());
+
when(brokerService.getAuthorizationService()).thenReturn(authorizationService);
+ svcConfig.setAuthorizationEnabled(true);
+
+ // Pre-stub the scalable-topic prerequisites so authorization (not service availability)
+ // is what determines the response. The mocks are never invoked because authorization fails first.
+ when(brokerService.getScalableTopicService()).thenReturn(
+
mock(org.apache.pulsar.broker.service.scalable.ScalableTopicService.class));
+
+ resetChannel();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
+
+ // Connect with a role that passes authentication and fails
authorization in MockAuthorizationProvider.
+ String clientRole = "pass.fail";
+ ByteBuf connect = Commands.newConnect(authMethodName, clientRole,
"test");
+ channel.writeInbound(connect);
+ Object connectResponse = getResponse();
+ assertTrue(connectResponse instanceof CommandConnected);
+
+ String topicStr = "persistent://public/default/test-scalable-topic";
+ TopicName topicName = TopicName.get(topicStr);
+
+ // ScalableTopicLookup -> CommandScalableTopicUpdate with
AuthorizationError
+ long sessionId = 100L;
+ ByteBuf lookup = Commands.newScalableTopicLookup(sessionId, topicStr);
+ channel.writeInbound(lookup);
+ Object lookupResponse = getResponse();
+ assertTrue(lookupResponse instanceof CommandScalableTopicUpdate);
+ CommandScalableTopicUpdate update = (CommandScalableTopicUpdate)
lookupResponse;
+ assertEquals(update.getSessionId(), sessionId);
+ assertTrue(update.hasError());
+ assertEquals(update.getError(), ServerError.AuthorizationError);
+ verify(authorizationService, times(1))
+ .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP,
clientRole, serverCnx.getAuthData());
+
+ // ScalableTopicSubscribe -> CommandScalableTopicSubscribeResponse
with AuthorizationError
+ String subscriptionName = "test-scalable-sub";
+ long requestId = 200L;
+ ByteBuf subscribe = Commands.newScalableTopicSubscribe(requestId,
topicStr, subscriptionName,
+ "test-consumer", 1L, ScalableConsumerType.STREAM);
+ channel.writeInbound(subscribe);
+ Object subscribeResponse = getResponse();
+ assertTrue(subscribeResponse instanceof
CommandScalableTopicSubscribeResponse);
+ CommandScalableTopicSubscribeResponse subResp =
(CommandScalableTopicSubscribeResponse) subscribeResponse;
+ assertEquals(subResp.getRequestId(), requestId);
+ assertTrue(subResp.hasError());
+ assertEquals(subResp.getError(), ServerError.AuthorizationError);
+ verify(authorizationService, times(1)).allowTopicOperationAsync(
+ eq(topicName), eq(TopicOperation.CONSUME), eq(clientRole),
argThat(arg -> {
+ assertTrue(arg instanceof AuthenticationDataSubscription);
+ AuthenticationDataSubscription authData =
(AuthenticationDataSubscription) arg;
+ assertEquals(authData.getSubscription(), subscriptionName);
+ return true;
+ }));
+
+ // ScalableTopicClose for an unknown sessionId is an idempotent no-op:
no response, no error.
+ ByteBuf close = Commands.newScalableTopicClose(99999L);
+ channel.writeInbound(close);
+ channel.runPendingTasks();
+ assertTrue(channel.outboundMessages().isEmpty(),
+ "ScalableTopicClose for an unknown session must not emit any
response");
+
+ channel.finish();
+ }
+
@Test
public void testRefreshOriginalPrincipalWithAuthDataForwardedFromProxy()
throws Exception {
AuthenticationService authenticationService =
mock(AuthenticationService.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index d6210767ed1..32382cdb6fe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -45,6 +45,9 @@ import org.apache.pulsar.common.api.proto.CommandPing;
import org.apache.pulsar.common.api.proto.CommandPong;
import org.apache.pulsar.common.api.proto.CommandProducer;
import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicAssignmentUpdate;
+import
org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribeResponse;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicUpdate;
import org.apache.pulsar.common.api.proto.CommandSend;
import org.apache.pulsar.common.api.proto.CommandSendError;
import org.apache.pulsar.common.api.proto.CommandSendReceipt;
@@ -218,6 +221,23 @@ public class ClientChannelHelper {
protected void handlePong(CommandPong pong) {
return;
}
+
+ @Override
+ protected void
handleCommandScalableTopicUpdate(CommandScalableTopicUpdate update) {
+ queue.offer(new CommandScalableTopicUpdate().copyFrom(update));
+ }
+
+ @Override
+ protected void handleCommandScalableTopicSubscribeResponse(
+ CommandScalableTopicSubscribeResponse response) {
+ queue.offer(new
CommandScalableTopicSubscribeResponse().copyFrom(response));
+ }
+
+ @Override
+ protected void handleCommandScalableTopicAssignmentUpdate(
+ CommandScalableTopicAssignmentUpdate update) {
+ queue.offer(new
CommandScalableTopicAssignmentUpdate().copyFrom(update));
+ }
};
}