merlimat opened a new pull request, #25648:

URL: https://github.com/apache/pulsar/pull/25648
## Summary

Foundation for multi-topic `QueueConsumer` / `StreamConsumer` subscriptions filtered by topic properties: a long-lived broker → client watch session over the union of scalable topics in a namespace that match a (possibly empty) set of property filters. The broker pushes a full Snapshot on subscribe and an incremental Diff (with adds and removes batched together) when membership changes. The watch stays reliable across reconnects via a hash-skip optimisation: when the client's hash of its current set matches the broker's freshly computed one, the broker stays silent.

This PR lands the wire layer, the broker session, and the V5 client watcher. The multi-topic consumer wrappers themselves come in a follow-up PR.

The full design lives in [`multi-topic-consumer-design.md`](https://github.com/merlimat/pulsar/blob/st-scalable-topics-watcher/multi-topic-consumer-design.md) on this branch.

### Wire protocol

```
WATCH_SCALABLE_TOPICS         (76)  client -> broker  open watch
WATCH_SCALABLE_TOPICS_UPDATE  (77)  broker -> client  Snapshot or Diff
WATCH_SCALABLE_TOPICS_CLOSE   (78)  client -> broker  close watch
```

`CommandWatchScalableTopics` carries `watch_id`, `namespace`, `property_filters`, an optional `consumer_name` (a hook for a future namespace-level subscription coordinator), and an optional `current_hash`.

`CommandWatchScalableTopicsUpdate` is either:

- `Snapshot { topics: [string] }`: the full set, sent on initial subscribe and on reconnect when the hash differs; or
- `Diff { added: [string], removed: [string] }`: incremental changes, with apply-removed-first semantics (this covers a rapid remove-then-add of the same topic).

### Reliability

Resync-on-reconnect with a hash short-circuit:

1. The client tracks a CRC32C hash over its current topic set (the same function as `CommandGetTopicsOfNamespace`).
2. On reconnect, the client sends that hash. If the broker's freshly computed hash matches, the broker emits **nothing**: the watch is live, the client's local state is correct, and future Diffs flow as usual.
3. If the hash differs (or the client had no prior set), the broker emits a fresh Snapshot, which the client applies as a full-state replace.

For the common short-blip reconnect, the wire cost collapses to one inbound `WatchScalableTopics` frame at the broker and zero outbound frames.

### Broker session

`ScalableTopicsWatcherSession`:

- Registers a metadata-store listener on `/topics/<tenant>/<ns>` *before* computing the initial set, so events arriving mid-snapshot are captured.
- Filters notifications to direct children only (skipping `<topic>/subscriptions/...` and the controller lock).
- Evaluates filters server-side. Created/Modified events read the new value to test it against the AND-ed filters; Deleted events emit Removed if the topic was in `currentSet`.
- Uses a 50 ms coalescing window that folds back-to-back events into one Diff frame. An Add and a Remove for the same topic in the same window cancel out.
- Can run on any broker: every broker observes the same metadata events, so no namespace-level coordinator is needed.

### V5 client watcher

`ScalableTopicsWatcher`:

- Mirrors `DagWatchClient` in shape: it opens a connection to the service URL, registers itself on the cnx, and sends `WatchScalableTopics`.
- `start()` resolves with the initial topic set, so callers can populate their per-topic consumer map before the listener attaches.
- Reconnects with `Backoff` (100 ms initial, 30 s cap) and sends the cached hash on reconnect.
- Maintains a synchronised `Set<String>` updated on every Snapshot replace and Diff apply, so the hash is always cheap to compute.

## Test plan

- `ScalableTopicsWatcherSessionHashTest`: broker session hash branches (no hash → snapshot; matching → silent; differing → snapshot), using a mocked `ServerCnx` and a `LocalMemoryMetadataStore`.
- `V5ScalableTopicsWatcherTest`: end-to-end tests against a real shared cluster, covering create/delete events flowing as Diffs, AND property filters narrowing the set correctly, and pre-existing topics surfacing in the initial Snapshot.
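The client-side bookkeeping from the Wire protocol and Reliability sections can be sketched as follows, under stated assumptions. `WatchedTopicSet` and its methods are hypothetical names, not the PR's `ScalableTopicsWatcher` API. The hash (JDK `java.util.zip.CRC32C` over sorted, newline-terminated names) is only a stand-in for the real function shared with `CommandGetTopicsOfNamespace`. The sketch shows Snapshot as a full replace, Diff applied removed-first, and the broker's three hash branches.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.zip.CRC32C;

// Hypothetical client-side mirror of the watched set; not the PR's actual API.
final class WatchedTopicSet {
    private final Set<String> topics = new HashSet<>();

    // Snapshot: replace the whole local state.
    synchronized void applySnapshot(Collection<String> snapshotTopics) {
        topics.clear();
        topics.addAll(snapshotTopics);
    }

    // Diff: apply `removed` before `added`, so a remove-then-add of the
    // same topic batched into one Diff leaves the topic present.
    synchronized void applyDiff(Collection<String> added, Collection<String> removed) {
        topics.removeAll(removed);
        topics.addAll(added);
    }

    synchronized Set<String> current() {
        return Set.copyOf(topics);
    }

    // ASSUMED hash: CRC32C over the sorted names, each terminated by '\n'.
    // The real function is the one shared with CommandGetTopicsOfNamespace.
    synchronized long hash() {
        CRC32C crc = new CRC32C();
        for (String topic : new TreeSet<>(topics)) {
            crc.update(topic.getBytes(StandardCharsets.UTF_8));
            crc.update('\n');
        }
        return crc.getValue();
    }

    // Broker-side branch from the Reliability section: no client hash or a
    // different hash -> send a Snapshot; equal hashes -> stay silent.
    static boolean shouldSendSnapshot(Long clientHash, long brokerHash) {
        return clientHash == null || clientHash != brokerHash;
    }
}
```

In this sketch, sorting before hashing makes the value independent of `HashSet` iteration order, so a client and a broker that built the same set in different orders still agree.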
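The event handling described under Broker session might look roughly like the sketch below. `WatchEventFilter`, its methods, and exact string equality as the filter test are assumptions for illustration. The sketch also assumes topic node names contain no `/`, and that the controller lock, like `subscriptions`, lives below the topic node, so a direct-child check excludes both. Treating a Modified topic that stops matching as Removed is likewise an assumption; the PR text does not spell that case out.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical event filter for a watch session; names are illustrative.
final class WatchEventFilter {
    enum Delta { ADDED, REMOVED, NONE }

    private final String namespacePath;          // e.g. "/topics/<tenant>/<ns>"
    private final Map<String, String> filters;   // AND-ed property filters, possibly empty

    WatchEventFilter(String namespacePath, Map<String, String> filters) {
        this.namespacePath = namespacePath;
        this.filters = Map.copyOf(filters);
    }

    // Returns the topic name when `path` is a direct child of the namespace
    // node; deeper paths such as "<topic>/subscriptions/..." are ignored.
    Optional<String> directChild(String path) {
        String prefix = namespacePath + "/";
        if (!path.startsWith(prefix)) {
            return Optional.empty();
        }
        String rest = path.substring(prefix.length());
        if (rest.isEmpty() || rest.contains("/")) {
            return Optional.empty();
        }
        return Optional.of(rest);
    }

    // AND semantics: each filter key must be present with an equal value.
    // An empty filter map matches every topic.
    boolean matches(Map<String, String> topicProperties) {
        for (Map.Entry<String, String> filter : filters.entrySet()) {
            if (!filter.getValue().equals(topicProperties.get(filter.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // Deleted: Removed only if tracked. Created/Modified: test the new value.
    // ASSUMPTION: a Modified topic that stops matching is reported as Removed.
    Delta classify(boolean deleted, boolean inCurrentSet, Map<String, String> newProperties) {
        if (deleted) {
            return inCurrentSet ? Delta.REMOVED : Delta.NONE;
        }
        boolean match = matches(newProperties);
        if (match && !inCurrentSet) {
            return Delta.ADDED;
        }
        if (!match && inCurrentSet) {
            return Delta.REMOVED;
        }
        return Delta.NONE;
    }
}
```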
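The 50 ms coalescing window from the Broker session section can be sketched with the JDK's `ScheduledExecutorService`. The first event in a window arms one timer, and opposite events for the same topic cancel each other. `DiffCoalescer` and its `sendDiff` callback are hypothetical; the callback stands in for writing a Diff frame to the client connection.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical coalescer: events inside one window become a single Diff.
final class DiffCoalescer {
    private final Set<String> pendingAdded = new LinkedHashSet<>();
    private final Set<String> pendingRemoved = new LinkedHashSet<>();
    private final ScheduledExecutorService scheduler;
    private final long windowMillis;
    private final BiConsumer<List<String>, List<String>> sendDiff; // (added, removed)
    private boolean flushScheduled;

    DiffCoalescer(ScheduledExecutorService scheduler, long windowMillis,
                  BiConsumer<List<String>, List<String>> sendDiff) {
        this.scheduler = scheduler;
        this.windowMillis = windowMillis;
        this.sendDiff = sendDiff;
    }

    synchronized void onAdded(String topic) {
        // An Add cancels a pending Remove of the same topic in this window.
        if (!pendingRemoved.remove(topic)) {
            pendingAdded.add(topic);
        }
        scheduleFlushLocked();
    }

    synchronized void onRemoved(String topic) {
        // A Remove cancels a pending Add of the same topic in this window.
        if (!pendingAdded.remove(topic)) {
            pendingRemoved.add(topic);
        }
        scheduleFlushLocked();
    }

    // The first event of a window arms a single timer; later events just accumulate.
    private void scheduleFlushLocked() {
        if (!flushScheduled) {
            flushScheduled = true;
            scheduler.schedule(this::flush, windowMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Sends at most one Diff per window, and nothing if every event cancelled out.
    void flush() {
        List<String> added;
        List<String> removed;
        synchronized (this) {
            flushScheduled = false;
            if (pendingAdded.isEmpty() && pendingRemoved.isEmpty()) {
                return;
            }
            added = List.copyOf(pendingAdded);
            removed = List.copyOf(pendingRemoved);
            pendingAdded.clear();
            pendingRemoved.clear();
        }
        sendDiff.accept(added, removed);
    }
}
```

A session would construct this with a 50 ms window, e.g. `new DiffCoalescer(scheduler, 50, (added, removed) -> ...)`. The sketch accumulates under the lock but invokes `sendDiff` outside it, so a slow write does not block the thread delivering metadata events.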
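The reconnect policy in the V5 client watcher section (100 ms initial delay, 30 s cap) can be modeled, as an assumption, by plain exponential doubling. `ReconnectBackoff` is a hypothetical stand-in; Pulsar's own `Backoff` class may add jitter or other behavior not shown here.

```java
import java.time.Duration;

// Hypothetical reconnect backoff: doubles from `initial` up to `max`.
// Pulsar's own Backoff class may differ in detail (for example, jitter).
final class ReconnectBackoff {
    private final long initialMillis;
    private final long maxMillis;
    private long nextMillis;

    ReconnectBackoff(Duration initial, Duration max) {
        this.initialMillis = initial.toMillis();
        this.maxMillis = max.toMillis();
        this.nextMillis = initialMillis;
    }

    // Delay to wait before the next reconnect attempt; the following
    // delay is doubled, capped at maxMillis.
    synchronized long nextDelayMillis() {
        long delay = nextMillis;
        nextMillis = Math.min(nextMillis * 2, maxMillis);
        return delay;
    }

    // ASSUMPTION: reset once the connection is back and the watch frame
    // (with the cached hash) has been sent, because a matching hash
    // produces no reply from the broker to wait for.
    synchronized void reset() {
        nextMillis = initialMillis;
    }
}
```

With these parameters the delays run 100, 200, 400, ..., 25600 ms and then stay at 30000 ms.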
### Matching PR(s) in forked repositories

- area/broker
- area/client
