lhotari opened a new pull request, #25635: URL: https://github.com/apache/pulsar/pull/25635
PIP: #25469 ### Motivation PIP-460 (Scalable Topics) and PIP-466 introduced six new binary protocol commands. The three client→broker handlers in `ServerCnx` — `ScalableTopicLookup` (id 70), `ScalableTopicClose` (id 72), and `ScalableTopicSubscribe` (id 73) — performed no per-operation authorization. Authentication is enforced at connection setup, but every comparable command (`handleLookup`, `handleSubscribe`, `handleProducer`, `handlePartitionMetadataRequest`) gates access through `AuthorizationService` while the scalable-topic equivalents did not. This is a security gap: any authenticated client could open a DAG watch session on any scalable topic and register consumers on any subscription, regardless of namespace/topic permissions. The Admin REST API for scalable topics already enforces authorization (`ScalableTopicsAuthZTest` covers it). This PR closes the parallel gap on the binary protocol path. PIP-460 segments inherit authorization from their parent `topic://`, so authorization happens at the parent topic level — no new permission types are needed. The mapping is the same as classic Pulsar: | New command | Operation | |---|---| | `CommandScalableTopicLookup` | `TopicOperation.LOOKUP` | | `CommandScalableTopicSubscribe` | `TopicOperation.CONSUME` (with subscription) | | `CommandScalableTopicClose` | (no per-call authz — see below) | ### Modifications `pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java` - `handleCommandScalableTopicLookup`: wraps `DagWatchSession` creation in `isTopicOperationAllowed(topic, TopicOperation.LOOKUP, ...)`. On denial → `Commands.newScalableTopicError(sessionId, AuthorizationError, ...)`. On authz exception → `logAuthException` + `AuthorizationError`. The session is registered in `dagWatchSessions` only after authorization passes, so an unauthorized lookup never holds a server-side resource. - `handleCommandScalableTopicSubscribe`: wraps `scalableTopicService.registerConsumer` in `isTopicOperationAllowed(topic, subscription, TopicOperation.CONSUME)` — the 3-arg overload that wraps the auth data in `AuthenticationDataSubscription` so subscription-aware authz plugins receive the subscription name. On denial → `getCommandSender().sendScalableTopicSubscribeError(requestId, AuthorizationError, ...)`. - `handleCommandScalableTopicClose`: no per-call authorization, with a comment documenting the deliberate decision. The session is keyed in this connection's per-instance `dagWatchSessions` map; authentication is enforced at connect; the originating `ScalableTopicLookup` was authorized when the session was created; a close for an unknown `sessionId` is an idempotent no-op. Same pattern as `handleCloseProducer` / `handleCloseConsumer`. Mirrors existing patterns (`handleLookup` line 648, `handleSubscribe` line 1535) for thread model, error frames, and exception logging. `pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java` — added decoder hooks for `CommandScalableTopicUpdate`, `CommandScalableTopicSubscribeResponse`, and `CommandScalableTopicAssignmentUpdate` so the existing test harness can decode broker→client scalable-topic responses. ### Verifying this change This change added tests and can be verified as follows: - New `ScalableTopicBinaryAuthZTest` (6 tests) — dedicated binary-protocol authorization suite covering: authorized lookup proceeds, unauthorized lookup returns `AuthorizationError`, unauthorized subscribe returns `AuthorizationError`, close-no-authz no-op for unknown session, and feature-flag short-circuits authz for both lookup and subscribe. - Extended `ServerCnxTest.testScalableTopicCommandsRequireTopicAuthorization` — combined unauthorized check that mirrors the existing `testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker` pattern. - Existing `ScalableTopicsAuthZTest` (admin REST) continues to pass. - Existing `scalable.*` feature tests (92 tests across `ConsumerSessionTest`, `DagWatchSessionTest`, `ScalableTopicControllerTest`, `ScalableTopicServiceTest`, `SegmentLayoutTest`, `SubscriptionCoordinatorTest`) continue to pass. Run locally: \`\`\` ./gradlew :pulsar-broker:test --tests \"org.apache.pulsar.broker.service.ScalableTopicBinaryAuthZTest\" ./gradlew :pulsar-broker:test --tests \"org.apache.pulsar.broker.service.ServerCnxTest\" ./gradlew :pulsar-broker:test --tests \"org.apache.pulsar.broker.admin.ScalableTopicsAuthZTest\" \`\`\` ### Does this pull request potentially affect one of the following parts: - [x] The binary protocol — adds authorization gating to three existing PIP-460/PIP-466 commands. No wire-format changes; the same `CommandScalableTopicError` / `CommandScalableTopicSubscribeResponse` frames are returned, now with `ServerError.AuthorizationError` for unauthorized clients. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
