merlimat opened a new pull request, #25648:
URL: https://github.com/apache/pulsar/pull/25648

   ## Summary
   
   Foundation for multi-topic `QueueConsumer` / `StreamConsumer` subscriptions
   filtered by topic properties: a long-lived broker → client watch session over
   the union of scalable topics in a namespace that match a (possibly empty) set
   of property filters. The broker pushes a full Snapshot on subscribe and an
   incremental Diff (with adds + removes batched together) when membership
   changes. The watch stays reliable across reconnects via a hash-skip
   optimisation: when the client's hash of its current set matches the
   broker's freshly computed one, the broker stays silent.
   
   This PR lands the wire layer, the broker session, and the V5 client watcher.
   The multi-topic consumer wrappers themselves come in a follow-up PR.
   
   The full design lives in
   [`multi-topic-consumer-design.md`](https://github.com/merlimat/pulsar/blob/st-scalable-topics-watcher/multi-topic-consumer-design.md)
   on this branch.
   
   ### Wire protocol
   
   ```
   WATCH_SCALABLE_TOPICS         (76)  client -> broker  open watch
   WATCH_SCALABLE_TOPICS_UPDATE  (77)  broker -> client  Snapshot or Diff
   WATCH_SCALABLE_TOPICS_CLOSE   (78)  client -> broker  close watch
   ```
   
   `CommandWatchScalableTopics` carries `watch_id`, `namespace`,
   `property_filters`, optional `consumer_name` (hook for a future
   namespace-level subscription coordinator), and optional `current_hash`.
   
   `CommandWatchScalableTopicsUpdate` is either:
   - `Snapshot { topics: [string] }` — full set, sent on initial subscribe and
     on reconnect when the hash differs;
   - `Diff { added: [string], removed: [string] }` — incremental changes, with
     apply-removed-first semantics (covers rapid remove-then-add).
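
To illustrate the two update kinds, here is a minimal client-side sketch of applying a Snapshot as a full replace and a Diff with removals applied before additions. `WatchedTopicSet` and its method names are hypothetical and are not the classes in this PR; the sketch only shows why the ordering matters when a topic appears in both `removed` and `added`.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical holder for the client's local topic set (illustrative only). */
final class WatchedTopicSet {
    private final Set<String> topics = new TreeSet<>();

    /** Snapshot: discard local state and adopt the broker's full set. */
    synchronized void applySnapshot(Collection<String> snapshot) {
        topics.clear();
        topics.addAll(snapshot);
    }

    /** Diff: apply removals first, then additions. */
    synchronized void applyDiff(Collection<String> added, Collection<String> removed) {
        topics.removeAll(removed);
        topics.addAll(added);
    }

    /** Returns an immutable, sorted copy of the current set. */
    synchronized List<String> current() {
        return List.copyOf(topics);
    }
}
```

With removals applied first, a Diff that lists the same topic in both `removed` and `added` leaves that topic present, which matches the rapid remove-then-add case described above.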
   
   ### Reliability
   
   Resync-on-reconnect with a hash short-circuit:
   
   1. Client tracks a CRC32C hash over its current topic set (same function as
      `CommandGetTopicsOfNamespace`).
   2. On reconnect, client sends the hash. If the broker's freshly-computed
      hash matches, the broker emits **nothing** — the watch is live, the
      client's local state is correct, future Diffs flow as usual.
   3. If the hash differs (or the client didn't have a prior set), the broker
      emits a fresh Snapshot which the client applies as a full-state replace.
   
   For the common short-blip reconnect, the wire cost collapses to one
   client -> broker `WatchScalableTopics` frame and zero broker -> client
   frames.
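
The reconnect decision can be sketched as below. This is an illustration under stated assumptions, not the PR's code: `TopicSetHash` is a hypothetical helper, and the canonical form (sorted topic names, each followed by a comma, hex-encoded result) is chosen only for the example. It may not produce the same value as the function Pulsar actually uses for `CommandGetTopicsOfNamespace`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Optional;
import java.util.TreeSet;
import java.util.zip.CRC32C;

/** Hypothetical helper showing the hash short-circuit (illustrative only). */
final class TopicSetHash {

    /** CRC32C over the sorted topic names; the encoding here is an assumption. */
    static String hashOf(Collection<String> topics) {
        CRC32C crc = new CRC32C();
        for (String topic : new TreeSet<>(topics)) { // sort so order does not matter
            crc.update(topic.getBytes(StandardCharsets.UTF_8));
            crc.update(','); // separator byte between names
        }
        return Long.toHexString(crc.getValue());
    }

    /**
     * Broker-side decision on (re)subscribe: true means send a Snapshot,
     * false means the client's state is current and nothing is sent.
     */
    static boolean shouldSendSnapshot(Optional<String> clientHash,
                                      Collection<String> brokerTopics) {
        return clientHash
                .map(h -> !h.equals(hashOf(brokerTopics)))
                .orElse(true); // no prior hash: always send a Snapshot
    }
}
```

The three branches correspond to the cases listed above: no hash, matching hash, and differing hash.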
   
   ### Broker session
   
   `ScalableTopicsWatcherSession`:
   - Registers a metadata-store listener on `/topics/<tenant>/<ns>` *before*
     computing the initial set, so events arriving mid-snapshot are captured.
   - Filters notifications to direct children only (skips
     `<topic>/subscriptions/...` and the controller lock).
   - Evaluates filters server-side. For Created/Modified events it reads the
     new value and tests it against the AND filters; for Deleted events it
     emits Removed if the topic was in `currentSet`.
   - 50 ms coalescing window folds back-to-back events into one Diff frame.
     Add and Remove for the same topic in the same window cancel out.
   - Any broker can serve the role — every broker observes the same metadata
     events, no namespace-level coordinator needed.
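
The coalescing behaviour can be pictured with a small accumulator like the one below. It is a sketch only: `DiffWindow` is a hypothetical name, and the 50 ms timer that would call `flush()` (for example via a scheduled executor) is omitted. It assumes events reaching it have already passed the filter check.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical accumulator for one coalescing window (illustrative only). */
final class DiffWindow {
    private final Set<String> added = new LinkedHashSet<>();
    private final Set<String> removed = new LinkedHashSet<>();

    /** Records an add; an add after a pending remove of the same topic cancels it. */
    synchronized void onAdded(String topic) {
        if (!removed.remove(topic)) {
            added.add(topic);
        }
    }

    /** Records a remove; a remove after a pending add of the same topic cancels it. */
    synchronized void onRemoved(String topic) {
        if (!added.remove(topic)) {
            removed.add(topic);
        }
    }

    /** Drains the window and returns [added, removed] for a single Diff frame. */
    synchronized List<List<String>> flush() {
        List<List<String>> diff = List.of(List.copyOf(added), List.copyOf(removed));
        added.clear();
        removed.clear();
        return diff;
    }
}
```

A caller would skip sending a frame when both lists returned by `flush()` are empty, so a topic created and deleted within one window produces no traffic.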
   
   ### V5 client watcher
   
   `ScalableTopicsWatcher`:
   - Mirrors `DagWatchClient` in shape: opens connection to service URL,
     registers itself on the cnx, sends `WatchScalableTopics`.
   - `start()` resolves with the initial topic set so callers can populate
     their per-topic-consumer map before the listener attaches.
   - Reconnects with `Backoff` (100 ms initial, 30 s cap) and sends the
     cached hash on reconnect.
   - Maintains a synchronised `Set<String>` updated on every Snapshot
     replace and Diff apply, so the hash is always cheap to compute.
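
For a sense of the reconnect schedule, the sketch below computes delays from the 100 ms initial value and 30 s cap mentioned above. The doubling policy and the absence of jitter are assumptions made for the example; `ReconnectDelay` is a hypothetical class, and the actual `Backoff` behaviour may differ.

```java
/** Hypothetical reconnect delay calculator (illustrative only). */
final class ReconnectDelay {
    static final long INITIAL_MS = 100;
    static final long MAX_MS = 30_000;

    /** Delay before retry number {@code attempt} (0-based), doubling up to the cap. */
    static long delayMs(int attempt) {
        long delay = INITIAL_MS;
        for (int i = 0; i < attempt && delay < MAX_MS; i++) {
            delay = Math.min(delay * 2, MAX_MS);
        }
        return delay;
    }
}
```

Under these assumptions the delay reaches the 30 s cap on the tenth attempt (index 9) and stays there.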
   
   ## Test plan
   
   - `ScalableTopicsWatcherSessionHashTest` — broker session hash branches
     (no hash → snapshot; matching → silent; differing → snapshot). Mocked
     `ServerCnx` + `LocalMemoryMetadataStore`.
   - `V5ScalableTopicsWatcherTest` — end-to-end through a real shared
     cluster: create / delete events flow as Diffs; AND property filters
     narrow the set correctly; pre-existing topics surface in the initial
     Snapshot.
   
   ### Matching PR(s) in forked repositories
   
   - area/broker
   - area/client

