lhotari opened a new pull request, #25635:
URL: https://github.com/apache/pulsar/pull/25635

   PIP: #25469
   
   ### Motivation
   
   PIP-460 (Scalable Topics) and PIP-466 introduced six new binary protocol 
commands. The three client→broker handlers in `ServerCnx` — 
`ScalableTopicLookup` (id 70), `ScalableTopicClose` (id 72), and 
`ScalableTopicSubscribe` (id 73) — performed no per-operation authorization. 
Authentication is enforced at connection setup, and every comparable command 
(`handleLookup`, `handleSubscribe`, `handleProducer`, 
`handlePartitionMetadataRequest`) additionally gates access through 
`AuthorizationService`. The scalable-topic equivalents did not.
   
   This is a security gap: any authenticated client could open a DAG watch 
session on any scalable topic and register consumers on any subscription, 
regardless of namespace/topic permissions. The Admin REST API for scalable 
topics already enforces authorization (`ScalableTopicsAuthZTest` covers it). 
This PR closes the parallel gap on the binary protocol path.
   
   PIP-460 segments inherit authorization from their parent `topic://`, so 
authorization happens at the parent topic level — no new permission types are 
needed. The mapping is the same as classic Pulsar:
   
   | New command | Operation |
   |---|---|
   | `CommandScalableTopicLookup` | `TopicOperation.LOOKUP` |
   | `CommandScalableTopicSubscribe` | `TopicOperation.CONSUME` (with 
subscription) |
   | `CommandScalableTopicClose` | (no per-call authz — see below) |
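
The mapping in the table above can be sketched as a small lookup function. This is only an illustration: `ScalableCommandAuthzMapping`, its `Command` and `TopicOp` enums, and `requiredOperation` are hypothetical names and not part of the Pulsar code base. `TopicOp` stands in for `TopicOperation`.

```java
import java.util.Optional;

/** Hypothetical sketch of the command-to-operation mapping; not Pulsar code. */
final class ScalableCommandAuthzMapping {
    /** Stand-ins for the three client-to-broker scalable-topic commands. */
    enum Command { LOOKUP, SUBSCRIBE, CLOSE }

    /** Stand-in for the two TopicOperation values used by this PR. */
    enum TopicOp { LOOKUP, CONSUME }

    /** Returns the operation to authorize, or empty when no per-call check applies. */
    static Optional<TopicOp> requiredOperation(Command command) {
        switch (command) {
            case LOOKUP:
                return Optional.of(TopicOp.LOOKUP);
            case SUBSCRIBE:
                // The real check also passes the subscription name.
                return Optional.of(TopicOp.CONSUME);
            case CLOSE:
            default:
                // Close relies on the authorization done when the session was created.
                return Optional.empty();
        }
    }
}
```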
   
   ### Modifications
   
   `pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java`
   - `handleCommandScalableTopicLookup`: wraps `DagWatchSession` creation in 
`isTopicOperationAllowed(topic, TopicOperation.LOOKUP, ...)`. On denial → 
`Commands.newScalableTopicError(sessionId, AuthorizationError, ...)`. On authz 
exception → `logAuthException` + `AuthorizationError`. The session is 
registered in `dagWatchSessions` only after authorization passes, so an 
unauthorized lookup never holds a server-side resource.
   - `handleCommandScalableTopicSubscribe`: wraps 
`scalableTopicService.registerConsumer` in `isTopicOperationAllowed(topic, 
subscription, TopicOperation.CONSUME)` — the 3-arg overload that wraps the auth 
data in `AuthenticationDataSubscription` so subscription-aware authz plugins 
receive the subscription name. On denial → 
`getCommandSender().sendScalableTopicSubscribeError(requestId, 
AuthorizationError, ...)`.
   - `handleCommandScalableTopicClose`: no per-call authorization, with a 
comment documenting the deliberate decision. The session is keyed in this 
connection's per-instance `dagWatchSessions` map; authentication is enforced at 
connect; the originating `ScalableTopicLookup` was authorized when the session 
was created; a close for an unknown `sessionId` is an idempotent no-op. Same 
pattern as `handleCloseProducer` / `handleCloseConsumer`.
   
   These handlers mirror existing patterns (`handleLookup` line 648, 
`handleSubscribe` line 1535) for thread model, error frames, and exception 
logging.
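
The authorize-then-register ordering described above can be sketched in isolation. This is a minimal model under stated assumptions, not the `ServerCnx` implementation: `AuthzGateSketch`, `Op`, `Result`, and the `BiFunction` authorizer are hypothetical stand-ins for the handler, `TopicOperation`, the error frames, and the asynchronous `isTopicOperationAllowed` check.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

/** Hypothetical sketch of gating session creation on authorization; not Pulsar code. */
final class AuthzGateSketch {
    enum Op { LOOKUP, CONSUME }
    enum Result { OK, AUTHORIZATION_ERROR }

    // Stand-in for the connection's per-instance session map.
    final Map<Long, String> sessions = new ConcurrentHashMap<>();

    // Stand-in for an asynchronous authorization check: (topic, op) -> allowed?
    private final BiFunction<String, Op, CompletableFuture<Boolean>> authorizer;

    AuthzGateSketch(BiFunction<String, Op, CompletableFuture<Boolean>> authorizer) {
        this.authorizer = authorizer;
    }

    /** Registers the session only after the LOOKUP check passes. */
    CompletableFuture<Result> lookup(long sessionId, String topic) {
        return authorizer.apply(topic, Op.LOOKUP).handle((allowed, error) -> {
            if (error != null || !Boolean.TRUE.equals(allowed)) {
                // Denial and a failed check both yield the same error,
                // and no server-side resource has been created yet.
                return Result.AUTHORIZATION_ERROR;
            }
            sessions.put(sessionId, topic);
            return Result.OK;
        });
    }

    /** No per-call authorization; closing an unknown session is a no-op. */
    void close(long sessionId) {
        sessions.remove(sessionId);
    }
}
```

Because the map entry is written only on the success branch, a denied lookup leaves nothing to clean up, which is why close can skip its own check.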
   
   
   `pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java`
   - Added decoder hooks for `CommandScalableTopicUpdate`, 
`CommandScalableTopicSubscribeResponse`, and 
`CommandScalableTopicAssignmentUpdate` so the existing test harness can decode 
broker→client scalable-topic responses.
   
   ### Verifying this change
   
   This change added tests and can be verified as follows:
   
   - New `ScalableTopicBinaryAuthZTest` (6 tests): a dedicated binary-protocol 
authorization suite covering: authorized lookup proceeds, unauthorized lookup 
returns `AuthorizationError`, unauthorized subscribe returns 
`AuthorizationError`, close for an unknown session is a no-op with no authz 
check, and the feature flag short-circuits authz for both lookup and subscribe.
   - Extended 
`ServerCnxTest.testScalableTopicCommandsRequireTopicAuthorization` — combined 
unauthorized check that mirrors the existing 
`testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker` pattern.
   - Existing `ScalableTopicsAuthZTest` (admin REST) continues to pass.
   - Existing `scalable.*` feature tests (92 tests across 
`ConsumerSessionTest`, `DagWatchSessionTest`, `ScalableTopicControllerTest`, 
`ScalableTopicServiceTest`, `SegmentLayoutTest`, `SubscriptionCoordinatorTest`) 
continue to pass.
   
   Run locally:
   ```
   ./gradlew :pulsar-broker:test --tests "org.apache.pulsar.broker.service.ScalableTopicBinaryAuthZTest"
   ./gradlew :pulsar-broker:test --tests "org.apache.pulsar.broker.service.ServerCnxTest"
   ./gradlew :pulsar-broker:test --tests "org.apache.pulsar.broker.admin.ScalableTopicsAuthZTest"
   ```
   
   ### Does this pull request potentially affect one of the following parts:
   
   - [x] The binary protocol — adds authorization gating to three existing 
PIP-460/PIP-466 commands. No wire-format changes; the same 
`CommandScalableTopicError` / `CommandScalableTopicSubscribeResponse` frames 
are returned, now with `ServerError.AuthorizationError` for unauthorized 
clients.

