Closing the vote with 2 +1s:

* Matteo
* Lari

-- Matteo Merli <[email protected]> On Thu, Jun 11, 2026 at 8:37 AM Lari Hotari <[email protected]> wrote: > > +1 (binding) > > -Lari > > On 2026/06/07 22:21:56 Matteo Merli wrote: > > https://github.com/apache/pulsar/pull/25938 > > > > ------ > > > > # PIP-483: Scalable Topic Auto Split/Merge > > > > *Sub-PIP of [PIP-468: Scalable Topic Controller](pip-468.md)* > > > > ## Motivation > > > > [PIP-468](pip-468.md) gives the Scalable Topic Controller the ability > > to **split** and **merge** segments, but only on explicit operator > > request via the admin API. An operator has to watch the topic, decide > > it is hot, and issue a split — and later notice it has gone cold and > > issue a merge. This is the same operational toil that partition-count > > management imposes on classic Pulsar topics, which scalable topics > > were meant to eliminate. > > > > This PIP adds an **auto-scaling policy** to the controller: the > > controller leader observes per-segment load and per-subscription > > consumer pressure, and autonomously splits hot segments and merges > > cold ones, within hard caps that prevent runaway growth and > > split/merge flip-flopping. > > > > The design is built around three principles that came out of the > > design discussion: > > > > 1. **Splits are fast; merges are lazy.** A split protects throughput > > and latency under load, so it fires quickly — with only a short > > cooldown to coalesce bursts of near-simultaneous triggers (e.g. a > > group of consumers connecting in rapid succession). A merge is purely > > an efficiency reclaim, so it can wait, be rate-limited, and be skipped > > when in doubt. > > 2. **The controller reacts, it does not poll.** New stream/checkpoint > > consumers register directly with the controller, so consumer-count > > changes are handled event-driven within seconds. Load data is *pushed* > > into the metadata store by each segment's owning broker (only when it > > changes materially) and read by the controller leader. 
The controller > > never fans out RPCs to segment owners. > > 3. **The decision is a pure function.** Given a snapshot of load + > > layout + policy, the split/merge decision is deterministic and > > unit-testable in isolation from all I/O. > > > > --- > > > > ## Goals > > > > - Automatically increase segment count when a topic is under > > ingest/dispatch load or has more stream/checkpoint consumers than > > segments. > > - Automatically decrease segment count when load subsides, reclaiming > > broker resources. > > - Bound growth (`maxSegments`) and bound split↔merge churn > > (`maxDagDepth`, asymmetric cooldown). > > - Default-on cluster-wide, with per-namespace and per-topic overrides, > > following Pulsar's existing policy-resolution conventions. > > > > ### Non-goals > > > > - **Broker placement / rebalancing.** Which broker owns a segment's > > bundle is the load balancer's job; this PIP only changes *how many* > > segments exist. > > - **Key-aware or non-midpoint splits.** Splits use the existing > > midpoint-split mechanism from PIP-468. > > - **Cross-topic global optimization.** Each topic's controller decides > > independently. 
> > > > --- > > > > ## Design > > > > ### Overview > > > > ``` > > ┌────────────────────────────────────────────────────────────────┐ > > │ Segment-owning broker (per active segment) │ > > │ SegmentLoadReporter │ > > │ - samples the segment topic's TopicStats │ > > │ - writes SegmentLoadStats to metadata ONLY on material change │ > > └───────────────────────────────┬────────────────────────────────┘ > > │ (metadata store, push-on-change) > > ▼ > > ┌────────────────────────────────────────────────────────────────┐ > > │ Controller leader (per scalable topic) │ > > │ │ > > │ Event-driven — within seconds: │ > > │ on STREAM/CHECKPOINT consumer register/unregister │ > > │ (consumers already register with the controller — no poll) │ > > │ → evaluate the consumer-count split rule immediately │ > > │ │ > > │ Periodic AutoScaleTick — traffic, default 60s: │ > > │ 1. read SegmentLoadStats for all active segments │ > > │ 2. AutoScalePolicyEvaluator.decide(...) → Split|Merge|None │ > > │ 3. dispatch to existing splitSegment / mergeSegments │ > > └────────────────────────────────────────────────────────────────┘ > > ``` > > > > The two trigger sources reflect their different latency needs: a new > > consumer should get its own segment **within seconds**, so it is > > handled the instant the consumer registers with the controller; > > traffic shifts up or down over **a minute or more**, so they are > > evaluated on a slower periodic tick. The only new persistent state is > > `SegmentLoadStats`. The split/merge *mechanics* are entirely reused > > from PIP-468. > > > > ### Load reporting: push-to-metadata, not pull-per-tick > > > > Each segment's owning broker runs a **`SegmentLoadReporter`** for > > every ACTIVE `segment://` topic it hosts. The broker writes > > `SegmentLoadStats` **directly to the metadata store** — it already has > > the rates in memory, so there is no REST round-trip and no > > controller-initiated pull. 
On a fixed sampling interval it compares > > the current rates to the last ones written and writes **only when a > > rate changes by more than a significant threshold** (default ±25%) > > since the last write. A steady-state segment writes once and then goes > > silent, keeping metadata write volume bounded regardless of traffic. > > > > #### `SegmentLoadStats` (new metadata record) > > > > Stored at `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load`: > > > > ```json > > { > > "msgRateIn": 12000.0, > > "bytesRateIn": 64000000.0, > > "msgRateOut": 48000.0, > > "bytesRateOut": 256000000.0 > > } > > ``` > > > > | Field | Source on the owning broker | Meaning for auto split/merge | > > |-------|------------------------------|---------------------------| > > | `msgRateIn` / `bytesRateIn` | segment topic `TopicStats` (60s > > rolling) | ingest load | > > | `msgRateOut` / `bytesRateOut` | segment topic `TopicStats` | > > dispatch/fanout load (high for topics with many subscriptions) | > > > > The record carries no timestamp of its own: the metadata store's > > `Stat` for the znode already exposes creation and last-modified > > timestamps, and the controller uses the **modified timestamp** for > > windowing. A record that still reads "cold" with an old modified time > > proves the segment has been cold for `now − modifiedAt` — so > > split/merge **windows derive from the store's `Stat`** with no > > per-tick history buffer and no extra field. > > > > #### Significant-change threshold > > > > To avoid rewriting on every minor wobble, the reporter only writes > > when a rate has moved by more than > > `scalableTopicLoadReportRateChangeThreshold` (default 25%) relative to > > the last value written for that segment. Sampling cadence is > > `scalableTopicLoadReportInterval` (default 10s). Both are configurable > > via `broker.conf`. 
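
Inline note on the significant-change threshold quoted above: a minimal sketch of the write-suppression check, assuming the comparison is always made against the last value written (as the text states) rather than against the previous sample. The class name `LoadChangeDetector` and the zero-baseline rule are my own assumptions for illustration, not something the PIP defines.

```java
// Hypothetical sketch; not the PIP's actual SegmentLoadReporter implementation.
final class LoadChangeDetector {
    private final double threshold; // e.g. 0.25 for the default 25%
    private double[] lastWritten;   // rates at the last metadata write; null before the first write

    LoadChangeDetector(double threshold) {
        this.threshold = threshold;
    }

    /** Returns true, and remembers the sample, when the sample should be written to metadata. */
    boolean shouldWrite(double msgRateIn, double bytesRateIn, double msgRateOut, double bytesRateOut) {
        double[] current = {msgRateIn, bytesRateIn, msgRateOut, bytesRateOut};
        if (lastWritten == null || anyRateMoved(lastWritten, current)) {
            lastWritten = current; // the baseline only moves when a write happens
            return true;
        }
        return false;
    }

    private boolean anyRateMoved(double[] previous, double[] current) {
        for (int i = 0; i < previous.length; i++) {
            double base = Math.abs(previous[i]);
            double delta = Math.abs(current[i] - previous[i]);
            // Assumption: with a zero baseline, any non-zero rate counts as a material change.
            if (base == 0.0 ? delta > 0.0 : delta / base > threshold) {
                return true;
            }
        }
        return false;
    }
}
```

Because the baseline is the last written value, a slow drift of several small steps still triggers a write once the cumulative change exceeds the threshold.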
> > > > ### Subscription types and what each load type drives > > > > Recall from PIP-468 that scalable-topic subscriptions are `STREAM` > > (controller-managed, 1:1 segment↔consumer assignment; covers both > > StreamConsumer and CheckpointConsumer) or `QUEUE` > > (controller-bypassing; every consumer attaches to every segment and > > the broker round-robins). > > > > | Trigger | STREAM subscriptions | QUEUE subscriptions | > > |---------|----------------------|----------------------| > > | Consumer-count scale-up | **Yes** — more segments give more 1:1 > > parallelism | **No** — queue consumers share segments; more segments > > don't add parallelism for them | > > | Traffic (in/out, msg/bytes) | Yes | **Yes** — queue traffic still > > loads the segment's broker and counts toward the per-segment rate | > > > > So a topic with only QUEUE subscriptions never splits on consumer > > count, but still splits when any segment's in/out rate crosses > > threshold. > > > > ### The decision: `AutoScalePolicyEvaluator` > > > > A pure function with no I/O: > > > > ``` > > decide(layout, loadBySegment, streamConsumerCountBySub, policy, now) > > → Split(segmentId) | Merge(segmentId1, segmentId2) | None > > ``` > > > > It runs in two passes — **split first (short cooldown), then merge > > (long cooldown)** — and emits at most one action per invocation. > > > > #### Pass 1 — SPLIT (fast, lightly coalesced) > > > > Splits fire as soon as conditions are met, bounded by `maxSegments`, > > an in-flight-operation guard, and a **short `splitCooldown` (default 1 > > min)**. The cooldown is deliberately short: it exists only to coalesce > > a burst of near-simultaneous triggers — e.g. a group of consumers > > connecting in rapid succession should cause one split, not N — while > > still letting a genuinely growing topic split again on the next > > minute. 
> >
> > ```
> > if activeSegments >= maxSegments: skip split pass
> > if now - lastSplitAtMs < splitCooldown: skip split pass
> >
> > (a) Consumer-driven:
> >     required = max over STREAM subscriptions of consumerCount   // per-subscription max
> >     if required > activeSegments:
> >         → Split(busiest active segment by msgRateIn); set lastSplitAtMs = now
> >
> > (b) Load-driven (if (a) didn't fire):
> >     candidate segments = active segments where ANY of:
> >         - msgRateIn    > splitMsgRateInThreshold
> >         - bytesRateIn  > splitBytesRateInThreshold
> >         - msgRateOut   > splitMsgRateOutThreshold
> >         - bytesRateOut > splitBytesRateOutThreshold
> >     if candidates non-empty:
> >         → Split(most-overloaded candidate); set lastSplitAtMs = now
> > ```
> >
> > The consumer-driven rule (a) is what the **event-driven path**
> > evaluates the moment a consumer registers, so a new consumer gets a
> > segment within seconds (subject to `splitCooldown`). Rule (a) must
> > also set `lastSplitAtMs`; otherwise the cooldown could not coalesce a
> > burst of consumer registrations into a single split. The load-driven
> > rule (b) runs on the periodic tick. Because `msgRateIn` etc. are
> > already 60-second rolling averages on the broker, a value over
> > threshold already represents *sustained* load, so no extra split
> > window is needed to filter transient spikes.
> >
> > #### Pass 2 — MERGE (lazy, rate-limited)
> >
> > Merges run only if no split fired this tick, the topic is not within
> > `mergeCooldown` of its last merge, and the result respects
> > `maxDagDepth`.
> > > > ``` > > if a split fired this tick: skip merge pass > > if now - lastMergeAtMs < mergeCooldown: skip merge pass > > if activeSegments <= minSegments: skip merge pass > > > > candidate pairs = adjacent ACTIVE segment pairs where BOTH segments satisfy, > > for at least mergeWindow (checked via the store > > Stat's modified time): > > - msgRateIn < mergeMsgRateInThreshold > > - bytesRateIn < mergeBytesRateInThreshold > > - msgRateOut < mergeMsgRateOutThreshold > > - bytesRateOut< mergeBytesRateOutThreshold > > AND neither segment's lineage is already at maxDagDepth merges > > > > if candidate pairs non-empty: > > → Merge(coldest pair by combined rate); set lastMergeAtMs = now > > ``` > > > > Adjacency is required because the existing `mergeSegments` API only > > merges hash-range-adjacent active segments. > > > > ### Anti-flip-flop: three independent guards > > > > 1. **Threshold gap (hysteresis).** Split thresholds are well above > > merge thresholds for every metric. The dead-band between them is what > > prevents a just-merged segment from immediately re-qualifying for a > > split. > > 2. **Asymmetric cooldown.** Splits: a short `splitCooldown` (default 1 > > min) that only coalesces bursts. Merges: a longer `mergeCooldown` > > (default 5 min) plus a `mergeWindow` (default 5 min) during which the > > segment must have stayed cold. A pair must be *durably* cold to merge, > > but a segment can split again within a minute of getting hot. > > 3. **Max DAG depth on merges.** `maxDagDepth` (default 10) caps how > > many merges a given lineage can accumulate. Once reached, that lineage > > stops being a merge candidate — **but load-driven splits still fire.** > > This bounds the number of split↔merge cycles a hash range can churn > > through while never blocking a split that throughput requires. > > > > > **Design note — direction of the depth cap.** The cap restricts *merges*, > > > not splits. 
Splits are needed for correctness/performance and must always > > > be available; merges are the optional efficiency step and are the ones > > > that, combined with splits, could oscillate. `dagDepth` therefore counts > > > **merges in a segment's lineage**, derived from the existing > > > `parentIds`/`childIds` DAG in `ScalableTopicMetadata` — splits do not > > > consume depth budget. > > > > ### Caps > > > > | Cap | Default | Effect | > > |-----|---------|--------| > > | `maxSegments` | 64 | Splits stop once `activeSegments == maxSegments`. | > > | `minSegments` | 1 | Merges stop once `activeSegments == minSegments`. | > > | `maxDagDepth` | 10 | Merges stop for a lineage at the cap; splits > > unaffected. | > > > > ### Manual operations and cooldown > > > > - Manual `admin.scalableTopics().splitSegment(...)` **sets > > `lastSplitAtMs`**, so a manual split also starts the short auto-split > > cooldown. > > - Manual `admin.scalableTopics().mergeSegments(...)` **sets > > `lastMergeAtMs`**, so the operator's manual efficiency action also > > rate-limits the controller's automatic merges. > > > > ### Evaluation triggers > > > > The controller leader evaluates auto split/merge from two sources: > > > > - **Event-driven (within seconds)** — when a STREAM/CHECKPOINT > > consumer registers with or unregisters from the controller, it > > immediately evaluates the consumer-count split rule. No polling: > > consumer registration already flows through the controller (PIP-468). > > - **Periodic tick** — a `scheduleAutoScaleTick` (separate from the GC > > tick from PIP-468), default cadence `scalableTopicAutoScaleInterval = > > 60s`, evaluates the traffic-driven split rules and the merge pass. Per > > tick it does one metadata batch-read of the topic's `segments/*/load` > > records (or maintains a watch cache), evaluates, and dispatches. > > > > Both sources call the same `AutoScalePolicyEvaluator`; the > > event-driven path only needs the consumer-count rule, so it is cheap. 
> > Both are cancelled on leadership loss / close. > > > > --- > > > > ## Public-Facing Changes > > > > ### Configuration (`broker.conf`) > > > > Auto-scaling is **default-on cluster-wide**; these are the defaults > > applied to every scalable topic that does not override them. > > > > | Property | Default | Description | > > |----------|---------|-------------| > > | `scalableTopicAutoScaleEnabled` | `true` | Master switch for auto > > split/merge. | > > | `scalableTopicAutoScaleInterval` | `60s` | Periodic (traffic) > > evaluation cadence. Consumer-count changes are handled event-driven, > > independent of this. | > > | `scalableTopicMaxSegments` | `64` | Hard ceiling on active segments. | > > | `scalableTopicMinSegments` | `1` | Hard floor on active segments. | > > | `scalableTopicMaxDagDepth` | `10` | Max merges in a lineage before > > merges are disabled for it. | > > | `scalableTopicSplitCooldown` | `1m` | Minimum time between automatic > > splits on a topic (coalesces bursts). | > > | `scalableTopicMergeCooldown` | `5m` | Minimum time between automatic > > merges on a topic. | > > | `scalableTopicMergeWindow` | `5m` | Duration a pair must stay cold > > before merging. | > > | `scalableTopicSplitMsgRateInThreshold` | `10000` | msg/s ingest > > split trigger. | > > | `scalableTopicSplitBytesRateInThreshold` | `50MB` | bytes/s ingest > > split trigger. | > > | `scalableTopicSplitMsgRateOutThreshold` | `50000` | msg/s dispatch > > split trigger. | > > | `scalableTopicSplitBytesRateOutThreshold` | `250MB` | bytes/s > > dispatch split trigger. | > > | `scalableTopicMergeMsgRateInThreshold` | `1000` | msg/s ingest merge > > trigger. | > > | `scalableTopicMergeBytesRateInThreshold` | `5MB` | bytes/s ingest > > merge trigger. | > > | `scalableTopicMergeMsgRateOutThreshold` | `5000` | msg/s dispatch > > merge trigger. | > > | `scalableTopicMergeBytesRateOutThreshold` | `25MB` | bytes/s > > dispatch merge trigger. 
| > > | `scalableTopicLoadReportInterval` | `10s` | Segment owner sampling > > interval. | > > | `scalableTopicLoadReportRateChangeThreshold` | `25%` | Minimum rate > > change since the last write that triggers a new `SegmentLoadStats` > > write. | > > > > ### Policy resolution (namespace + topic overrides) > > > > Following the existing `autoTopicCreationOverride` pattern, an > > `AutoScalePolicyOverride` can be set at two levels; resolution is > > most-specific-wins, falling back to `broker.conf`: > > > > 1. **Per-topic** — a new `autoScalePolicy` field on > > `ScalableTopicMetadata`, set via > > `admin.scalableTopics().setAutoScalePolicy(topic, policy)` / > > `getAutoScalePolicy(topic)`. > > 2. **Per-namespace** — a new `scalableTopicAutoScalePolicy` field on > > `Policies`, set via > > `admin.namespaces().setScalableTopicAutoScalePolicy(ns, policy)`. > > > > `AutoScalePolicyOverride` carries the same knobs as the broker config > > (all optional; unset fields fall through). Setting `enabled = false` > > opts a topic or namespace out entirely. > > > > ### Admin Client API > > > > ```java > > interface ScalableTopics { > > // ... existing ... > > void setAutoScalePolicy(String topic, AutoScalePolicyOverride > > policy) throws PulsarAdminException; > > AutoScalePolicyOverride getAutoScalePolicy(String topic) throws > > PulsarAdminException; > > void removeAutoScalePolicy(String topic) throws PulsarAdminException; > > } > > > > interface Namespaces { > > // ... existing ... 
> > void setScalableTopicAutoScalePolicy(String namespace, > > AutoScalePolicyOverride policy) ...; > > AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String > > namespace) ...; > > void removeScalableTopicAutoScalePolicy(String namespace) ...; > > } > > ``` > > > > ### Metadata Store Paths > > > > | Path | Content | Writer | > > |------|---------|--------| > > | `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load` | > > `SegmentLoadStats` JSON | segment-owning broker | > > > > (`autoScalePolicy` rides inside the existing `ScalableTopicMetadata` > > blob; the namespace override rides inside `Policies`. No other new > > paths.) > > > > ### Observability > > > > - New per-topic metrics: `pulsar_scalable_topic_active_segments`, > > `pulsar_scalable_topic_auto_splits_total`, > > `pulsar_scalable_topic_auto_merges_total`, > > `pulsar_scalable_topic_split_suppressed_max_segments_total`, > > `pulsar_scalable_topic_merge_suppressed_max_depth_total`. > > - The existing `ScalableTopicStats` is extended with the most recent > > `SegmentLoadStats` per segment and the resolved effective policy, so > > operators can see *why* the controller did or did not act. > > > > --- > > > > ## Operational Safety > > > > The `maxSegments`, `maxDagDepth`, asymmetric cooldown, and > > threshold-gap guards together bound both the rate and the total amount > > of structural change a topic can undergo, so enabling auto split/merge > > cannot cause unbounded segment growth or split/merge storms. > > > > Operators who want manual-only control set > > `scalableTopicAutoScaleEnabled=false` (cluster) or an `enabled=false` > > override (namespace/topic). > > > > > **Compatibility:** scalable topics are a new, as-yet-unreleased feature > > > ([PIP-460](pip-460.md)), so there is no backward/forward compatibility to > > > consider — `SegmentLoadStats`, the policy fields, and the config knobs > > > all ship together with the rest of the scalable-topic feature. 
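
Inline note on the guards summarised above: the sketch below shows one way the split pass, merge pass, cooldowns, merge window and depth cap could compose into a single pure function. It is only an illustration under stated assumptions. All type and method names (`SegmentLoad`, `Policy`, `AutoScaleSketch`) are hypothetical, "most-overloaded" and "coldest pair" are approximated by message rates, `MB` is assumed to mean 10^6 bytes, and the in-flight-operation guard is omitted.

```java
import java.util.List;

// Illustrative types only; none of these names come from the PIP.
final class SegmentLoad {
    final double msgRateIn, bytesRateIn, msgRateOut, bytesRateOut;
    final long modifiedAtMs; // metadata-store Stat modified time of the load record
    final int mergeDepth;    // merges already present in this segment's lineage

    SegmentLoad(double msgRateIn, double bytesRateIn, double msgRateOut, double bytesRateOut,
                long modifiedAtMs, int mergeDepth) {
        this.msgRateIn = msgRateIn;
        this.bytesRateIn = bytesRateIn;
        this.msgRateOut = msgRateOut;
        this.bytesRateOut = bytesRateOut;
        this.modifiedAtMs = modifiedAtMs;
        this.mergeDepth = mergeDepth;
    }
}

final class Policy { // defaults taken from the broker.conf table above
    int maxSegments = 64, minSegments = 1, maxDagDepth = 10;
    long splitCooldownMs = 60_000, mergeCooldownMs = 300_000, mergeWindowMs = 300_000;
    double[] split = {10_000, 50e6, 50_000, 250e6}; // msgIn, bytesIn, msgOut, bytesOut
    double[] merge = {1_000, 5e6, 5_000, 25e6};
}

final class AutoScaleSketch {
    static boolean isHot(SegmentLoad s, Policy p) {
        return s.msgRateIn > p.split[0] || s.bytesRateIn > p.split[1]
            || s.msgRateOut > p.split[2] || s.bytesRateOut > p.split[3];
    }

    static boolean isDurablyCold(SegmentLoad s, Policy p, long nowMs) {
        boolean cold = s.msgRateIn < p.merge[0] && s.bytesRateIn < p.merge[1]
            && s.msgRateOut < p.merge[2] && s.bytesRateOut < p.merge[3];
        // An unchanged cold record proves the segment has been cold since modifiedAtMs.
        return cold && nowMs - s.modifiedAtMs >= p.mergeWindowMs && s.mergeDepth < p.maxDagDepth;
    }

    /** 'active' must be ordered by hash range so that list neighbours are adjacent segments. */
    static String decide(List<SegmentLoad> active, int maxStreamConsumers,
                         long lastSplitAtMs, long lastMergeAtMs, Policy p, long nowMs) {
        int n = active.size();
        if (n == 0) return "NONE";
        // Pass 1: split, gated by maxSegments and the short split cooldown.
        if (n < p.maxSegments && nowMs - lastSplitAtMs >= p.splitCooldownMs) {
            int busiest = 0, hottest = -1;
            for (int i = 0; i < n; i++) {
                SegmentLoad s = active.get(i);
                if (s.msgRateIn > active.get(busiest).msgRateIn) busiest = i;
                if (isHot(s, p) && (hottest < 0 || s.msgRateIn > active.get(hottest).msgRateIn)) hottest = i;
            }
            if (maxStreamConsumers > n) return "SPLIT:" + busiest; // rule (a)
            if (hottest >= 0) return "SPLIT:" + hottest;           // rule (b)
        }
        // Pass 2: merge, reached only when no split fired.
        if (nowMs - lastMergeAtMs < p.mergeCooldownMs || n <= p.minSegments) return "NONE";
        int best = -1;
        double bestRate = Double.MAX_VALUE;
        for (int i = 0; i + 1 < n; i++) {
            SegmentLoad a = active.get(i), b = active.get(i + 1);
            if (!isDurablyCold(a, p, nowMs) || !isDurablyCold(b, p, nowMs)) continue;
            double combined = a.msgRateIn + a.msgRateOut + b.msgRateIn + b.msgRateOut;
            if (combined < bestRate) { bestRate = combined; best = i; }
        }
        return best >= 0 ? "MERGE:" + best + "," + (best + 1) : "NONE";
    }
}
```

The caller is expected to record `lastSplitAtMs` or `lastMergeAtMs` when it dispatches the returned action. Keeping that state outside the function is what lets the decision stay free of I/O and testable with plain values.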
> > > > --- > > > > ## Security Considerations > > > > `setAutoScalePolicy` / `getAutoScalePolicy` (topic and namespace > > variants) require the same admin permissions as the corresponding > > existing scalable-topic and namespace policy operations. > > `SegmentLoadStats` is written by brokers via their authenticated > > internal identity and is not client-writable. > > > > --- > > > > ## Links > > > > - Parent PIP: [PIP-468: Scalable Topic Controller](pip-468.md) > > - Grand-parent PIP: [PIP-460: Scalable Topics](pip-460.md) > > - V5 Client API: [PIP-466: New Java Client API (V5)](pip-466.md) > > > > Matteo Merli <[email protected]> > > > > Thu, Jun 4, 4:36 PM (3 days ago) > > to Dev > > https://github.com/apache/pulsar/pull/25938 > > > > > > ------ > > > > > > # PIP-483: Scalable Topic Auto Split/Merge > > > > *Sub-PIP of [PIP-468: Scalable Topic Controller](pip-468.md)* > > > > ## Motivation > > > > [PIP-468](pip-468.md) gives the Scalable Topic Controller the ability > > to **split** and **merge** segments, but only on explicit operator > > request via the admin API. An operator has to watch the topic, decide > > it is hot, and issue a split — and later notice it has gone cold and > > issue a merge. This is the same operational toil that partition-count > > management imposes on classic Pulsar topics, which scalable topics > > were meant to eliminate. > > > > This PIP adds an **auto-scaling policy** to the controller: the > > controller leader observes per-segment load and per-subscription > > consumer pressure, and autonomously splits hot segments and merges > > cold ones, within hard caps that prevent runaway growth and > > split/merge flip-flopping. > > > > The design is built around three principles that came out of the > > design discussion: > > > > 1. **Splits are fast; merges are lazy.** A split protects throughput > > and latency under load, so it fires quickly — with only a short > > cooldown to coalesce bursts of near-simultaneous triggers (e.g. 
a > > group of consumers connecting in rapid succession). A merge is purely > > an efficiency reclaim, so it can wait, be rate-limited, and be skipped > > when in doubt. > > 2. **The controller reacts, it does not poll.** New stream/checkpoint > > consumers register directly with the controller, so consumer-count > > changes are handled event-driven within seconds. Load data is *pushed* > > into the metadata store by each segment's owning broker (only when it > > changes materially) and read by the controller leader. The controller > > never fans out RPCs to segment owners. > > 3. **The decision is a pure function.** Given a snapshot of load + > > layout + policy, the split/merge decision is deterministic and > > unit-testable in isolation from all I/O. > > > > --- > > > > ## Goals > > > > - Automatically increase segment count when a topic is under > > ingest/dispatch load or has more stream/checkpoint consumers than > > segments. > > - Automatically decrease segment count when load subsides, reclaiming > > broker resources. > > - Bound growth (`maxSegments`) and bound split↔merge churn > > (`maxDagDepth`, asymmetric cooldown). > > - Default-on cluster-wide, with per-namespace and per-topic overrides, > > following Pulsar's existing policy-resolution conventions. > > > > ### Non-goals > > > > - **Broker placement / rebalancing.** Which broker owns a segment's > > bundle is the load balancer's job; this PIP only changes *how many* > > segments exist. > > - **Key-aware or non-midpoint splits.** Splits use the existing > > midpoint-split mechanism from PIP-468. > > - **Cross-topic global optimization.** Each topic's controller decides > > independently. 
> > > > --- > > > > ## Design > > > > ### Overview > > > > ``` > > ┌────────────────────────────────────────────────────────────────┐ > > │ Segment-owning broker (per active segment) │ > > │ SegmentLoadReporter │ > > │ - samples the segment topic's TopicStats │ > > │ - writes SegmentLoadStats to metadata ONLY on material change │ > > └───────────────────────────────┬────────────────────────────────┘ > > │ (metadata store, push-on-change) > > ▼ > > ┌────────────────────────────────────────────────────────────────┐ > > │ Controller leader (per scalable topic) │ > > │ │ > > │ Event-driven — within seconds: │ > > │ on STREAM/CHECKPOINT consumer register/unregister │ > > │ (consumers already register with the controller — no poll) │ > > │ → evaluate the consumer-count split rule immediately │ > > │ │ > > │ Periodic AutoScaleTick — traffic, default 60s: │ > > │ 1. read SegmentLoadStats for all active segments │ > > │ 2. AutoScalePolicyEvaluator.decide(...) → Split|Merge|None │ > > │ 3. dispatch to existing splitSegment / mergeSegments │ > > └────────────────────────────────────────────────────────────────┘ > > ``` > > > > The two trigger sources reflect their different latency needs: a new > > consumer should get its own segment **within seconds**, so it is > > handled the instant the consumer registers with the controller; > > traffic shifts up or down over **a minute or more**, so they are > > evaluated on a slower periodic tick. The only new persistent state is > > `SegmentLoadStats`. The split/merge *mechanics* are entirely reused > > from PIP-468. > > > > ### Load reporting: push-to-metadata, not pull-per-tick > > > > Each segment's owning broker runs a **`SegmentLoadReporter`** for > > every ACTIVE `segment://` topic it hosts. The broker writes > > `SegmentLoadStats` **directly to the metadata store** — it already has > > the rates in memory, so there is no REST round-trip and no > > controller-initiated pull. 
On a fixed sampling interval it compares > > the current rates to the last ones written and writes **only when a > > rate changes by more than a significant threshold** (default ±25%) > > since the last write. A steady-state segment writes once and then goes > > silent, keeping metadata write volume bounded regardless of traffic. > > > > #### `SegmentLoadStats` (new metadata record) > > > > Stored at `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load`: > > > > ```json > > { > > "msgRateIn": 12000.0, > > "bytesRateIn": 64000000.0, > > "msgRateOut": 48000.0, > > "bytesRateOut": 256000000.0 > > } > > ``` > > > > | Field | Source on the owning broker | Meaning for auto split/merge | > > |-------|------------------------------|---------------------------| > > | `msgRateIn` / `bytesRateIn` | segment topic `TopicStats` (60s > > rolling) | ingest load | > > | `msgRateOut` / `bytesRateOut` | segment topic `TopicStats` | > > dispatch/fanout load (high for topics with many subscriptions) | > > > > The record carries no timestamp of its own: the metadata store's > > `Stat` for the znode already exposes creation and last-modified > > timestamps, and the controller uses the **modified timestamp** for > > windowing. A record that still reads "cold" with an old modified time > > proves the segment has been cold for `now − modifiedAt` — so > > split/merge **windows derive from the store's `Stat`** with no > > per-tick history buffer and no extra field. > > > > #### Significant-change threshold > > > > To avoid rewriting on every minor wobble, the reporter only writes > > when a rate has moved by more than > > `scalableTopicLoadReportRateChangeThreshold` (default 25%) relative to > > the last value written for that segment. Sampling cadence is > > `scalableTopicLoadReportInterval` (default 10s). Both are configurable > > via `broker.conf`. 
> > > > ### Subscription types and what each load type drives > > > > Recall from PIP-468 that scalable-topic subscriptions are `STREAM` > > (controller-managed, 1:1 segment↔consumer assignment; covers both > > StreamConsumer and CheckpointConsumer) or `QUEUE` > > (controller-bypassing; every consumer attaches to every segment and > > the broker round-robins). > > > > | Trigger | STREAM subscriptions | QUEUE subscriptions | > > |---------|----------------------|----------------------| > > | Consumer-count scale-up | **Yes** — more segments give more 1:1 > > parallelism | **No** — queue consumers share segments; more segments > > don't add parallelism for them | > > | Traffic (in/out, msg/bytes) | Yes | **Yes** — queue traffic still > > loads the segment's broker and counts toward the per-segment rate | > > > > So a topic with only QUEUE subscriptions never splits on consumer > > count, but still splits when any segment's in/out rate crosses > > threshold. > > > > ### The decision: `AutoScalePolicyEvaluator` > > > > A pure function with no I/O: > > > > ``` > > decide(layout, loadBySegment, streamConsumerCountBySub, policy, now) > > → Split(segmentId) | Merge(segmentId1, segmentId2) | None > > ``` > > > > It runs in two passes — **split first (short cooldown), then merge > > (long cooldown)** — and emits at most one action per invocation. > > > > #### Pass 1 — SPLIT (fast, lightly coalesced) > > > > Splits fire as soon as conditions are met, bounded by `maxSegments`, > > an in-flight-operation guard, and a **short `splitCooldown` (default 1 > > min)**. The cooldown is deliberately short: it exists only to coalesce > > a burst of near-simultaneous triggers — e.g. a group of consumers > > connecting in rapid succession should cause one split, not N — while > > still letting a genuinely growing topic split again on the next > > minute. 
> > > > ``` > > if activeSegments >= maxSegments: skip split pass > > if now - lastSplitAtMs < splitCooldown: skip split pass > > > > (a) Consumer-driven: > > required = max over STREAM subscriptions of consumerCount // > > per-subscription max > > if required > activeSegments: > > → Split(busiest active segment by msgRateIn) > > > > (b) Load-driven (if (a) didn't fire): > > candidate segments = active segments where ANY of: > > - msgRateIn > splitMsgRateInThreshold > > - bytesRateIn > splitBytesRateInThreshold > > - msgRateOut > splitMsgRateOutThreshold > > - bytesRateOut> splitBytesRateOutThreshold > > if candidates non-empty: > > → Split(most-overloaded candidate); set lastSplitAtMs = now > > ``` > > > > The consumer-driven rule (a) is what the **event-driven path** > > evaluates the moment a consumer registers, so a new consumer gets a > > segment within seconds (subject to `splitCooldown`). The load-driven > > rule (b) runs on the periodic tick. Because `msgRateIn` etc. are > > already 60-second rolling averages on the broker, a value over > > threshold already represents *sustained* load — no extra split window > > is needed to filter transient spikes. > > > > #### Pass 2 — MERGE (lazy, rate-limited) > > > > Merges run only if no split fired this tick, the topic is not within > > `mergeCooldown` of its last merge, and the result respects > > `maxDagDepth`. 
> > > > ``` > > if a split fired this tick: skip merge pass > > if now - lastMergeAtMs < mergeCooldown: skip merge pass > > if activeSegments <= minSegments: skip merge pass > > > > candidate pairs = adjacent ACTIVE segment pairs where BOTH segments satisfy, > > for at least mergeWindow (checked via the store > > Stat's modified time): > > - msgRateIn < mergeMsgRateInThreshold > > - bytesRateIn < mergeBytesRateInThreshold > > - msgRateOut < mergeMsgRateOutThreshold > > - bytesRateOut< mergeBytesRateOutThreshold > > AND neither segment's lineage is already at maxDagDepth merges > > > > if candidate pairs non-empty: > > → Merge(coldest pair by combined rate); set lastMergeAtMs = now > > ``` > > > > Adjacency is required because the existing `mergeSegments` API only > > merges hash-range-adjacent active segments. > > > > ### Anti-flip-flop: three independent guards > > > > 1. **Threshold gap (hysteresis).** Split thresholds are well above > > merge thresholds for every metric. The dead-band between them is what > > prevents a just-merged segment from immediately re-qualifying for a > > split. > > 2. **Asymmetric cooldown.** Splits: a short `splitCooldown` (default 1 > > min) that only coalesces bursts. Merges: a longer `mergeCooldown` > > (default 5 min) plus a `mergeWindow` (default 5 min) during which the > > segment must have stayed cold. A pair must be *durably* cold to merge, > > but a segment can split again within a minute of getting hot. > > 3. **Max DAG depth on merges.** `maxDagDepth` (default 10) caps how > > many merges a given lineage can accumulate. Once reached, that lineage > > stops being a merge candidate — **but load-driven splits still fire.** > > This bounds the number of split↔merge cycles a hash range can churn > > through while never blocking a split that throughput requires. > > > > > **Design note — direction of the depth cap.** The cap restricts *merges*, > > > not splits. 
> > > Splits are needed for correctness/performance and must always
> > > be available; merges are the optional efficiency step and are the ones
> > > that, combined with splits, could oscillate. `dagDepth` therefore counts
> > > **merges in a segment's lineage**, derived from the existing
> > > `parentIds`/`childIds` DAG in `ScalableTopicMetadata`; splits do not
> > > consume depth budget.
> >
> > ### Caps
> >
> > | Cap | Default | Effect |
> > |-----|---------|--------|
> > | `maxSegments` | 64 | Splits stop once `activeSegments == maxSegments`. |
> > | `minSegments` | 1 | Merges stop once `activeSegments == minSegments`. |
> > | `maxDagDepth` | 10 | Merges stop for a lineage at the cap; splits unaffected. |
> >
> > ### Manual operations and cooldown
> >
> > - Manual `admin.scalableTopics().splitSegment(...)` **sets
> >   `lastSplitAtMs`**, so a manual split also starts the short auto-split
> >   cooldown.
> > - Manual `admin.scalableTopics().mergeSegments(...)` **sets
> >   `lastMergeAtMs`**, so the operator's manual efficiency action also
> >   rate-limits the controller's automatic merges.
> >
> > ### Evaluation triggers
> >
> > The controller leader evaluates auto split/merge from two sources:
> >
> > - **Event-driven (within seconds)**: when a STREAM/CHECKPOINT
> >   consumer registers with or unregisters from the controller, the
> >   controller immediately evaluates the consumer-count split rule. No
> >   polling: consumer registration already flows through the controller
> >   (PIP-468).
> > - **Periodic tick**: a `scheduleAutoScaleTick` (separate from the GC
> >   tick from PIP-468), default cadence `scalableTopicAutoScaleInterval =
> >   60s`, evaluates the traffic-driven split rules and the merge pass. Per
> >   tick it does one metadata batch-read of the topic's `segments/*/load`
> >   records (or maintains a watch cache), evaluates, and dispatches.
> >
> > Both sources call the same `AutoScalePolicyEvaluator`; the
> > event-driven path only needs the consumer-count rule, so it is cheap.
> > Both are cancelled on leadership loss / close.
> >
> > ---
> >
> > ## Public-Facing Changes
> >
> > ### Configuration (`broker.conf`)
> >
> > Auto-scaling is **default-on cluster-wide**; these are the defaults
> > applied to every scalable topic that does not override them.
> >
> > | Property | Default | Description |
> > |----------|---------|-------------|
> > | `scalableTopicAutoScaleEnabled` | `true` | Master switch for auto split/merge. |
> > | `scalableTopicAutoScaleInterval` | `60s` | Periodic (traffic) evaluation cadence. Consumer-count changes are handled event-driven, independent of this. |
> > | `scalableTopicMaxSegments` | `64` | Hard ceiling on active segments. |
> > | `scalableTopicMinSegments` | `1` | Hard floor on active segments. |
> > | `scalableTopicMaxDagDepth` | `10` | Max merges in a lineage before merges are disabled for it. |
> > | `scalableTopicSplitCooldown` | `1m` | Minimum time between automatic splits on a topic (coalesces bursts). |
> > | `scalableTopicMergeCooldown` | `5m` | Minimum time between automatic merges on a topic. |
> > | `scalableTopicMergeWindow` | `5m` | Duration a pair must stay cold before merging. |
> > | `scalableTopicSplitMsgRateInThreshold` | `10000` | msg/s ingest split trigger. |
> > | `scalableTopicSplitBytesRateInThreshold` | `50MB` | bytes/s ingest split trigger. |
> > | `scalableTopicSplitMsgRateOutThreshold` | `50000` | msg/s dispatch split trigger. |
> > | `scalableTopicSplitBytesRateOutThreshold` | `250MB` | bytes/s dispatch split trigger. |
> > | `scalableTopicMergeMsgRateInThreshold` | `1000` | msg/s ingest merge trigger. |
> > | `scalableTopicMergeBytesRateInThreshold` | `5MB` | bytes/s ingest merge trigger. |
> > | `scalableTopicMergeMsgRateOutThreshold` | `5000` | msg/s dispatch merge trigger. |
> > | `scalableTopicMergeBytesRateOutThreshold` | `25MB` | bytes/s dispatch merge trigger. |
> > | `scalableTopicLoadReportInterval` | `10s` | Segment owner sampling interval. |
> > | `scalableTopicLoadReportRateChangeThreshold` | `25%` | Minimum rate change since the last write that triggers a new `SegmentLoadStats` write. |
> >
> > ### Policy resolution (namespace + topic overrides)
> >
> > Following the existing `autoTopicCreationOverride` pattern, an
> > `AutoScalePolicyOverride` can be set at two levels; resolution is
> > most-specific-wins, falling back to `broker.conf`:
> >
> > 1. **Per-topic**: a new `autoScalePolicy` field on
> >    `ScalableTopicMetadata`, set via
> >    `admin.scalableTopics().setAutoScalePolicy(topic, policy)` /
> >    `getAutoScalePolicy(topic)`.
> > 2. **Per-namespace**: a new `scalableTopicAutoScalePolicy` field on
> >    `Policies`, set via
> >    `admin.namespaces().setScalableTopicAutoScalePolicy(ns, policy)`.
> >
> > `AutoScalePolicyOverride` carries the same knobs as the broker config
> > (all optional; unset fields fall through). Setting `enabled = false`
> > opts a topic or namespace out entirely.
> >
> > ### Admin Client API
> >
> > ```java
> > interface ScalableTopics {
> >     // ... existing ...
> >     void setAutoScalePolicy(String topic, AutoScalePolicyOverride policy) throws PulsarAdminException;
> >     AutoScalePolicyOverride getAutoScalePolicy(String topic) throws PulsarAdminException;
> >     void removeAutoScalePolicy(String topic) throws PulsarAdminException;
> > }
> >
> > interface Namespaces {
> >     // ... existing ...
> >     void setScalableTopicAutoScalePolicy(String namespace, AutoScalePolicyOverride policy) ...;
> >     AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace) ...;
> >     void removeScalableTopicAutoScalePolicy(String namespace) ...;
> > }
> > ```
> >
> > ### Metadata Store Paths
> >
> > | Path | Content | Writer |
> > |------|---------|--------|
> > | `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load` | `SegmentLoadStats` JSON | segment-owning broker |
> >
> > (`autoScalePolicy` rides inside the existing `ScalableTopicMetadata`
> > blob; the namespace override rides inside `Policies`. No other new
> > paths.)
> >
> > ### Observability
> >
> > - New per-topic metrics: `pulsar_scalable_topic_active_segments`,
> >   `pulsar_scalable_topic_auto_splits_total`,
> >   `pulsar_scalable_topic_auto_merges_total`,
> >   `pulsar_scalable_topic_split_suppressed_max_segments_total`,
> >   `pulsar_scalable_topic_merge_suppressed_max_depth_total`.
> > - The existing `ScalableTopicStats` is extended with the most recent
> >   `SegmentLoadStats` per segment and the resolved effective policy, so
> >   operators can see *why* the controller did or did not act.
> >
> > ---
> >
> > ## Operational Safety
> >
> > The `maxSegments`, `maxDagDepth`, asymmetric cooldown, and
> > threshold-gap guards together bound both the rate and the total amount
> > of structural change a topic can undergo, so enabling auto split/merge
> > cannot cause unbounded segment growth or split/merge storms.
> >
> > Operators who want manual-only control set
> > `scalableTopicAutoScaleEnabled=false` (cluster) or an `enabled=false`
> > override (namespace/topic).
> >
> > > **Compatibility:** scalable topics are a new, as-yet-unreleased feature
> > > ([PIP-460](pip-460.md)), so there is no backward/forward compatibility to
> > > consider: `SegmentLoadStats`, the policy fields, and the config knobs
> > > all ship together with the rest of the scalable-topic feature.
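
The cluster, namespace, and topic opt-outs above depend on the most-specific-wins resolution from the "Policy resolution" section: topic override first, then namespace override, then `broker.conf`. Below is a minimal sketch of that per-field fall-through, under stated assumptions. The class names `PolicyOverrideSketch`, `EffectivePolicySketch`, and `PolicyResolverSketch` are hypothetical stand-ins (the real `AutoScalePolicyOverride` carries all the knobs from the configuration table), only `enabled` and `maxSegments` are modeled, and `null` is assumed to mean "unset".

```java
// Hypothetical, simplified stand-in for AutoScalePolicyOverride.
// Only two knobs are modeled; a null field means "unset, fall through".
final class PolicyOverrideSketch {
    final Boolean enabled;
    final Integer maxSegments;

    PolicyOverrideSketch(Boolean enabled, Integer maxSegments) {
        this.enabled = enabled;
        this.maxSegments = maxSegments;
    }
}

// Fully resolved policy: every field has a concrete value.
final class EffectivePolicySketch {
    final boolean enabled;
    final int maxSegments;

    EffectivePolicySketch(boolean enabled, int maxSegments) {
        this.enabled = enabled;
        this.maxSegments = maxSegments;
    }
}

final class PolicyResolverSketch {
    // Cluster defaults taken from the broker.conf table in the PIP.
    static final EffectivePolicySketch BROKER_DEFAULTS =
            new EffectivePolicySketch(true, 64);

    // Resolves each field independently: topic, then namespace, then broker.
    // Either override may be null when nothing is set at that level.
    static EffectivePolicySketch resolve(PolicyOverrideSketch topic,
                                         PolicyOverrideSketch namespace,
                                         EffectivePolicySketch broker) {
        Boolean topicEnabled = topic == null ? null : topic.enabled;
        Boolean nsEnabled = namespace == null ? null : namespace.enabled;
        Integer topicMax = topic == null ? null : topic.maxSegments;
        Integer nsMax = namespace == null ? null : namespace.maxSegments;

        boolean enabled = firstSet(topicEnabled, nsEnabled, broker.enabled);
        int maxSegments = firstSet(topicMax, nsMax, broker.maxSegments);
        return new EffectivePolicySketch(enabled, maxSegments);
    }

    // Returns the most specific non-null value.
    private static <T> T firstSet(T topicValue, T namespaceValue, T brokerValue) {
        if (topicValue != null) {
            return topicValue;
        }
        return namespaceValue != null ? namespaceValue : brokerValue;
    }
}
```

Under this sketch, a namespace override with `enabled = false` combined with a topic override that only sets `maxSegments = 16` resolves to a disabled policy with `maxSegments = 16`, because the topic leaves `enabled` unset and the namespace value fills it in.
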
> >
> > ---
> >
> > ## Security Considerations
> >
> > `setAutoScalePolicy` / `getAutoScalePolicy` (topic and namespace
> > variants) require the same admin permissions as the corresponding
> > existing scalable-topic and namespace policy operations.
> > `SegmentLoadStats` is written by brokers via their authenticated
> > internal identity and is not client-writable.
> >
> > ---
> >
> > ## Links
> >
> > - Parent PIP: [PIP-468: Scalable Topic Controller](pip-468.md)
> > - Grand-parent PIP: [PIP-460: Scalable Topics](pip-460.md)
> > - V5 Client API: [PIP-466: New Java Client API (V5)](pip-466.md)
> >
> >
