This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 69914616c4b [fix][broker] PIP-460: Authorize scalable topic binary commands (#25635)
69914616c4b is described below

commit 69914616c4b306017f08ce39aaea1a8901fd7648
Author: Lari Hotari <[email protected]>
AuthorDate: Fri May 1 00:49:16 2026 +0300

    [fix][broker] PIP-460: Authorize scalable topic binary commands (#25635)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 109 ++++++--
 .../service/ScalableTopicBinaryAuthZTest.java      | 309 +++++++++++++++++++++
 .../pulsar/broker/service/ServerCnxTest.java       |  86 ++++++
 .../broker/service/utils/ClientChannelHelper.java  |  20 ++
 4 files changed, 494 insertions(+), 30 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 944148c5dc2..871231f371a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -790,23 +790,44 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             return;
         }
 
-        // Create a DagWatchSession that will send the initial layout and 
watch for changes
-        var session = new 
org.apache.pulsar.broker.service.scalable.DagWatchSession(
-                sessionId, topicName, this, resources, service);
-        dagWatchSessions.put(sessionId, session);
-
-        session.start()
-                .thenAcceptAsync(session::pushUpdate, ctx.executor())
-                .exceptionally(ex -> {
-                    Throwable cause = ex.getCause() != null ? ex.getCause() : 
ex;
-                    log.warn().attr("topic", topicName).exception(cause)
-                            .log("ScalableTopicLookup failed");
-                    dagWatchSessions.remove(sessionId);
-                    session.close();
-                    ctx.executor().execute(() ->
+        isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, 
authenticationData, originalAuthData)
+                .thenAccept(isAuthorized -> {
+                    if (!isAuthorized) {
+                        final String msg = "Client is not authorized to 
ScalableTopicLookup";
+                        log.warn()
+                                .attr("principal", getPrincipal())
+                                .attr("topic", topicName)
+                                .log(msg);
                         
ctx.writeAndFlush(Commands.newScalableTopicError(sessionId,
-                                ServerError.TopicNotFound, cause.getMessage()))
-                    );
+                                ServerError.AuthorizationError, msg));
+                        return;
+                    }
+                    // Create a DagWatchSession that will send the initial 
layout and watch for changes
+                    var session = new 
org.apache.pulsar.broker.service.scalable.DagWatchSession(
+                            sessionId, topicName, this, resources, service);
+                    dagWatchSessions.put(sessionId, session);
+
+                    session.start()
+                            .thenAcceptAsync(session::pushUpdate, 
ctx.executor())
+                            .exceptionally(ex -> {
+                                Throwable cause = ex.getCause() != null ? 
ex.getCause() : ex;
+                                log.warn().attr("topic", 
topicName).exception(cause)
+                                        .log("ScalableTopicLookup failed");
+                                dagWatchSessions.remove(sessionId);
+                                session.close();
+                                ctx.executor().execute(() ->
+                                    
ctx.writeAndFlush(Commands.newScalableTopicError(sessionId,
+                                            ServerError.TopicNotFound, 
cause.getMessage()))
+                                );
+                                return null;
+                            });
+                })
+                .exceptionally(ex -> {
+                    logAuthException(remoteAddress, "scalable-topic-lookup", 
getPrincipal(),
+                            Optional.of(topicName), ex);
+                    ctx.writeAndFlush(Commands.newScalableTopicError(sessionId,
+                            ServerError.AuthorizationError,
+                            "Exception occurred while trying to authorize 
ScalableTopicLookup"));
                     return null;
                 });
     }
@@ -814,6 +835,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     @Override
     protected void handleCommandScalableTopicClose(
             CommandScalableTopicClose commandScalableTopicClose) {
+        // No per-call authorization: the session is keyed in this connection's
+        // dagWatchSessions map (per-ServerCnx), authentication is enforced at connect,
+        // and the originating ScalableTopicLookup was authorized when the session was
+        // created. A close for an unknown sessionId is an idempotent no-op. Same
+        // pattern as handleCloseProducer / handleCloseConsumer.
         checkArgument(state == State.Connected);
 
         final long sessionId = commandScalableTopicClose.getSessionId();
@@ -880,23 +906,46 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             return;
         }
 
-        scalableTopicService.registerConsumer(topicName, subscription, 
consumerName, consumerId, this)
-                .whenCompleteAsync((assignment, ex) -> {
-                    if (ex != null) {
-                        Throwable cause = ex.getCause() != null ? 
ex.getCause() : ex;
-                        log.warn().attr("topic", 
topicName).attr("subscription", subscription)
-                                .attr("consumerName", 
consumerName).exception(cause)
-                                .log("ScalableTopicSubscribe failed");
+        isTopicOperationAllowed(topicName, subscription, 
TopicOperation.CONSUME)
+                .thenAccept(isAuthorized -> {
+                    if (!isAuthorized) {
+                        final String msg = "Client is not authorized to 
ScalableTopicSubscribe";
+                        log.warn()
+                                .attr("principal", getPrincipal())
+                                .attr("topic", topicName)
+                                .attr("subscription", subscription)
+                                .log(msg);
                         
getCommandSender().sendScalableTopicSubscribeError(requestId,
-                                ServerError.UnknownError, cause.getMessage());
+                                ServerError.AuthorizationError, msg);
                         return;
                     }
-                    // Record the registration so we can call 
onConsumerDisconnect on channelInactive.
-                    scalableConsumerRegistrations.put(consumerId,
-                            new ScalableConsumerRegistrationRef(topicName, 
subscription, consumerName));
-                    
getCommandSender().sendScalableTopicSubscribeResponse(requestId,
-                            
org.apache.pulsar.broker.service.scalable.ConsumerSession.toProto(assignment));
-                }, ctx.executor());
+                    scalableTopicService.registerConsumer(topicName, 
subscription, consumerName,
+                                    consumerId, this)
+                            .whenCompleteAsync((assignment, ex) -> {
+                                if (ex != null) {
+                                    Throwable cause = ex.getCause() != null ? 
ex.getCause() : ex;
+                                    log.warn().attr("topic", 
topicName).attr("subscription", subscription)
+                                            .attr("consumerName", 
consumerName).exception(cause)
+                                            .log("ScalableTopicSubscribe 
failed");
+                                    
getCommandSender().sendScalableTopicSubscribeError(requestId,
+                                            ServerError.UnknownError, 
cause.getMessage());
+                                    return;
+                                }
+                                // Record the registration so we can call 
onConsumerDisconnect on channelInactive.
+                                scalableConsumerRegistrations.put(consumerId,
+                                        new 
ScalableConsumerRegistrationRef(topicName, subscription, consumerName));
+                                
getCommandSender().sendScalableTopicSubscribeResponse(requestId,
+                                        
org.apache.pulsar.broker.service.scalable.ConsumerSession.toProto(assignment));
+                            }, ctx.executor());
+                })
+                .exceptionally(ex -> {
+                    logAuthException(remoteAddress, 
"scalable-topic-subscribe", getPrincipal(),
+                            Optional.of(topicName), ex);
+                    
getCommandSender().sendScalableTopicSubscribeError(requestId,
+                            ServerError.AuthorizationError,
+                            "Exception occurred while trying to authorize 
ScalableTopicSubscribe");
+                    return null;
+                });
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ScalableTopicBinaryAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ScalableTopicBinaryAuthZTest.java
new file mode 100644
index 00000000000..bc0c680ea26
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ScalableTopicBinaryAuthZTest.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
+import org.apache.pulsar.broker.auth.MockAuthorizationProvider;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.scalable.ScalableTopicService;
+import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.common.api.proto.CommandConnected;
+import 
org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribeResponse;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicUpdate;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
+import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.protocol.Commands;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Authorization tests for the PIP-460/PIP-466 scalable-topic binary protocol commands.
+ *
+ * <p>Verifies that:
+ * <ul>
+ *   <li>{@link Commands#newScalableTopicLookup} requires {@link TopicOperation#LOOKUP} on the
+ *       parent topic.</li>
+ *   <li>{@link Commands#newScalableTopicSubscribe} requires {@link TopicOperation#CONSUME} with
+ *       the subscription name passed via {@link AuthenticationDataSubscription}.</li>
+ *   <li>{@link Commands#newScalableTopicClose} performs no per-call authorization (the session is
+ *       per-connection and the originating lookup was already authorized).</li>
+ *   <li>The {@code scalableTopicsEnabled} feature flag short-circuits authorization so a disabled
+ *       feature returns {@link ServerError#NotAllowedError} without consulting the authorization
+ *       service.</li>
+ * </ul>
+ *
+ * <p>Tests drive raw protocol bytes through an {@link EmbeddedChannel} so the assertions exercise
+ * the wire-level handler paths in {@link ServerCnx} without depending on a client-side
+ * implementation of the new commands.
+ */
+@Test(groups = "broker")
+public class ScalableTopicBinaryAuthZTest {
+
+    private static final String AUTHORIZED_ROLE = "pass.pass";
+    private static final String UNAUTHORIZED_ROLE = "pass.fail";
+    private static final String TOPIC = 
"persistent://public/default/test-scalable-topic";
+    private static final String SUBSCRIPTION = "test-scalable-sub";
+
+    private EmbeddedChannel channel;
+    private ServiceConfiguration svcConfig;
+    private ServerCnx serverCnx;
+    private PulsarTestContext pulsarTestContext;
+    private PulsarService pulsar;
+    private BrokerService brokerService;
+    private ClientChannelHelper clientChannelHelper;
+    private AuthorizationService authorizationService;
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup() throws Exception {
+        svcConfig = new ServiceConfiguration();
+        svcConfig.setBrokerShutdownTimeoutMs(0L);
+        svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+        svcConfig.setKeepAliveIntervalSeconds(60);
+        svcConfig.setBacklogQuotaCheckEnabled(false);
+        svcConfig.setClusterName("use");
+
+        pulsarTestContext = PulsarTestContext.builderForNonStartableContext()
+                .config(svcConfig)
+                .spyByDefault()
+                .build();
+        pulsar = pulsarTestContext.getPulsarService();
+        brokerService = pulsarTestContext.getBrokerService();
+
+        NamespaceService namespaceService = pulsar.getNamespaceService();
+        
when(namespaceService.getBundleAsync(any())).thenReturn(CompletableFuture.completedFuture(null));
+
+        clientChannelHelper = new ClientChannelHelper();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void teardown() throws Exception {
+        if (serverCnx != null) {
+            serverCnx.close();
+        }
+        if (channel != null) {
+            channel.close();
+        }
+        if (pulsarTestContext != null) {
+            pulsarTestContext.close();
+            pulsarTestContext = null;
+        }
+    }
+
+    @Test
+    public void testScalableTopicLookupRequiresLookupPermission() throws 
Exception {
+        configureAuthAndConnect(UNAUTHORIZED_ROLE);
+
+        long sessionId = 1L;
+        channel.writeInbound(Commands.newScalableTopicLookup(sessionId, 
TOPIC));
+
+        Object response = readResponse();
+        assertTrue(response instanceof CommandScalableTopicUpdate,
+                "Expected CommandScalableTopicUpdate, got " + response);
+        CommandScalableTopicUpdate update = (CommandScalableTopicUpdate) 
response;
+        assertEquals(update.getSessionId(), sessionId);
+        assertTrue(update.hasError());
+        assertEquals(update.getError(), ServerError.AuthorizationError);
+
+        TopicName topicName = TopicName.get(TOPIC);
+        verify(authorizationService, times(1))
+                .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, 
UNAUTHORIZED_ROLE,
+                        serverCnx.getAuthData());
+    }
+
+    @Test
+    public void testScalableTopicLookupAllowsAuthorizedRole() throws Exception 
{
+        configureAuthAndConnect(AUTHORIZED_ROLE);
+
+        long sessionId = 2L;
+        channel.writeInbound(Commands.newScalableTopicLookup(sessionId, 
TOPIC));
+
+        // The session may eventually emit a TopicNotFound error from the 
metadata-store watch (the
+        // topic does not exist in the test harness), but it must NOT emit 
AuthorizationError.
+        Object response = readResponse();
+        assertTrue(response instanceof CommandScalableTopicUpdate,
+                "Expected CommandScalableTopicUpdate, got " + response);
+        CommandScalableTopicUpdate update = (CommandScalableTopicUpdate) 
response;
+        if (update.hasError()) {
+            assertEquals(update.getError(), ServerError.TopicNotFound,
+                    "Authorized role must not be rejected with 
AuthorizationError");
+        }
+
+        TopicName topicName = TopicName.get(TOPIC);
+        verify(authorizationService, times(1))
+                .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, 
AUTHORIZED_ROLE,
+                        serverCnx.getAuthData());
+    }
+
+    @Test
+    public void testScalableTopicSubscribeRequiresConsumePermission() throws 
Exception {
+        configureAuthAndConnect(UNAUTHORIZED_ROLE);
+        // Pre-stub the scalable-topic service so authorization (not service 
availability) is what
+        // determines the response. The mock is never invoked because 
authorization fails first.
+        
when(brokerService.getScalableTopicService()).thenReturn(mock(ScalableTopicService.class));
+
+        long requestId = 10L;
+        channel.writeInbound(Commands.newScalableTopicSubscribe(requestId, 
TOPIC, SUBSCRIPTION,
+                "test-consumer", 1L, ScalableConsumerType.STREAM));
+
+        Object response = readResponse();
+        assertTrue(response instanceof CommandScalableTopicSubscribeResponse,
+                "Expected CommandScalableTopicSubscribeResponse, got " + 
response);
+        CommandScalableTopicSubscribeResponse subResp = 
(CommandScalableTopicSubscribeResponse) response;
+        assertEquals(subResp.getRequestId(), requestId);
+        assertTrue(subResp.hasError());
+        assertEquals(subResp.getError(), ServerError.AuthorizationError);
+
+        TopicName topicName = TopicName.get(TOPIC);
+        verify(authorizationService, times(1)).allowTopicOperationAsync(
+                eq(topicName), eq(TopicOperation.CONSUME), 
eq(UNAUTHORIZED_ROLE),
+                argThat(arg -> arg instanceof AuthenticationDataSubscription
+                        && 
SUBSCRIPTION.equals(((AuthenticationDataSubscription) arg).getSubscription())));
+    }
+
+    @Test
+    public void testScalableTopicCloseDoesNotCallAuthorization() throws 
Exception {
+        configureAuthAndConnect(UNAUTHORIZED_ROLE);
+
+        // Send a Close for an unknown sessionId. The handler must treat this 
as a no-op:
+        // no response, no AuthorizationError, and the AuthorizationService 
must not be consulted.
+        channel.writeInbound(Commands.newScalableTopicClose(99999L));
+        channel.runPendingTasks();
+
+        assertTrue(channel.outboundMessages().isEmpty(),
+                "ScalableTopicClose for an unknown session must not emit any 
response");
+        verify(authorizationService, never()).allowTopicOperationAsync(any(), 
any(), any(), any());
+    }
+
+    @Test
+    public void testScalableTopicLookupSkipsAuthzWhenFeatureDisabled() throws 
Exception {
+        svcConfig.setScalableTopicsEnabled(false);
+        configureAuthAndConnect(AUTHORIZED_ROLE);
+
+        long sessionId = 3L;
+        channel.writeInbound(Commands.newScalableTopicLookup(sessionId, 
TOPIC));
+
+        Object response = readResponse();
+        assertTrue(response instanceof CommandScalableTopicUpdate);
+        CommandScalableTopicUpdate update = (CommandScalableTopicUpdate) 
response;
+        assertEquals(update.getSessionId(), sessionId);
+        assertTrue(update.hasError());
+        assertEquals(update.getError(), ServerError.NotAllowedError,
+                "Disabled feature must short-circuit before authorization");
+
+        verify(authorizationService, never()).allowTopicOperationAsync(any(), 
any(), any(), any());
+    }
+
+    @Test
+    public void testScalableTopicSubscribeSkipsAuthzWhenFeatureDisabled() 
throws Exception {
+        svcConfig.setScalableTopicsEnabled(false);
+        configureAuthAndConnect(AUTHORIZED_ROLE);
+
+        long requestId = 11L;
+        channel.writeInbound(Commands.newScalableTopicSubscribe(requestId, 
TOPIC, SUBSCRIPTION,
+                "test-consumer", 1L, ScalableConsumerType.STREAM));
+
+        Object response = readResponse();
+        assertTrue(response instanceof CommandScalableTopicSubscribeResponse);
+        CommandScalableTopicSubscribeResponse subResp = 
(CommandScalableTopicSubscribeResponse) response;
+        assertEquals(subResp.getRequestId(), requestId);
+        assertTrue(subResp.hasError());
+        assertEquals(subResp.getError(), ServerError.NotAllowedError);
+
+        verify(authorizationService, never()).allowTopicOperationAsync(any(), 
any(), any(), any());
+    }
+
+    private void configureAuthAndConnect(String role) throws Exception {
+        AuthenticationService authenticationService = 
mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = new 
MockAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
+        
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+        svcConfig.setAuthenticationEnabled(true);
+
+        
svcConfig.setAuthorizationProvider(MockAuthorizationProvider.class.getName());
+        authorizationService = 
spyWithClassAndConstructorArgsRecordingInvocations(
+                AuthorizationService.class, svcConfig, 
pulsarTestContext.getPulsarResources());
+        
when(brokerService.getAuthorizationService()).thenReturn(authorizationService);
+        svcConfig.setAuthorizationEnabled(true);
+
+        resetChannel();
+        ByteBuf connect = Commands.newConnect(authMethodName, role, "test");
+        channel.writeInbound(connect);
+
+        Object response = readResponse();
+        assertTrue(response instanceof CommandConnected, "Expected 
CommandConnected, got " + response);
+    }
+
+    private void resetChannel() throws Exception {
+        if (channel != null && channel.isActive()) {
+            serverCnx.close();
+            channel.close().get();
+        }
+        serverCnx = new ServerCnx(pulsar);
+        serverCnx.setAuthRole("");
+        channel = new EmbeddedChannel(
+                new LengthFieldBasedFrameDecoder(5 * 1024 * 1024, 0, 4, 0, 4),
+                (ChannelHandler) serverCnx);
+    }
+
+    private Object readResponse() throws Exception {
+        long sleepMs = 10;
+        long iterations = TimeUnit.SECONDS.toMillis(10) / sleepMs;
+        for (int i = 0; i < iterations; i++) {
+            channel.runPendingTasks();
+            if (!channel.outboundMessages().isEmpty()) {
+                Object outbound = channel.outboundMessages().remove();
+                Object cmd = clientChannelHelper.getCommand(outbound);
+                if (cmd != null) {
+                    return cmd;
+                }
+            }
+            Thread.sleep(sleepMs);
+        }
+        throw new AssertionError("Timed out waiting for response");
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index e37556a41a4..7de4c198f30 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -138,6 +138,8 @@ import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
 import 
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
 import org.apache.pulsar.common.api.proto.CommandPing;
 import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
+import 
org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribeResponse;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicUpdate;
 import org.apache.pulsar.common.api.proto.CommandSendError;
 import org.apache.pulsar.common.api.proto.CommandSendReceipt;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -147,6 +149,7 @@ import org.apache.pulsar.common.api.proto.CommandSuccess;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.api.proto.Subscription;
 import org.apache.pulsar.common.api.proto.TxnAction;
@@ -1505,6 +1508,89 @@ public class ServerCnxTest {
                 }));
     }
 
+    /**
+     * PIP-460/PIP-466: ScalableTopicLookup must require {@link TopicOperation#LOOKUP} on the parent
+     * topic, and ScalableTopicSubscribe must require {@link TopicOperation#CONSUME} with the
+     * subscription. ScalableTopicClose carries no authorization (the session is per-connection).
+     */
+    @Test
+    public void testScalableTopicCommandsRequireTopicAuthorization() throws 
Exception {
+        AuthenticationService authenticationService = 
mock(AuthenticationService.class);
+        AuthenticationProvider authenticationProvider = new 
MockAuthenticationProvider();
+        String authMethodName = authenticationProvider.getAuthMethodName();
+        
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+        
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+        svcConfig.setAuthenticationEnabled(true);
+
+        
svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider");
+        AuthorizationService authorizationService =
+                
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, 
svcConfig,
+                        pulsarTestContext.getPulsarResources());
+        
when(brokerService.getAuthorizationService()).thenReturn(authorizationService);
+        svcConfig.setAuthorizationEnabled(true);
+
+        // Pre-stub the scalable-topic prerequisites so authorization (not 
service availability)
+        // is what determines the response. The mocks are never invoked 
because authorization fails first.
+        when(brokerService.getScalableTopicService()).thenReturn(
+                
mock(org.apache.pulsar.broker.service.scalable.ScalableTopicService.class));
+
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        // Connect with a role that passes authentication and fails 
authorization in MockAuthorizationProvider.
+        String clientRole = "pass.fail";
+        ByteBuf connect = Commands.newConnect(authMethodName, clientRole, 
"test");
+        channel.writeInbound(connect);
+        Object connectResponse = getResponse();
+        assertTrue(connectResponse instanceof CommandConnected);
+
+        String topicStr = "persistent://public/default/test-scalable-topic";
+        TopicName topicName = TopicName.get(topicStr);
+
+        // ScalableTopicLookup -> CommandScalableTopicUpdate with 
AuthorizationError
+        long sessionId = 100L;
+        ByteBuf lookup = Commands.newScalableTopicLookup(sessionId, topicStr);
+        channel.writeInbound(lookup);
+        Object lookupResponse = getResponse();
+        assertTrue(lookupResponse instanceof CommandScalableTopicUpdate);
+        CommandScalableTopicUpdate update = (CommandScalableTopicUpdate) 
lookupResponse;
+        assertEquals(update.getSessionId(), sessionId);
+        assertTrue(update.hasError());
+        assertEquals(update.getError(), ServerError.AuthorizationError);
+        verify(authorizationService, times(1))
+                .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, 
clientRole, serverCnx.getAuthData());
+
+        // ScalableTopicSubscribe -> CommandScalableTopicSubscribeResponse 
with AuthorizationError
+        String subscriptionName = "test-scalable-sub";
+        long requestId = 200L;
+        ByteBuf subscribe = Commands.newScalableTopicSubscribe(requestId, 
topicStr, subscriptionName,
+                "test-consumer", 1L, ScalableConsumerType.STREAM);
+        channel.writeInbound(subscribe);
+        Object subscribeResponse = getResponse();
+        assertTrue(subscribeResponse instanceof 
CommandScalableTopicSubscribeResponse);
+        CommandScalableTopicSubscribeResponse subResp = 
(CommandScalableTopicSubscribeResponse) subscribeResponse;
+        assertEquals(subResp.getRequestId(), requestId);
+        assertTrue(subResp.hasError());
+        assertEquals(subResp.getError(), ServerError.AuthorizationError);
+        verify(authorizationService, times(1)).allowTopicOperationAsync(
+                eq(topicName), eq(TopicOperation.CONSUME), eq(clientRole), 
argThat(arg -> {
+                    assertTrue(arg instanceof AuthenticationDataSubscription);
+                    AuthenticationDataSubscription authData = 
(AuthenticationDataSubscription) arg;
+                    assertEquals(authData.getSubscription(), subscriptionName);
+                    return true;
+                }));
+
+        // ScalableTopicClose for an unknown sessionId is an idempotent no-op: 
no response, no error.
+        ByteBuf close = Commands.newScalableTopicClose(99999L);
+        channel.writeInbound(close);
+        channel.runPendingTasks();
+        assertTrue(channel.outboundMessages().isEmpty(),
+                "ScalableTopicClose for an unknown session must not emit any 
response");
+
+        channel.finish();
+    }
+
     @Test
     public void testRefreshOriginalPrincipalWithAuthDataForwardedFromProxy() 
throws Exception {
         AuthenticationService authenticationService = 
mock(AuthenticationService.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index d6210767ed1..32382cdb6fe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -45,6 +45,9 @@ import org.apache.pulsar.common.api.proto.CommandPing;
 import org.apache.pulsar.common.api.proto.CommandPong;
 import org.apache.pulsar.common.api.proto.CommandProducer;
 import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicAssignmentUpdate;
+import 
org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribeResponse;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicUpdate;
 import org.apache.pulsar.common.api.proto.CommandSend;
 import org.apache.pulsar.common.api.proto.CommandSendError;
 import org.apache.pulsar.common.api.proto.CommandSendReceipt;
@@ -218,6 +221,23 @@ public class ClientChannelHelper {
         protected void handlePong(CommandPong pong) {
             return;
         }
+
+        @Override
+        protected void 
handleCommandScalableTopicUpdate(CommandScalableTopicUpdate update) {
+            queue.offer(new CommandScalableTopicUpdate().copyFrom(update));
+        }
+
+        @Override
+        protected void handleCommandScalableTopicSubscribeResponse(
+                CommandScalableTopicSubscribeResponse response) {
+            queue.offer(new 
CommandScalableTopicSubscribeResponse().copyFrom(response));
+        }
+
+        @Override
+        protected void handleCommandScalableTopicAssignmentUpdate(
+                CommandScalableTopicAssignmentUpdate update) {
+            queue.offer(new 
CommandScalableTopicAssignmentUpdate().copyFrom(update));
+        }
     };
 
 }

Reply via email to