lhotari commented on PR #25648:
URL: https://github.com/apache/pulsar/pull/25648#issuecomment-4360787352

   > _Note: this is analysis by Claude Code, reviewed by me before posting._
   
   A few points worth checking before this merges.
   
   **1. Metadata listener leak per `ScalableTopicsWatcherSession`** 
(`ScalableTopicsWatcherSession.start():305`, `close():466-472`)
   
   `start()` calls 
`resources.getStore().registerListener(this::onNotification)` per session, and 
`close()` only flips a `closed` flag — `MetadataStore` exposes no 
`unregisterListener`. So every closed session leaves a stale 
`Consumer<Notification>` registered, and **every** metadata notification fans 
out to all stale listeners over the broker's lifetime (each short-circuits on 
`closed.get()`, but the dispatch cost still scales linearly with total sessions 
ever opened). This is not just memory growth: it is a per-event throughput tax on 
long-running brokers serving many namespace watches.
   
   **Suggested fix (primary)** — mirror what `TopicResources` does for 
`TopicListService`:
   - Have `ScalableTopicResources` register **one** `handleNotification` 
listener at construction time (`TopicResources.java:52`).
   - Maintain a `Map<ScalableTopicNamespaceListener, NamespaceName>` (or 
similar) inside `ScalableTopicResources`, with `register…Listener(...)` / 
`deregister…Listener(...)` methods (`TopicResources.java:136-142`).
   - `ScalableTopicsWatcherSession.start()` calls `register…`, `close()` calls 
`deregister…`. The single fan-out filters by the watcher's namespace base path.
   
   This is the established pattern in the codebase, and it removes both the 
leak and the linear dispatch cost without needing a `MetadataStore` API change.
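
The single-listener fan-out described above could be sketched roughly as follows. This is a self-contained illustration, not the actual `TopicResources` or `ScalableTopicResources` code: the class name `FanOutRegistry`, the use of plain `String` paths in place of `Notification`, and the `basePath + "/"` prefix check are all assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical stand-in for ScalableTopicResources; all names are illustrative.
final class FanOutRegistry {
    // Maps each registered listener to the namespace base path it watches.
    private final Map<Consumer<String>, String> listeners = new ConcurrentHashMap<>();

    // In the real fix this method would be registered ONCE with the metadata
    // store at construction time, instead of one store listener per session.
    void handleNotification(String path) {
        listeners.forEach((listener, basePath) -> {
            // Assumed filter: deliver only paths under the watcher's base path.
            if (path.startsWith(basePath + "/")) {
                listener.accept(path);
            }
        });
    }

    // Called from the session's start().
    void registerListener(String basePath, Consumer<String> listener) {
        listeners.put(listener, basePath);
    }

    // Called from the session's close(); removes the entry so nothing leaks.
    void deregisterListener(Consumer<String> listener) {
        listeners.remove(listener);
    }

    int listenerCount() {
        return listeners.size();
    }
}
```

With this shape, the dispatch cost per notification tracks the number of currently open sessions rather than the number of sessions ever opened.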
   
   **Alternative** — #24256 adds `registerCancellableListener` returning a 
handle that callers cancel on shutdown. With that landed, 
`ScalableTopicsWatcherSession` could store the handle and cancel it in 
`close()`. Works, but keeps one metadata-store listener per session; the 
fan-out approach above is cheaper at runtime and consistent with 
`TopicListService`.
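
For comparison, the handle-based alternative might look something like the sketch below. The real signature of `registerCancellableListener` in #24256 is not shown in this comment, so the `Runnable` return type, the `CancellableStoreSketch` and `SessionSketch` classes, and the `publish` method are all assumptions invented for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical store that models a cancellable registration. The actual
// MetadataStore API and the handle type from #24256 may differ.
final class CancellableStoreSketch {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    // Assumed shape: returns a handle whose run() removes the listener.
    Runnable registerCancellableListener(Consumer<String> listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener);
    }

    // Delivers a notification path to every listener still registered.
    void publish(String path) {
        listeners.forEach(l -> l.accept(path));
    }

    int listenerCount() {
        return listeners.size();
    }
}

// Hypothetical session: stores the handle at start and cancels it on close().
final class SessionSketch implements AutoCloseable {
    private final Runnable cancelHandle;
    final List<String> received = new CopyOnWriteArrayList<>();

    SessionSketch(CancellableStoreSketch store) {
        this.cancelHandle = store.registerCancellableListener(received::add);
    }

    @Override
    public void close() {
        cancelHandle.run();
    }
}
```

This removes the leak, but each open session still costs one store-level listener.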
   
   **2. \`consumer_name\` described in PR body but absent from the proto** 
(`PulsarApi.proto:1558-1569`)
   
   The PR description's "Wire protocol" section states 
`CommandWatchScalableTopics` carries `consumer_name`, and the "Cross-topic load 
balancing — deferred" section says "\`consumer_name\` is part of 
\`CommandWatchScalableTopics\` *now*, so the future coordinator has the 
identity it needs." The proto in this PR has only `watch_id`, `namespace`, 
`property_filters`, and `current_hash`. Either add the field now (cheap: 
`optional string consumer_name = 5;`), which avoids another wire bump for the 
future coordinator, or remove that language from the PR description.
   
   **3. Reconnect backoff never reset on hash-matched reconnect** 
(`ScalableTopicsWatcher.java:1037`)
   
   `reconnectBackoff.reset()` runs only inside `onSnapshot`. But the hash-skip 
optimisation is the common happy path for short blips: when the hash matches, 
the broker emits no `Snapshot`, so the backoff keeps its last value across 
successive successful reconnects and the next disconnect waits much longer 
than expected. Suggest 
resetting on successful reconnect (after the watch frame's write completes, or 
on any successful broker→client traffic).
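
A minimal sketch of the suggested reset, assuming a simple doubling backoff. `SimpleBackoff`, `WatcherReconnectSketch`, the `onReconnected` hook, and the 100 ms / 60 s bounds are hypothetical and are not taken from `ScalableTopicsWatcher` or Pulsar's own backoff class.

```java
// Hypothetical exponential backoff used only for this illustration.
final class SimpleBackoff {
    private final long initialMs;
    private final long maxMs;
    private long nextMs;

    SimpleBackoff(long initialMs, long maxMs) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.nextMs = initialMs;
    }

    // Returns the delay to wait now and doubles the delay for the next attempt.
    long next() {
        long current = nextMs;
        nextMs = Math.min(maxMs, nextMs * 2);
        return current;
    }

    void reset() {
        nextMs = initialMs;
    }
}

// Hypothetical watcher showing where the reset could be added.
final class WatcherReconnectSketch {
    private final SimpleBackoff reconnectBackoff = new SimpleBackoff(100, 60_000);

    // Returns the delay before the next reconnect attempt.
    long onDisconnected() {
        return reconnectBackoff.next();
    }

    // Assumed hook: invoked once the watch frame's write completes, whether
    // or not the broker goes on to send a Snapshot.
    void onReconnected() {
        reconnectBackoff.reset();
    }

    // Existing behaviour described in the review: reset on Snapshot.
    void onSnapshot() {
        reconnectBackoff.reset();
    }
}
```

Without the `onReconnected()` reset, two hash-matched reconnects in a row would leave the third disconnect waiting four times the initial delay even though every reconnect succeeded.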

