+1

-Lari

On 2026/06/22 20:42:35 Matteo Merli wrote:
> https://github.com/apache/pulsar/pull/26077/changes#diff-9e5a7ff65baecf5e891e115572fc177caa1766ae114f51df0c68e7b35f382903R1
> 
> -----
> 
> # PIP-486: Scalable Topic Key-Shared Consumption
> 
> *Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*
> 
> > **Draft.** The design is settled: a configurable per-topic entry-bucket 
> > budget, an explicit 16-bit
> > `hashB` range on the wire, an immutable per-segment bucket count, and 
> > controller-driven assignment.
> > Remaining specifics — the policy threshold values, and the durable 
> > per-bucket checkpoint representation
> > — are tuning defaults and checkpoint-consumer-spec detail rather than open 
> > design choices.
> 
> ## Motivation
> 
> [PIP-468](pip-468.md) and [PIP-483](pip-483.md) give scalable topics a
> DAG of range segments that split
> and merge automatically. The steady-state model for **ordered**
> consumption (stream / checkpoint
> consumers) is **one consumer per active segment**: each segment is an
> independent ordered substream, and
> parallelism comes from having more segments. This is the most
> efficient mode — a segment's entries are
> dispatched whole to a single consumer, with no per-message routing cost.
> 
> > **Scope:** this PIP concerns **ordered consumption** only. Queue (`Shared`) 
> > consumers are already
> > parallelized by round-robin delivery at the batch level and are 
> > **unaffected** — they need no
> > bucketing. Everything below applies to stream and checkpoint consumers.
> 
> There are several situations where one-consumer-per-segment is *not*
> the right mode:
> 
> 1. **Draining a sealed-segment backlog after a scale-up.** Start with
> 1 segment and 1 consumer. The
>    consumer falls behind, the application scales to 10 consumers, and
> auto-split ([PIP-483](pip-483.md))
>    immediately grows the topic to 10 segments — good for *new*
> traffic. But the pre-split backlog is
>    stuck in the old, now-sealed segment, and per-key ordering requires
> that sealed segment to be fully
>    drained before its successors are consumed for the same key range.
> A single consumer draining one
>    sealed segment is exactly the bottleneck the scale-up was meant to
> remove. We want to *temporarily*
>    fan that sealed segment's backlog across several consumers, then revert.
> 
> 2. **Many low-throughput topics.** When there are many topics each
> with low throughput and several
>    consumers, materializing many physical segments per topic is
> wasteful. Key-shared over a small number
>    of segments gives consumer parallelism without paying for segment
> fan-out we do not need.
> 
> 3. **More consumers than the max-segments ceiling.** A topic caps
> segments (say `max-segments = 64`) to
>    bound metadata and resource overhead. With 200 consumers, 64
> segments cannot give every consumer its
>    own substream. Raising the per-segment bucket count — e.g. 4
> buckets on each of 64 segments → 256
>    independently-routable substreams — activates all 200 consumers
> without an excessive segment count.
>    Buckets are the parallelism multiplier *beyond* the segment cap.
> 
> The blocker for all of these is that **Pulsar's key-shared dispatch
> couples the producer's batching mode
> to the consumer's subscription mode.** Today the broker routes by a
> single key per *entry*, so correct
> key-shared delivery requires the producer to disable batching or use
> the key-based batcher (one batch
> per key). That is an unacceptable coupling for scalable topics, where
> the consumption mode is a dynamic,
> consumer-side decision the producer should neither know about nor be
> constrained by.
> 
> This PIP removes that coupling.
> 
> ## Background knowledge
> 
> **Key-shared dispatch today.** For a `Key_Shared` subscription the
> broker computes one sticky-key hash
> per entry and maps it to a consumer **at dispatch time** via a
> consistent-hash ring
> (`ConsistentHashingStickyKeyConsumerSelector`). The key is read from
> the *outer, uncompressed*
> `MessageMetadata` only (`Commands.resolveStickyKey`); the broker
> **never decompresses the batch
> payload**, so per-message keys inside `SingleMessageMetadata` are not
> consulted. A batched entry is
> dispatched *whole* to a *single* consumer.
> 
> **Why `KeyBasedBatcher` is required today.** Because routing is "one
> key per entry," a default-batched
> entry that mixes keys routes entirely to whichever consumer owns the
> entry's outer key, violating
> affinity for the other keys. `BatchMessageKeyBasedContainer` works
> around this by putting one key per
> batch — collapsing to *no batching* at high key cardinality.
> 
> **Per-bucket pending tracking is simpler than today's key tracking.**
> The current Key_Shared dispatcher
> tracks pending/replay state per *key* (a large, dynamic set). Routing
> by a fixed, small set of buckets
> lets the dispatcher track pending messages **per bucket** instead —
> fewer, stable units.
> 
> **Scalable-topic hash ring.** A scalable topic's keyspace is the
> 16-bit segment-routing ring
> (`HashRange`, `0x0000–0xFFFF`). Each segment owns a contiguous
> sub-range; splits/merges adjust range
> ownership without rewriting committed data.
> 
> ## Goals
> 
> ### In Scope
> 
> - Per-message-key (key-shared) **ordered** consumption on a
> scalable-topic segment **without**
>   constraining the producer's batching mode; producer batching and
> consumer mode independently chosen.
> - The three use cases above: temporary sealed-segment drain,
> low-throughput consolidation, and
>   parallelism beyond the max-segments ceiling.
> - Preserve per-key ordering across all transitions.
> 
> ### Out of Scope
> 
> - **Queue (`Shared`) consumers** — already round-robin at the batch
> level; unaffected.
> - Changing the one-consumer-per-segment model for high-throughput
> ordered consumption.
> - Key-shared semantics for classic (non-scalable) topics.
> 
> ## High Level Design
> 
> Routing is **two independent levels**, and the new primitive is the
> **entry bucket** — a contiguous
> sub-range of a second, independent hash ring — together with a
> configurable **per-topic entry-bucket
> budget**.
> 
> ### Entry buckets
> 
> 1. **Segment routing (existing, PIP-468).** A key's segment is chosen
> by the segment-routing hash over
>    the 16-bit ring; the key lands in whichever segment owns that
> sub-range. Happens first — the producer
>    is already writing to a specific segment.
> 2. **Intra-segment bucketing (this PIP).** A **separate, independent**
> hash `hashB(key)` maps keys onto
>    a second ring, and each segment divides *that* ring into `N` equal
> **buckets** — a bucket is a
>    contiguous `hashB` sub-range `[start, end]` (both bounds inclusive). `hashB` must be
> independent of the segment-routing hash
>    (see [the bucket hash](#the-bucket-hash)), so the keys a segment
> actually receives spread evenly
>    across its buckets — with `N = 2`, each bucket gets ≈ half the
> segment's traffic, regardless of which
>    slice of the segment-routing ring the segment owns.
> 
> We call this **entry-bucketing**: the producer's batcher keeps each
> batch — one stored entry — within a
> single bucket, so an entry covers exactly one `hashB` sub-range and
> carries that range in its outer
> metadata.
> 
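
A minimal sketch of the intra-segment bucketing described above, assuming the ring is cut into `N` near-equal contiguous slices and using the inclusive bounds that the wire format later in the proposal specifies. The helper names `bucket_ranges` and `bucket_index` are hypothetical and not part of the PIP.

```python
from bisect import bisect_right

RING_SIZE = 1 << 16  # 16-bit hashB ring, 0x0000..0xFFFF


def bucket_ranges(n: int) -> list[tuple[int, int]]:
    """Split the ring into n contiguous buckets with inclusive bounds."""
    if not 1 <= n <= RING_SIZE:
        raise ValueError("n must be between 1 and 65536")
    # Integer arithmetic keeps bucket sizes within one slot of each other.
    edges = [i * RING_SIZE // n for i in range(n + 1)]
    return [(edges[i], edges[i + 1] - 1) for i in range(n)]


def bucket_index(hash_b: int, ranges: list[tuple[int, int]]) -> int:
    """Return the index of the bucket whose inclusive range contains hash_b."""
    starts = [start for start, _ in ranges]
    return bisect_right(starts, hash_b) - 1
```

With `N = 4` this yields the same four boundaries as the diagram further down (`0x0000-0x3FFF` through `0xC000-0xFFFF`).
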
> ### The per-topic entry-bucket budget
> 
> A topic has a **configurable total entry-bucket count** `T` (default
> **4**). These buckets are
> distributed across the topic's segments: with `S` segments, each
> segment has `N = T / S` buckets (never fewer than
> 1). The key consequence:
> 
> > **A split divides a segment's buckets between its children.** Splitting a 
> > segment with `N` buckets
> > produces two children with `N/2` each — more segments, fewer buckets per 
> > segment — so the topic's
> > total stays ≈ `T`.
> 
> So a single-segment topic starts with all `T = 4` buckets on that one
> segment (room to fan it out to 4
> consumers — the hedge for an unknown initial consumer count); a
> 2-segment topic has 2 buckets per
> segment; a 4-or-more-segment topic settles at 1 bucket per segment
> (`N = 1`, meaning "unconstrained" batching:
> the producer batches the segment's keys freely because the segment
> routes to one consumer). The budget
> `T` is the topic's standing intra-segment fan-out headroom; total
> consumer parallelism is `Σ N` over
> segments ≈ `T` until segments outnumber `T`, after which segments
> alone carry it.
> 
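
The budget arithmetic above can be sketched as follows. Rounding `T / S` down with integer division is an assumption (the PIP only states the minimum of 1), and both function names are hypothetical.

```python
def initial_bucket_count(total_budget: int, segment_count: int) -> int:
    """Bucket count N for a segment created while the topic has segment_count segments."""
    # Assumption: T / S rounds down; the PIP only guarantees at least 1.
    return max(1, total_budget // segment_count)


def split_children(parent_n: int) -> tuple[int, int]:
    """A split gives each child half of the parent's buckets, never fewer than 1."""
    child_n = max(1, parent_n // 2)
    return child_n, child_n
```

With the default `T = 4`, this gives 4 buckets on a single-segment topic, 2 per segment on a 2-segment topic, and 1 per segment from 4 segments onward.
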
> **`N` is immutable for a segment's life.** A segment's bucket count is
> fixed when the segment is created
> (at `T / S` for the segment count `S` at that moment, or
> `parent_N / 2` for a split child). Changing it
> means **rolling the segment over** (below), never mutating a live one.
> This is what keeps a segment from
> ever carrying two different `N` values at once — so no entry ever
> straddles consumers.
> 
> **`N` is bounded above by a per-segment maximum `N_max`** (default
> 1024). The controller may raise a
> segment's `N` above its budget share on consumer demand (next), up to `N_max`.
> 
> ### Routing: bucket assignment, not per-key consistent hashing
> 
> A deliberate departure from classic `Key_Shared`. There, the broker
> hashes every key at dispatch time.
> Here the per-message decision is made **once, at the producer**
> (segment routing, then the bucket hash);
> the broker does **no per-key hashing** — it reads the entry's `hashB`
> range and dispatches the whole
> entry to the consumer that owns that bucket on the segment.
> 
> The **scalable-topic controller** owns the per-segment bucket→consumer
> assignment — the same component
> that already assigns consumers to segments ([PIP-468](pip-468.md)) —
> and the assignment is **1 bucket →
> exactly 1 consumer** (a consumer may own several buckets; a bucket has
> one owner). So an entry always
> goes to exactly one consumer; the broker never splits or filters an
> entry. This is what carries per-key
> order within a segment.
> 
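
A minimal sketch of the "1 bucket → exactly 1 consumer" rule, assuming the controller hands out contiguous runs of buckets (as in the diagram's "Consumer A: buckets 0-1"). The placement policy and the name `assign_buckets` are assumptions; the PIP only fixes the one-owner-per-bucket invariant.

```python
def assign_buckets(n: int, consumers: list[str]) -> dict[int, str]:
    """Map every bucket index to exactly one consumer."""
    if n < 1 or not consumers:
        raise ValueError("need at least one bucket and one consumer")
    # At most n consumers can own a bucket; any extras stay idle on this segment.
    active = consumers[:n]
    k = len(active)
    # Contiguous runs: bucket b goes to consumer floor(b * k / n).
    return {b: active[b * k // n] for b in range(n)}
```

Because each bucket maps to a single owner, an entry stamped with one bucket range can only ever reach one consumer.
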
> Two scaling actions follow from immutable `N`:
> 
> - **Scale consumers within a segment (`N` fixed):** the controller
> redistributes the segment's `N`
>   buckets among more or fewer consumers (up to `N`). Ordering across a
> handoff reuses the existing
>   `Key_Shared` blocked-hash mechanism — block a moving bucket until
> the prior owner's in-flight messages
>   for it are acked, then hand over — tracking pending messages **per
> bucket**, not per key. No new
>   entry-splitting machinery.
> - **Change `N` (rebucket rollover):** when a segment needs more
> buckets than it has (e.g. consumer count
>   exceeds `N`), the controller performs a **no-op split** — seal the
> segment, create a successor with
>   the **same hash range** but a new `N`, redirect producers. The
> sealed predecessor drains under its old
>   `N`; the successor takes new writes under the new `N`. Reuses
> PIP-468's seal → successor → redirect
>   flow and DAG ordering, so per-key order across the change is free
> and no producer ever writes two `N`
>   values into a live segment. (Caveat: like a split, the successor's
> keys aren't consumed until the
>   predecessor drains, so a rollover's larger `N` helps *new* data —
> draining an existing backlog wider
>   is the reassignment action above, bounded by the sealed segment's `N`.)
> 
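
The rebucket rollover can be sketched as a pure function over an immutable segment record. The `Segment` fields and the `rebucket_rollover` name are hypothetical; the sketch only shows that the predecessor keeps its old `N` while the successor covers the same hash range with the new one.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Segment:
    segment_id: int
    range_start: int      # segment-routing hash range, inclusive
    range_end: int
    bucket_count: int     # N, fixed for the segment's life
    sealed: bool = False
    predecessor: Optional[int] = None


def rebucket_rollover(seg: Segment, new_n: int, next_id: int,
                      n_max: int = 1024) -> tuple[Segment, Segment]:
    """Seal seg and create a same-range successor with a different N."""
    if seg.sealed:
        raise ValueError("segment is already sealed")
    if not 1 <= new_n <= n_max:
        raise ValueError("new bucket count out of bounds")
    sealed = replace(seg, sealed=True)  # the old N is untouched
    successor = Segment(next_id, seg.range_start, seg.range_end,
                        new_n, predecessor=seg.segment_id)
    return sealed, successor
```

Redirecting producers and draining the predecessor are left out; in the proposal they reuse PIP-468's existing seal, successor and redirect flow.
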
> ### Entry-bucketed batching (the new default for scalable-topic producers)
> 
> The producer already partitions output by segment (segments are
> separate underlying topics); run the
> existing batch builder **per bucket within each segment** — `N`
> builders per segment, each bounded by
> the usual max-bytes / max-messages / max-delay. Each emitted entry
> then holds keys from a single bucket
> of a single segment and carries that one `hashB` range, so it routes
> to exactly one consumer — no
> fan-out, no filtering, no decompression, no re-serialization. It is a
> *coarsened* key-based batcher
> ("one batch per bucket" instead of "one batch per key") with fan-out
> bounded by the segment's `N`,
> independent of key cardinality.
> 
> The batching cost lands where it does not matter: at high throughput
> each bucket still fills the batch
> limit (≈ zero penalty); at low throughput buckets dribble and close on
> the timer — but low throughput is
> use case #2, where batching efficiency is not the concern. And because
> `N` shrinks as the topic splits,
> high-scale topics converge on `N = 1` (full batching).
> 
> ### How the use cases map
> 
> - **Use case #1 (drain sealed backlog):** the sealed segment's
> messages are already bucketed into its
>   `N` buckets, so draining faster is the controller **reassigning
> those buckets** across up to `N`
>   consumers — whole entries, no filtering. When the backlog clears,
> the segment retires and the group
>   reverts to one-consumer-per-segment. Drain parallelism is bounded by
> the segment's `N`.
> - **Use case #2 (low-throughput consolidation):** key-shared as the
> steady-state mode over few segments.
>   When [PIP-483](pip-483.md) sees many consumers but low per-segment
> throughput, a split would add
>   physical segments the load does not justify, so it
> **rebucket-rolls** the segment to a larger `N`
>   instead of splitting.
> - **Use case #3 (parallelism beyond the segment cap):** at
> `max-segments`, the controller raises `N` on
>   existing segments (rebucket rollover) rather than creating more —
> `64 segments × N = 4 → 256`
>   substreams for 200 consumers. The standing lever: *segments first,
> `N` once segments are capped.*
> 
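
The arithmetic behind use case #3 can be checked with a short helper. Rounding up to the smallest sufficient `N` is an assumption (the controller might prefer other values, for example powers of two), and the function name is hypothetical.

```python
import math


def buckets_per_segment_needed(consumers: int, segments: int,
                               n_max: int = 1024) -> int:
    """Smallest N such that segments * N substreams cover every consumer."""
    needed = max(1, math.ceil(consumers / segments))
    if needed > n_max:
        raise ValueError("consumer count exceeds segments * N_max")
    return needed
```

For 200 consumers on 64 capped segments this returns 4, which gives the 256 substreams quoted above.
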
> ### Closing the loop: the layout channel
> 
> PIP-468 already pushes a `ScalableTopicLayoutResponse` (segment
> hash-ranges) to clients over the DAG
> watch session. Extend it with each segment's **bucket boundaries**
> (the `hashB` sub-ranges for its `N`
> buckets) — what the producer needs to bucket its batches. The producer
> reads a segment's boundaries once
> when it starts producing there; a new `N` arrives only as a new
> same-range successor segment, picked up
> through the existing redirect-on-seal flow, so there is no live
> mutation to race. The bucket→consumer
> assignment stays broker-internal (consumers don't filter, so they need
> no view of it for routing).
> 
> ```mermaid
> flowchart LR
>   subgraph Producer["Producer, within one segment (N=4)"]
>     K[keys routed to this segment] --> B{bucket = hashB key}
>     B --> b0["bucket 0x0000-0x3FFF"]
>     B --> b1["bucket 0x4000-0x7FFF"]
>     B --> b2["bucket 0x8000-0xBFFF"]
>     B --> b3["bucket 0xC000-0xFFFF"]
>   end
>   b0 -->|range stamped on entry| BR["Broker: range -> owning consumer on segment"]
>   b1 --> BR
>   b2 --> BR
>   b3 --> BR
>   BR --> C0[Consumer A: buckets 0-1]
>   BR --> C1[Consumer B: buckets 2-3]
> ```
> 
> ## Detailed Design
> 
> ### Design & Implementation Details
> 
> **Binary protocol — `MessageMetadata`.** Entry-bucketing produces
> single-bucket batches, so an entry
> carries the explicit `hashB` sub-range it covers. Carrying the
> **range** (rather than a `(count, index)`
> pair) is deliberate — it is self-describing and unambiguous at the
> dispatch site, where the broker just
> checks range ownership:
> 
> ```protobuf
> message MessageMetadata {
>   // ... existing fields ...
>   optional uint32 entry_bucket_start = 40;  // hashB range start, inclusive (16-bit value)
>   optional uint32 entry_bucket_end   = 41;  // hashB range end, inclusive (16-bit value)
> }
> ```
> 
> The `hashB` ring is 16 bits (`0x0000–0xFFFF`), so each bound holds a
> 16-bit value in a `uint32`.
> **Both bounds are inclusive** — an exclusive end would force the last
> bucket's end to `0x10000`,
> overflowing the 16-bit ring; with inclusive bounds the last bucket is
> simply `[0xC000, 0xFFFF]`. Both
> fields are optional at the proto level (absent on classic-topic
> messages); on a scalable topic the
> producer always sets them. Exact field numbers TBD against the current proto.
> 
> **The bucket hash.** Both the segment-routing hash and the
> entry-bucket hash are 16-bit, so the simplest
> construction is to compute **one 32-bit avalanche hash** of the key
> and split it into two halves: the
> **high 16 bits → segment-routing ring**, the **low 16 bits → `hashB`**
> (the bucket ring). Because the
> two halves of a good-avalanche hash (murmur3 / xxhash) are
> independent, within a segment's slice (fixed
> high half) the low half is uniform — so a segment's keys spread evenly
> across its buckets, with one hash
> computation and zero collision between the two roles. The function
> must be **fixed and version-pinned**
> (identical across versions and clusters — the latter for
> geo-replication). Implementation aligns this
> with PIP-468's existing segment hash, treating it as the high half of
> the same 32-bit hash.
> 
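
A minimal sketch of the one-hash, two-halves construction. The 32-bit hash here is a stand-in (BLAKE2b truncated to 4 bytes from the Python standard library), not the murmur3 / xxhash family the proposal names, and `hash32` and `split_hash` are hypothetical names.

```python
import hashlib


def hash32(key: bytes) -> int:
    """Stand-in 32-bit hash; the real function must be fixed and version-pinned."""
    return int.from_bytes(hashlib.blake2b(key, digest_size=4).digest(), "big")


def split_hash(key: bytes) -> tuple[int, int]:
    """Return (segment_routing_hash, hash_b) as two 16-bit values."""
    h = hash32(key)
    segment_hash = h >> 16   # high 16 bits: segment-routing ring
    hash_b = h & 0xFFFF      # low 16 bits: bucket ring
    return segment_hash, hash_b
```

Both values come from a single hash computation, so producers in every cluster derive the same segment and bucket for a key as long as they pin the same function.
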
> **Producer — bucketed batching.** A new batcher (working name
> `BucketedBatchContainer`) maintains, per
> segment it produces to, `N` sub-containers keyed by which bucket
> `hashB(key)` falls in, each behaving
> like the current `BatchMessageContainerImpl`. On flush it stamps
> `entry_bucket_start/end`. It reads each
> segment's bucket boundaries from the layout it already has (no extra
> round-trip). Default batcher for
> scalable-topic (`topic://`) producers; classic topics unaffected. (For
> e2e-encrypted scalable topics the
> SDK disables batching — see [Security](#security-considerations) — and
> each single-message entry is
> still stamped with its bucket range and routed normally.)
> 
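
A minimal sketch of the per-bucket batching idea, assuming only a max-messages limit (the max-bytes and max-delay limits are omitted) and modelling an entry as a plain dict. The class below is hypothetical and is not the `BucketedBatchContainer` implementation.

```python
from bisect import bisect_right


class BucketedBatcher:
    """One pending batch per bucket of a single segment."""

    def __init__(self, ranges, max_messages):
        self.ranges = ranges                      # sorted inclusive (start, end) pairs
        self.starts = [start for start, _ in ranges]
        self.max_messages = max_messages
        self.pending = [[] for _ in ranges]

    def add(self, hash_b, payload):
        """Buffer a message; return a flushed entry if its bucket filled up, else None."""
        idx = bisect_right(self.starts, hash_b) - 1
        self.pending[idx].append(payload)
        if len(self.pending[idx]) >= self.max_messages:
            return self.flush(idx)
        return None

    def flush(self, idx):
        """Emit the bucket's batch stamped with its hashB range, or None if empty."""
        messages, self.pending[idx] = self.pending[idx], []
        if not messages:
            return None
        start, end = self.ranges[idx]
        return {"entry_bucket_start": start,
                "entry_bucket_end": end,
                "messages": messages}
```

Every emitted entry holds messages from exactly one bucket, so the stamped range is enough for the broker to pick a single consumer.
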
> **Broker — routing.** Within a segment's dispatcher
> (`PersistentStickyKeyDispatcherMultipleConsumers`), when an entry
> carries a bucket range, dispatch the
> whole entry to the consumer the controller assigned that bucket — no
> per-key hashing, no decompression.
> Because `N` is immutable, the entry's range always matches a current
> bucket boundary, so this is
> unconditionally a single-consumer dispatch; only *which* consumer owns
> it changes, on reassignment.
> 
> **Broker — bucket→consumer reassignment.** The controller decides the
> assignment (as it does for
> segment→consumer). Moving a bucket reuses the `Key_Shared`
> consumer-change handling: block the moving
> bucket until the prior owner's in-flight messages for it are acked,
> then hand over. Pending state is
> tracked **per bucket**. No entry ever goes to more than one consumer,
> so **no shared-entry dispatch or
> cross-consumer ack aggregation is needed** — that machinery is avoided 
> entirely.
> 
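
The routing and handoff rules above can be sketched together. This is a simplified model, assuming a plain per-bucket count of unacked messages in place of the real blocked-hash mechanism; `SegmentDispatcher` and its methods are hypothetical names.

```python
class SegmentDispatcher:
    """Routes whole entries by bucket range and withholds buckets during a handoff."""

    def __init__(self, owners):
        self.owners = dict(owners)                 # (start, end) -> consumer
        self.pending = {b: 0 for b in self.owners} # unacked messages per bucket
        self.moving = {}                           # bucket -> next owner

    def dispatch(self, entry):
        """Return the owning consumer, or None while the bucket is withheld."""
        bucket = (entry["entry_bucket_start"], entry["entry_bucket_end"])
        if bucket not in self.owners:
            raise KeyError("range does not match a bucket of this segment")
        if bucket in self.moving:
            return None
        self.pending[bucket] += len(entry["messages"])
        return self.owners[bucket]

    def reassign(self, bucket, new_owner):
        """Start moving a bucket; it stays withheld until the old owner drains."""
        self.moving[bucket] = new_owner
        self._maybe_hand_over(bucket)

    def ack(self, bucket, count=1):
        self.pending[bucket] -= count
        self._maybe_hand_over(bucket)

    def _maybe_hand_over(self, bucket):
        if bucket in self.moving and self.pending[bucket] == 0:
            self.owners[bucket] = self.moving.pop(bucket)
```

An entry is only ever handed to one consumer, and a moving bucket is held back rather than delivered twice, which is why no cross-consumer ack aggregation appears in the model.
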
> **Consumer — no filtering.** The broker dispatches an entry only to
> the bucket's current owner, so a
> consumer receives only messages for buckets it owns; during a handoff
> the broker *withholds* the moving
> bucket rather than over-delivering. The consumer side is unchanged
> from a normal subscription.
> 
> **Checkpoint / stream consumers.** The **bucket is the unit of
> checkpointing**: a checkpoint consumer
> records position **per (segment, bucket)**. This generalizes today's
> per-segment checkpoint — at `N = 1`
> a segment *is* one bucket, so it is exactly current behavior; at
> `N > 1` each consumer checkpoints its
> owned buckets, and on reassignment the new owner resumes from the
> durable per-bucket checkpoint.
> *(Per-bucket checkpoint mechanics are detailed in the
> checkpoint-consumer spec.)*
> 
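
A minimal sketch of checkpointing per (segment, bucket), assuming a position can be modelled as a single increasing integer. Real Pulsar positions are richer than that and the durable representation is left to the checkpoint-consumer spec, so `CheckpointStore` is purely illustrative.

```python
class CheckpointStore:
    """Keeps the last committed position for every (segment, bucket) pair."""

    def __init__(self):
        self._positions = {}

    def commit(self, segment_id, bucket, position):
        key = (segment_id, bucket)
        # Positions only move forward; a stale commit is ignored.
        self._positions[key] = max(position, self._positions.get(key, -1))

    def resume_position(self, segment_id, bucket):
        """First position a new owner of the bucket should read."""
        return self._positions.get((segment_id, bucket), -1) + 1
```

At `N = 1` the store holds one key per segment, which matches the per-segment checkpoint the paragraph describes as current behavior.
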
> **Controller — segment operations.** Range split/merge
> ([PIP-468](pip-468.md)) is unchanged and
> orthogonal (it operates on the segment-routing hash); a **split
> divides the parent's buckets between its
> children** (`N/2` each, floor 1), keeping the topic's total ≈ `T`.
> Changing a segment's `N` adds one new
> operation — the **rebucket rollover (no-op split)**: seal, create a
> same-range successor with the new
> `N`, redirect, drain the predecessor — reusing the split machinery and
> its ordering/cursor guarantees.
> The split-vs-rebucket choice is part of [PIP-483](pip-483.md)'s policy
> engine (prefer splitting while
> under `max-segments` and throughput justifies a physical segment;
> otherwise rebucket-up; raise fast,
> lower lazily, with anti-flap); the concrete thresholds
> (split-vs-rebucket throughput cutoff, rollover
> cooldown, rebucket-down idle window) are tunables with defaults, like
> PIP-483's existing split/merge
> thresholds.
> 
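
The split-vs-rebucket preference can be sketched as a decision function. Everything here is an assumption for illustration: the parameter names, the throughput unit, and the single cutoff. The cooldown, idle-window and anti-flap rules are left out, and the real thresholds belong to PIP-483's policy engine.

```python
def choose_action(segment_count, max_segments, consumers_on_segment,
                  bucket_count, throughput, split_throughput_cutoff,
                  n_max=1024):
    """Return "split", "rebucket" or "none" for one segment."""
    if consumers_on_segment <= bucket_count:
        return "none"        # every consumer can already own a bucket
    if segment_count < max_segments and throughput >= split_throughput_cutoff:
        return "split"       # load justifies another physical segment
    if bucket_count < n_max:
        return "rebucket"    # raise N through a rebucket rollover instead
    return "none"            # already at N_max, nothing left to raise
```

The ordering of the checks mirrors "prefer splitting while under `max-segments` and throughput justifies a physical segment; otherwise rebucket-up".
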
> ### Public-facing Changes
> 
> #### Binary protocol
> New optional `MessageMetadata` fields `entry_bucket_start`,
> `entry_bucket_end`. Layout response
> (`ScalableTopicLayoutResponse`) extended with each segment's bucket 
> boundaries.
> 
> #### Configuration
> - Broker only (no client-side configuration): **total entry-buckets
> per topic** (default 4), per-segment
>   maximum `N_max` (default 1024), and the temporary
> sealed-segment-drain toggle. The producer always
>   uses the broker-advertised bucket boundaries.
> 
> #### Client API
> None. Entry-bucketing is an internal client-library detail; the
> consumption mode is decided and engaged
> transparently by the controller. Consumers and producers see no API change.
> 
> #### Metrics
> - Per-segment: bucket count `N`, consumers sharing the segment,
> bucket→consumer reassignments, buckets
>   currently withheld for a handoff (gauge), drain-mode active (gauge).
> - Per-topic: rebucket rollovers (count).
> - Producer: bucket fill ratio / mean batch size per bucket.
> 
> ## Monitoring
> 
> A persistently non-zero **buckets-withheld** gauge means a handoff is
> stuck (a prior owner not acking) —
> worth alerting on. Frequent **rebucket rollovers** suggest the bucket
> budget is mis-sized for the
> workload. A **drain-mode** gauge that stays high signals a sealed
> segment that is not clearing.
> 
> ## Security Considerations
> 
> The bucket range lives in cleartext outer metadata and the broker
> routes whole entries by it — it never
> decrypts a payload, reads a per-message key, or slices an entry.
> **End-to-end encryption works without
> special handling** for dispatch. Separately, the client SDK **disables
> batching for e2e-encrypted
> scalable topics** (so each entry is a single message): an encrypted
> batch is opaque and cannot be
> reshaped if it ever has to be re-routed across a differing layout — a
> requirement that arises for
> geo-replication, which is the subject of a separate, forthcoming PIP.
> Routing is unaffected — each
> single-message entry still carries its bucket range. No new
> authorization surface; bucket assignment is
> internal to a subscription.
> 
> ## Backward & Forward Compatibility
> 
> - **No old-broker / old-producer concern:** `segment://` topics are
> only ever served by Pulsar 5+
>   brokers and clients that understand scalable topics, so the new
> fields never reach a participant that
>   doesn't; they are optional and inert for classic topics regardless.
> - **`N` changes are segment rollovers, not live mutations:** a
> producer always writes a single `N` to a
>   given segment; a new `N` arrives only as a new same-range successor.
> No stale-`N`-on-a-live-segment
>   case.
> 
> ### Upgrade / Downgrade / Rollback
> No metadata-format migration; the proto fields are additive and
> optional. Rollback is safe — the fields
> are ignored by prior versions.
> 
> ### Pulsar Geo-Replication Considerations
> Geo-replication of scalable topics will be specified in a separate,
> forthcoming PIP. This design imposes
> **no additional requirements** for it: the per-segment bucket range
> defined here, combined with a shared
> `hashB` function across clusters, is sufficient for a destination
> cluster to route (or fan out)
> replicated batches into its own layout. The only related constraint is
> the e2e batching rule noted under
> [Security](#security-considerations).
> 
> ## Alternatives
> 
> - **Broker decompress + re-split into per-consumer sub-batches.**
> Fully decouples without producer
>   cooperation, but pays decompress + re-serialize on every dispatch
> and is blocked by encryption.
>   Rejected: producer-stamped bucket ranges achieve the same routing
> with none of those costs.
> - **`KeyBasedBatcher` everywhere.** The status-quo coupling; rejected
> as the whole motivation.
> - **Mutate `N` in place on a live segment.** A live segment would
> carry two `N` values at once, forcing
>   entry fan-out, cross-consumer ack aggregation, and a bespoke
> ordering protocol. Rejected for
>   immutable-`N` + rebucket-rollover, which keeps every entry on one consumer.
> - **Compact `(bucket_count, bucket_id)` wire form** (instead of the
> explicit range). Equivalent in
>   information, but the range is self-describing and less error-prone
> at the dispatch site; rejected for
>   clarity, at a couple of bytes' cost.
> - **Re-partition the sealed backlog into ephemeral key-ranged child
> segments.** Avoids per-dispatch cost
>   but rewrites committed data and re-wires the DAG; kept only as a
> possible future optimization for use
>   case #1.
> 
> 
> --
> Matteo Merli
> <[email protected]>
> 
