Closing the vote with 2 +1s:

 * Matteo
 * Lari



--
Matteo Merli
<[email protected]>

On Thu, Jun 11, 2026 at 8:37 AM Lari Hotari <[email protected]> wrote:
>
> +1 (binding)
>
> -Lari
>
> On 2026/06/07 22:21:56 Matteo Merli wrote:
> > https://github.com/apache/pulsar/pull/25938
> >
> > ------
> >
> > # PIP-483: Scalable Topic Auto Split/Merge
> >
> > *Sub-PIP of [PIP-468: Scalable Topic Controller](pip-468.md)*
> >
> > ## Motivation
> >
> > [PIP-468](pip-468.md) gives the Scalable Topic Controller the ability
> > to **split** and **merge** segments, but only on explicit operator
> > request via the admin API. An operator has to watch the topic, decide
> > it is hot, and issue a split — and later notice it has gone cold and
> > issue a merge. This is the same operational toil that partition-count
> > management imposes on classic Pulsar topics, which scalable topics
> > were meant to eliminate.
> >
> > This PIP adds an **auto-scaling policy** to the controller: the
> > controller leader observes per-segment load and per-subscription
> > consumer pressure, and autonomously splits hot segments and merges
> > cold ones, within hard caps that prevent runaway growth and
> > split/merge flip-flopping.
> >
> > The design is built around three principles that came out of the
> > design discussion:
> >
> > 1. **Splits are fast; merges are lazy.** A split protects throughput
> > and latency under load, so it fires quickly — with only a short
> > cooldown to coalesce bursts of near-simultaneous triggers (e.g. a
> > group of consumers connecting in rapid succession). A merge is purely
> > an efficiency reclaim, so it can wait, be rate-limited, and be skipped
> > when in doubt.
> > 2. **The controller reacts, it does not poll.** New stream/checkpoint
> > consumers register directly with the controller, so consumer-count
> > changes are handled event-driven within seconds. Load data is *pushed*
> > into the metadata store by each segment's owning broker (only when it
> > changes materially) and read by the controller leader. The controller
> > never fans out RPCs to segment owners.
> > 3. **The decision is a pure function.** Given a snapshot of load +
> > layout + policy, the split/merge decision is deterministic and
> > unit-testable in isolation from all I/O.
> >
> > ---
> >
> > ## Goals
> >
> > - Automatically increase segment count when a topic is under
> > ingest/dispatch load or has more stream/checkpoint consumers than
> > segments.
> > - Automatically decrease segment count when load subsides, reclaiming
> > broker resources.
> > - Bound growth (`maxSegments`) and bound split↔merge churn
> > (`maxDagDepth`, asymmetric cooldown).
> > - Default-on cluster-wide, with per-namespace and per-topic overrides,
> > following Pulsar's existing policy-resolution conventions.
> >
> > ### Non-goals
> >
> > - **Broker placement / rebalancing.** Which broker owns a segment's
> > bundle is the load balancer's job; this PIP only changes *how many*
> > segments exist.
> > - **Key-aware or non-midpoint splits.** Splits use the existing
> > midpoint-split mechanism from PIP-468.
> > - **Cross-topic global optimization.** Each topic's controller decides
> > independently.
> >
> > ---
> >
> > ## Design
> >
> > ### Overview
> >
> > ```
> > ┌────────────────────────────────────────────────────────────────┐
> > │ Segment-owning broker (per active segment)                      │
> > │   SegmentLoadReporter                                            │
> > │   - samples the segment topic's TopicStats                      │
> > │   - writes SegmentLoadStats to metadata ONLY on material change │
> > └───────────────────────────────┬────────────────────────────────┘
> >                                  │ (metadata store, push-on-change)
> >                                  ▼
> > ┌────────────────────────────────────────────────────────────────┐
> > │ Controller leader (per scalable topic)                          │
> > │                                                                  │
> > │  Event-driven — within seconds:                                 │
> > │    on STREAM/CHECKPOINT consumer register/unregister            │
> > │      (consumers already register with the controller — no poll) │
> > │      → evaluate the consumer-count split rule immediately       │
> > │                                                                  │
> > │  Periodic AutoScaleTick — traffic, default 60s:                 │
> > │    1. read SegmentLoadStats for all active segments             │
> > │    2. AutoScalePolicyEvaluator.decide(...) → Split|Merge|None   │
> > │    3. dispatch to existing splitSegment / mergeSegments         │
> > └────────────────────────────────────────────────────────────────┘
> > ```
> >
> > The two trigger sources reflect different latency needs. A new
> > consumer should get its own segment **within seconds**, so it is
> > handled the instant the consumer registers with the controller.
> > Traffic changes play out over **a minute or more**, so they are
> > evaluated on a slower periodic tick. The only new persistent state is
> > `SegmentLoadStats`. The split/merge *mechanics* are entirely reused
> > from PIP-468.
> >
> > ### Load reporting: push-to-metadata, not pull-per-tick
> >
> > Each segment's owning broker runs a **`SegmentLoadReporter`** for
> > every ACTIVE `segment://` topic it hosts. The broker writes
> > `SegmentLoadStats` **directly to the metadata store** — it already has
> > the rates in memory, so there is no REST round-trip and no
> > controller-initiated pull. On a fixed sampling interval it compares
> > the current rates to the last ones written and writes **only when a
> > rate changes by more than a significant threshold** (default ±25%)
> > since the last write. A steady-state segment writes once and then goes
> > silent, keeping metadata write volume bounded regardless of traffic.
> >
> > #### `SegmentLoadStats` (new metadata record)
> >
> > Stored at `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load`:
> >
> > ```json
> > {
> >   "msgRateIn": 12000.0,
> >   "bytesRateIn": 64000000.0,
> >   "msgRateOut": 48000.0,
> >   "bytesRateOut": 256000000.0
> > }
> > ```
> >
> > | Field | Source on the owning broker | Meaning for auto split/merge |
> > |-------|------------------------------|---------------------------|
> > | `msgRateIn` / `bytesRateIn` | segment topic `TopicStats` (60s rolling) | ingest load |
> > | `msgRateOut` / `bytesRateOut` | segment topic `TopicStats` | dispatch/fanout load (high for topics with many subscriptions) |
> >
> > The record carries no timestamp of its own: the metadata store's
> > `Stat` for the znode already exposes creation and last-modified
> > timestamps, and the controller uses the **modified timestamp** for
> > windowing. A record that still reads "cold" with an old modified time
> > proves the segment has been cold for `now − modifiedAt`, so the
> > **merge window derives from the store's `Stat`** with no
> > per-tick history buffer and no extra field.
> >
> > #### Significant-change threshold
> >
> > To avoid rewriting on every minor wobble, the reporter only writes
> > when a rate has moved by more than
> > `scalableTopicLoadReportRateChangeThreshold` (default 25%) relative to
> > the last value written for that segment. Sampling cadence is
> > `scalableTopicLoadReportInterval` (default 10s). Both are configurable
> > via `broker.conf`.
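A minimal sketch of the significant-change check described above. The class and method names are hypothetical, and the handling of a zero baseline (any non-zero rate counts as a change) is an assumption, since a relative change is undefined when the last written value is 0.

```java
/** Hypothetical helper illustrating the push-on-change filter; not the actual reporter. */
final class LoadReportFilter {
    private LoadReportFilter() {}

    /**
     * Returns true when current deviates from lastWritten by more than
     * threshold, expressed as a fraction (0.25 means 25%).
     * Assumption: with a zero baseline, any non-zero rate is a material change.
     */
    static boolean rateChanged(double lastWritten, double current, double threshold) {
        if (lastWritten == 0.0) {
            return current != 0.0;
        }
        return Math.abs(current - lastWritten) / lastWritten > threshold;
    }

    /** The record is rewritten if ANY rate changed materially since the last write. */
    static boolean shouldWrite(double[] lastWritten, double[] current, double threshold) {
        for (int i = 0; i < lastWritten.length; i++) {
            if (rateChanged(lastWritten[i], current[i], threshold)) {
                return true;
            }
        }
        return false;
    }
}
```

With the default threshold, a segment last reported at 12000 msg/s is rewritten once it samples above 15000 or below 9000 msg/s, and stays silent in between.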
> >
> > ### Subscription types and what each load type drives
> >
> > Recall from PIP-468 that scalable-topic subscriptions are `STREAM`
> > (controller-managed, 1:1 segment↔consumer assignment; covers both
> > StreamConsumer and CheckpointConsumer) or `QUEUE`
> > (controller-bypassing; every consumer attaches to every segment and
> > the broker round-robins).
> >
> > | Trigger | STREAM subscriptions | QUEUE subscriptions |
> > |---------|----------------------|----------------------|
> > | Consumer-count scale-up | **Yes**: more segments give more 1:1 parallelism | **No**: queue consumers share segments; more segments don't add parallelism for them |
> > | Traffic (in/out, msg/bytes) | Yes | **Yes**: queue traffic still loads the segment's broker and counts toward the per-segment rate |
> >
> > So a topic with only QUEUE subscriptions never splits on consumer
> > count, but still splits when any segment's in/out rate crosses
> > threshold.
> >
> > ### The decision: `AutoScalePolicyEvaluator`
> >
> > A pure function with no I/O:
> >
> > ```
> > decide(layout, loadBySegment, streamConsumerCountBySub, policy, now)
> >     → Split(segmentId) | Merge(segmentId1, segmentId2) | None
> > ```
> >
> > It runs in two passes — **split first (short cooldown), then merge
> > (long cooldown)** — and emits at most one action per invocation.
> >
> > #### Pass 1 — SPLIT (fast, lightly coalesced)
> >
> > Splits fire as soon as conditions are met, bounded by `maxSegments`,
> > an in-flight-operation guard, and a **short `splitCooldown` (default 1
> > min)**. The cooldown is deliberately short: it exists only to coalesce
> > a burst of near-simultaneous triggers — e.g. a group of consumers
> > connecting in rapid succession should cause one split, not N — while
> > still letting a genuinely growing topic split again on the next
> > minute.
> >
> > ```
> > if activeSegments >= maxSegments: skip split pass
> > if now - lastSplitAtMs < splitCooldown: skip split pass
> >
> > (a) Consumer-driven:
> >       required = max over STREAM subscriptions of consumerCount   // per-subscription max
> >       if required > activeSegments:
> >           → Split(busiest active segment by msgRateIn); set lastSplitAtMs = now
> >
> > (b) Load-driven (if (a) didn't fire):
> >       candidate segments = active segments where ANY of:
> >         - msgRateIn   > splitMsgRateInThreshold
> >         - bytesRateIn > splitBytesRateInThreshold
> >         - msgRateOut  > splitMsgRateOutThreshold
> >         - bytesRateOut> splitBytesRateOutThreshold
> >       if candidates non-empty:
> >           → Split(most-overloaded candidate); set lastSplitAtMs = now
> > ```
> >
> > The consumer-driven rule (a) is what the **event-driven path**
> > evaluates the moment a consumer registers, so a new consumer gets a
> > segment within seconds (subject to `splitCooldown`). The load-driven
> > rule (b) runs on the periodic tick. Because `msgRateIn` etc. are
> > already 60-second rolling averages on the broker, a value over
> > threshold already represents *sustained* load — no extra split window
> > is needed to filter transient spikes.
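The split pass could be sketched in Java roughly as follows. All type and field names here are hypothetical, segment ids are assumed to be `long`, thresholds are assumed to be positive, and "most-overloaded" is interpreted as the highest rate-to-threshold ratio, which the text above does not specify.

```java
import java.util.Map;
import java.util.Optional;

/** Illustrative sketch of the split pass; not the actual AutoScalePolicyEvaluator. */
final class SplitPassSketch {
    /** Four rates, mirroring the SegmentLoadStats fields (also reused for thresholds). */
    static final class Load {
        final double msgRateIn, bytesRateIn, msgRateOut, bytesRateOut;
        Load(double msgRateIn, double bytesRateIn, double msgRateOut, double bytesRateOut) {
            this.msgRateIn = msgRateIn;
            this.bytesRateIn = bytesRateIn;
            this.msgRateOut = msgRateOut;
            this.bytesRateOut = bytesRateOut;
        }
    }

    /** Segment cap, cooldown, and the four split thresholds. */
    static final class Policy {
        final int maxSegments;
        final long splitCooldownMs;
        final Load splitThresholds;
        Policy(int maxSegments, long splitCooldownMs, Load splitThresholds) {
            this.maxSegments = maxSegments;
            this.splitCooldownMs = splitCooldownMs;
            this.splitThresholds = splitThresholds;
        }
    }

    /** Largest rate-to-threshold ratio; above 1.0 means some threshold is crossed. */
    static double overload(Load l, Load t) {
        return Math.max(Math.max(l.msgRateIn / t.msgRateIn, l.bytesRateIn / t.bytesRateIn),
                Math.max(l.msgRateOut / t.msgRateOut, l.bytesRateOut / t.bytesRateOut));
    }

    /** Returns the id of the segment to split, or empty if the pass takes no action. */
    static Optional<Long> splitPass(Map<Long, Load> active, Map<String, Integer> streamConsumersBySub,
                                    Policy policy, long lastSplitAtMs, long nowMs) {
        if (active.size() >= policy.maxSegments) return Optional.empty();
        if (nowMs - lastSplitAtMs < policy.splitCooldownMs) return Optional.empty();

        // (a) Consumer-driven: the largest STREAM subscription wants one segment per consumer.
        int required = 0;
        for (int count : streamConsumersBySub.values()) required = Math.max(required, count);

        Long best = null;
        if (required > active.size()) {
            double busiest = Double.NEGATIVE_INFINITY;
            for (Map.Entry<Long, Load> e : active.entrySet()) {
                if (e.getValue().msgRateIn > busiest) {
                    best = e.getKey();
                    busiest = e.getValue().msgRateIn;
                }
            }
            return Optional.ofNullable(best);
        }

        // (b) Load-driven: pick the candidate furthest above any of its thresholds.
        double worst = 1.0;
        for (Map.Entry<Long, Load> e : active.entrySet()) {
            double score = overload(e.getValue(), policy.splitThresholds);
            if (score > worst) {
                best = e.getKey();
                worst = score;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

In this sketch the caller is expected to record `lastSplitAtMs = nowMs` whenever a segment id is returned, which keeps the function itself free of state and I/O.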
> >
> > #### Pass 2 — MERGE (lazy, rate-limited)
> >
> > Merges run only if no split fired this tick, the topic is not within
> > `mergeCooldown` of its last merge, and the result respects
> > `maxDagDepth`.
> >
> > ```
> > if a split fired this tick: skip merge pass
> > if now - lastMergeAtMs < mergeCooldown: skip merge pass
> > if activeSegments <= minSegments: skip merge pass
> >
> > candidate pairs = adjacent ACTIVE segment pairs where BOTH segments satisfy,
> >                   for at least mergeWindow (checked via the store Stat's modified time):
> >     - msgRateIn   < mergeMsgRateInThreshold
> >     - bytesRateIn < mergeBytesRateInThreshold
> >     - msgRateOut  < mergeMsgRateOutThreshold
> >     - bytesRateOut< mergeBytesRateOutThreshold
> >   AND neither segment's lineage is already at maxDagDepth merges
> >
> > if candidate pairs non-empty:
> >     → Merge(coldest pair by combined rate); set lastMergeAtMs = now
> > ```
> >
> > Adjacency is required because the existing `mergeSegments` API only
> > merges hash-range-adjacent active segments.
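The merge pass could be sketched as below, under several assumptions: all names are hypothetical, the input list is already ordered by hash range so that consecutive elements are adjacent, each segment carries a precomputed merge depth, and "combined rate" is taken as the sum of the message rates of both segments, which the text above leaves open.

```java
import java.util.List;
import java.util.Optional;

/** Illustrative sketch of the merge pass; not the actual AutoScalePolicyEvaluator. */
final class MergePassSketch {
    /** One ACTIVE segment with its load record and lineage depth. */
    static final class Segment {
        final long id;
        final double msgRateIn, bytesRateIn, msgRateOut, bytesRateOut;
        final long loadModifiedAtMs; // Stat modified time of the load record
        final int mergeDepth;        // merges already in this segment's lineage
        Segment(long id, double msgRateIn, double bytesRateIn, double msgRateOut,
                double bytesRateOut, long loadModifiedAtMs, int mergeDepth) {
            this.id = id;
            this.msgRateIn = msgRateIn;
            this.bytesRateIn = bytesRateIn;
            this.msgRateOut = msgRateOut;
            this.bytesRateOut = bytesRateOut;
            this.loadModifiedAtMs = loadModifiedAtMs;
            this.mergeDepth = mergeDepth;
        }
    }

    /** Caps, cooldown, window, and the four merge thresholds. */
    static final class Policy {
        final int minSegments, maxDagDepth;
        final long mergeCooldownMs, mergeWindowMs;
        final double msgIn, bytesIn, msgOut, bytesOut;
        Policy(int minSegments, int maxDagDepth, long mergeCooldownMs, long mergeWindowMs,
               double msgIn, double bytesIn, double msgOut, double bytesOut) {
            this.minSegments = minSegments;
            this.maxDagDepth = maxDagDepth;
            this.mergeCooldownMs = mergeCooldownMs;
            this.mergeWindowMs = mergeWindowMs;
            this.msgIn = msgIn;
            this.bytesIn = bytesIn;
            this.msgOut = msgOut;
            this.bytesOut = bytesOut;
        }
    }

    /** Cold on all four rates, unchanged for at least mergeWindow, and below the depth cap. */
    static boolean mergeable(Segment s, Policy p, long nowMs) {
        boolean cold = s.msgRateIn < p.msgIn && s.bytesRateIn < p.bytesIn
                && s.msgRateOut < p.msgOut && s.bytesRateOut < p.bytesOut;
        boolean durablyCold = nowMs - s.loadModifiedAtMs >= p.mergeWindowMs;
        return cold && durablyCold && s.mergeDepth < p.maxDagDepth;
    }

    /** Returns {leftId, rightId} of the coldest mergeable adjacent pair, or empty. */
    static Optional<long[]> mergePass(List<Segment> byHashRange, Policy p, boolean splitFired,
                                      long lastMergeAtMs, long nowMs) {
        if (splitFired || nowMs - lastMergeAtMs < p.mergeCooldownMs) return Optional.empty();
        if (byHashRange.size() <= p.minSegments) return Optional.empty();

        long[] best = null;
        double bestRate = Double.POSITIVE_INFINITY;
        for (int i = 0; i + 1 < byHashRange.size(); i++) {
            Segment a = byHashRange.get(i);
            Segment b = byHashRange.get(i + 1);
            if (!mergeable(a, p, nowMs) || !mergeable(b, p, nowMs)) continue;
            double combined = a.msgRateIn + a.msgRateOut + b.msgRateIn + b.msgRateOut;
            if (combined < bestRate) {
                best = new long[] {a.id, b.id};
                bestRate = combined;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

As with the split sketch, the caller would set `lastMergeAtMs = nowMs` when a pair is returned.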
> >
> > ### Anti-flip-flop: three independent guards
> >
> > 1. **Threshold gap (hysteresis).** Split thresholds are well above
> > merge thresholds for every metric. The dead-band between them is what
> > prevents a just-merged segment from immediately re-qualifying for a
> > split.
> > 2. **Asymmetric cooldown.** Splits: a short `splitCooldown` (default 1
> > min) that only coalesces bursts. Merges: a longer `mergeCooldown`
> > (default 5 min) plus a `mergeWindow` (default 5 min) during which the
> > segment must have stayed cold. A pair must be *durably* cold to merge,
> > but a segment can split again within a minute of getting hot.
> > 3. **Max DAG depth on merges.** `maxDagDepth` (default 10) caps how
> > many merges a given lineage can accumulate. Once reached, that lineage
> > stops being a merge candidate — **but load-driven splits still fire.**
> > This bounds the number of split↔merge cycles a hash range can churn
> > through while never blocking a split that throughput requires.
> >
> > > **Design note — direction of the depth cap.** The cap restricts *merges*, 
> > > not splits. Splits are needed for correctness/performance and must always 
> > > be available; merges are the optional efficiency step and are the ones 
> > > that, combined with splits, could oscillate. `dagDepth` therefore counts 
> > > **merges in a segment's lineage**, derived from the existing 
> > > `parentIds`/`childIds` DAG in `ScalableTopicMetadata` — splits do not 
> > > consume depth budget.
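One possible way to derive that merge count from parent links is sketched below. It assumes a segment with more than one parent is the result of a merge and that the depth of a lineage is the largest merge count along any ancestor path; the PIP does not spell out either detail, and the class name and map layout are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative merge-depth computation over a parentIds map; names are hypothetical. */
final class MergeDepthSketch {
    private MergeDepthSketch() {}

    /** Convenience entry point that allocates the memo table. */
    static int mergeDepth(long segmentId, Map<Long, List<Long>> parentIds) {
        return mergeDepth(segmentId, parentIds, new HashMap<>());
    }

    /**
     * parentIds maps a segment id to its parents: none for a root,
     * one for a split child, two for a merge result (assumed convention).
     */
    static int mergeDepth(long segmentId, Map<Long, List<Long>> parentIds, Map<Long, Integer> memo) {
        Integer cached = memo.get(segmentId);
        if (cached != null) {
            return cached;
        }
        List<Long> parents = parentIds.getOrDefault(segmentId, List.of());
        int deepest = 0;
        for (long parent : parents) {
            deepest = Math.max(deepest, mergeDepth(parent, parentIds, memo));
        }
        // Only a merge (more than one parent) consumes depth budget; a split does not.
        int depth = deepest + (parents.size() > 1 ? 1 : 0);
        memo.put(segmentId, depth);
        return depth;
    }
}
```

For a lineage that goes split, merge, split, merge, the final segment has depth 2 in this sketch, and the intermediate split children inherit the depth of their parent unchanged.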
> >
> > ### Caps
> >
> > | Cap | Default | Effect |
> > |-----|---------|--------|
> > | `maxSegments` | 64 | Splits stop once `activeSegments == maxSegments`. |
> > | `minSegments` | 1 | Merges stop once `activeSegments == minSegments`. |
> > | `maxDagDepth` | 10 | Merges stop for a lineage at the cap; splits unaffected. |
> >
> > ### Manual operations and cooldown
> >
> > - Manual `admin.scalableTopics().splitSegment(...)` **sets
> > `lastSplitAtMs`**, so a manual split also starts the short auto-split
> > cooldown.
> > - Manual `admin.scalableTopics().mergeSegments(...)` **sets
> > `lastMergeAtMs`**, so the operator's manual efficiency action also
> > rate-limits the controller's automatic merges.
> >
> > ### Evaluation triggers
> >
> > The controller leader evaluates auto split/merge from two sources:
> >
> > - **Event-driven (within seconds)** — when a STREAM/CHECKPOINT
> > consumer registers with or unregisters from the controller, the
> > controller immediately evaluates the consumer-count split rule. No polling:
> > consumer registration already flows through the controller (PIP-468).
> > - **Periodic tick** — a `scheduleAutoScaleTick` (separate from the GC
> > tick from PIP-468), default cadence `scalableTopicAutoScaleInterval =
> > 60s`, evaluates the traffic-driven split rules and the merge pass. Per
> > tick it does one metadata batch-read of the topic's `segments/*/load`
> > records (or maintains a watch cache), evaluates, and dispatches.
> >
> > Both sources call the same `AutoScalePolicyEvaluator`; the
> > event-driven path only needs the consumer-count rule, so it is cheap.
> > Both are cancelled on leadership loss / close.
> >
> > ---
> >
> > ## Public-Facing Changes
> >
> > ### Configuration (`broker.conf`)
> >
> > Auto-scaling is **default-on cluster-wide**; these are the defaults
> > applied to every scalable topic that does not override them.
> >
> > | Property | Default | Description |
> > |----------|---------|-------------|
> > | `scalableTopicAutoScaleEnabled` | `true` | Master switch for auto split/merge. |
> > | `scalableTopicAutoScaleInterval` | `60s` | Periodic (traffic) evaluation cadence. Consumer-count changes are handled event-driven, independent of this. |
> > | `scalableTopicMaxSegments` | `64` | Hard ceiling on active segments. |
> > | `scalableTopicMinSegments` | `1` | Hard floor on active segments. |
> > | `scalableTopicMaxDagDepth` | `10` | Max merges in a lineage before merges are disabled for it. |
> > | `scalableTopicSplitCooldown` | `1m` | Minimum time between automatic splits on a topic (coalesces bursts). |
> > | `scalableTopicMergeCooldown` | `5m` | Minimum time between automatic merges on a topic. |
> > | `scalableTopicMergeWindow` | `5m` | Duration a pair must stay cold before merging. |
> > | `scalableTopicSplitMsgRateInThreshold` | `10000` | msg/s ingest split trigger. |
> > | `scalableTopicSplitBytesRateInThreshold` | `50MB` | bytes/s ingest split trigger. |
> > | `scalableTopicSplitMsgRateOutThreshold` | `50000` | msg/s dispatch split trigger. |
> > | `scalableTopicSplitBytesRateOutThreshold` | `250MB` | bytes/s dispatch split trigger. |
> > | `scalableTopicMergeMsgRateInThreshold` | `1000` | msg/s ingest merge trigger. |
> > | `scalableTopicMergeBytesRateInThreshold` | `5MB` | bytes/s ingest merge trigger. |
> > | `scalableTopicMergeMsgRateOutThreshold` | `5000` | msg/s dispatch merge trigger. |
> > | `scalableTopicMergeBytesRateOutThreshold` | `25MB` | bytes/s dispatch merge trigger. |
> > | `scalableTopicLoadReportInterval` | `10s` | Segment owner sampling interval. |
> > | `scalableTopicLoadReportRateChangeThreshold` | `25%` | Minimum rate change since the last write that triggers a new `SegmentLoadStats` write. |
> >
> > ### Policy resolution (namespace + topic overrides)
> >
> > Following the existing `autoTopicCreationOverride` pattern, an
> > `AutoScalePolicyOverride` can be set at two levels; resolution is
> > most-specific-wins, falling back to `broker.conf`:
> >
> > 1. **Per-topic** — a new `autoScalePolicy` field on
> > `ScalableTopicMetadata`, set via
> > `admin.scalableTopics().setAutoScalePolicy(topic, policy)` /
> > `getAutoScalePolicy(topic)`.
> > 2. **Per-namespace** — a new `scalableTopicAutoScalePolicy` field on
> > `Policies`, set via
> > `admin.namespaces().setScalableTopicAutoScalePolicy(ns, policy)`.
> >
> > `AutoScalePolicyOverride` carries the same knobs as the broker config
> > (all optional; unset fields fall through). Setting `enabled = false`
> > opts a topic or namespace out entirely.
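The most-specific-wins fall-through can be illustrated for a single knob with the sketch below. The helper is hypothetical and models an unset override field as an empty `Optional`; how `AutoScalePolicyOverride` actually represents unset fields is not stated in the PIP.

```java
import java.util.Optional;

/** Illustrative per-knob resolution: topic override, then namespace override, then broker.conf. */
final class PolicyResolutionSketch {
    private PolicyResolutionSketch() {}

    /** Returns the most specific value that is set, falling back to the broker default. */
    static <T> T resolve(Optional<T> topicOverride, Optional<T> namespaceOverride, T brokerDefault) {
        if (topicOverride.isPresent()) {
            return topicOverride.get();
        }
        return namespaceOverride.orElse(brokerDefault);
    }
}
```

Because each field resolves independently, a topic could raise only `maxSegments` while still inheriting `enabled = false` from its namespace, which is worth keeping in mind when combining overrides.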
> >
> > ### Admin Client API
> >
> > ```java
> > interface ScalableTopics {
> >     // ... existing ...
> >     void setAutoScalePolicy(String topic, AutoScalePolicyOverride policy)
> >             throws PulsarAdminException;
> >     AutoScalePolicyOverride getAutoScalePolicy(String topic) throws PulsarAdminException;
> >     void removeAutoScalePolicy(String topic) throws PulsarAdminException;
> > }
> >
> > interface Namespaces {
> >     // ... existing ...
> >     void setScalableTopicAutoScalePolicy(String namespace, AutoScalePolicyOverride policy) ...;
> >     AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace) ...;
> >     void removeScalableTopicAutoScalePolicy(String namespace) ...;
> > }
> > ```
> >
> > ### Metadata Store Paths
> >
> > | Path | Content | Writer |
> > |------|---------|--------|
> > | `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load` | `SegmentLoadStats` JSON | segment-owning broker |
> >
> > (`autoScalePolicy` rides inside the existing `ScalableTopicMetadata`
> > blob; the namespace override rides inside `Policies`. No other new
> > paths.)
> >
> > ### Observability
> >
> > - New per-topic metrics: `pulsar_scalable_topic_active_segments`,
> > `pulsar_scalable_topic_auto_splits_total`,
> > `pulsar_scalable_topic_auto_merges_total`,
> > `pulsar_scalable_topic_split_suppressed_max_segments_total`,
> > `pulsar_scalable_topic_merge_suppressed_max_depth_total`.
> > - The existing `ScalableTopicStats` is extended with the most recent
> > `SegmentLoadStats` per segment and the resolved effective policy, so
> > operators can see *why* the controller did or did not act.
> >
> > ---
> >
> > ## Operational Safety
> >
> > The `maxSegments`, `maxDagDepth`, asymmetric cooldown, and
> > threshold-gap guards together bound both the rate and the total amount
> > of structural change a topic can undergo, so enabling auto split/merge
> > cannot cause unbounded segment growth or split/merge storms.
> >
> > Operators who want manual-only control set
> > `scalableTopicAutoScaleEnabled=false` (cluster) or an `enabled=false`
> > override (namespace/topic).
> >
> > > **Compatibility:** scalable topics are a new, as-yet-unreleased feature 
> > > ([PIP-460](pip-460.md)), so there is no backward/forward compatibility to 
> > > consider — `SegmentLoadStats`, the policy fields, and the config knobs 
> > > all ship together with the rest of the scalable-topic feature.
> >
> > ---
> >
> > ## Security Considerations
> >
> > `setAutoScalePolicy` / `getAutoScalePolicy` (topic and namespace
> > variants) require the same admin permissions as the corresponding
> > existing scalable-topic and namespace policy operations.
> > `SegmentLoadStats` is written by brokers via their authenticated
> > internal identity and is not client-writable.
> >
> > ---
> >
> > ## Links
> >
> > - Parent PIP: [PIP-468: Scalable Topic Controller](pip-468.md)
> > - Grand-parent PIP: [PIP-460: Scalable Topics](pip-460.md)
> > - V5 Client API: [PIP-466: New Java Client API (V5)](pip-466.md)
> >
> > Matteo Merli <[email protected]>
> >
> > Thu, Jun 4, 4:36 PM (3 days ago)
> > to Dev
> > https://github.com/apache/pulsar/pull/25938
> >
> >
> > ------
> >
> >
> > # PIP-483: Scalable Topic Auto Split/Merge
> >
> > *Sub-PIP of [PIP-468: Scalable Topic Controller](pip-468.md)*
> >
> > ## Motivation
> >
> > [PIP-468](pip-468.md) gives the Scalable Topic Controller the ability
> > to **split** and **merge** segments, but only on explicit operator
> > request via the admin API. An operator has to watch the topic, decide
> > it is hot, and issue a split — and later notice it has gone cold and
> > issue a merge. This is the same operational toil that partition-count
> > management imposes on classic Pulsar topics, which scalable topics
> > were meant to eliminate.
> >
> > This PIP adds an **auto-scaling policy** to the controller: the
> > controller leader observes per-segment load and per-subscription
> > consumer pressure, and autonomously splits hot segments and merges
> > cold ones, within hard caps that prevent runaway growth and
> > split/merge flip-flopping.
> >
> > The design is built around three principles that came out of the
> > design discussion:
> >
> > 1. **Splits are fast; merges are lazy.** A split protects throughput
> > and latency under load, so it fires quickly — with only a short
> > cooldown to coalesce bursts of near-simultaneous triggers (e.g. a
> > group of consumers connecting in rapid succession). A merge is purely
> > an efficiency reclaim, so it can wait, be rate-limited, and be skipped
> > when in doubt.
> > 2. **The controller reacts, it does not poll.** New stream/checkpoint
> > consumers register directly with the controller, so consumer-count
> > changes are handled event-driven within seconds. Load data is *pushed*
> > into the metadata store by each segment's owning broker (only when it
> > changes materially) and read by the controller leader. The controller
> > never fans out RPCs to segment owners.
> > 3. **The decision is a pure function.** Given a snapshot of load +
> > layout + policy, the split/merge decision is deterministic and
> > unit-testable in isolation from all I/O.
> >
> > ---
> >
> > ## Goals
> >
> > - Automatically increase segment count when a topic is under
> > ingest/dispatch load or has more stream/checkpoint consumers than
> > segments.
> > - Automatically decrease segment count when load subsides, reclaiming
> > broker resources.
> > - Bound growth (`maxSegments`) and bound split↔merge churn
> > (`maxDagDepth`, asymmetric cooldown).
> > - Default-on cluster-wide, with per-namespace and per-topic overrides,
> > following Pulsar's existing policy-resolution conventions.
> >
> > ### Non-goals
> >
> > - **Broker placement / rebalancing.** Which broker owns a segment's
> > bundle is the load balancer's job; this PIP only changes *how many*
> > segments exist.
> > - **Key-aware or non-midpoint splits.** Splits use the existing
> > midpoint-split mechanism from PIP-468.
> > - **Cross-topic global optimization.** Each topic's controller decides
> > independently.
> >
> > ---
> >
> > ## Design
> >
> > ### Overview
> >
> > ```
> > ┌────────────────────────────────────────────────────────────────┐
> > │ Segment-owning broker (per active segment)                      │
> > │   SegmentLoadReporter                                            │
> > │   - samples the segment topic's TopicStats                      │
> > │   - writes SegmentLoadStats to metadata ONLY on material change │
> > └───────────────────────────────┬────────────────────────────────┘
> >                                  │ (metadata store, push-on-change)
> >                                  ▼
> > ┌────────────────────────────────────────────────────────────────┐
> > │ Controller leader (per scalable topic)                          │
> > │                                                                  │
> > │  Event-driven — within seconds:                                 │
> > │    on STREAM/CHECKPOINT consumer register/unregister            │
> > │      (consumers already register with the controller — no poll) │
> > │      → evaluate the consumer-count split rule immediately       │
> > │                                                                  │
> > │  Periodic AutoScaleTick — traffic, default 60s:                 │
> > │    1. read SegmentLoadStats for all active segments             │
> > │    2. AutoScalePolicyEvaluator.decide(...) → Split|Merge|None   │
> > │    3. dispatch to existing splitSegment / mergeSegments         │
> > └────────────────────────────────────────────────────────────────┘
> > ```
> >
> > The two trigger sources reflect their different latency needs: a new
> > consumer should get its own segment **within seconds**, so it is
> > handled the instant the consumer registers with the controller;
> > traffic shifts up or down over **a minute or more**, so they are
> > evaluated on a slower periodic tick. The only new persistent state is
> > `SegmentLoadStats`. The split/merge *mechanics* are entirely reused
> > from PIP-468.
> >
> > ### Load reporting: push-to-metadata, not pull-per-tick
> >
> > Each segment's owning broker runs a **`SegmentLoadReporter`** for
> > every ACTIVE `segment://` topic it hosts. The broker writes
> > `SegmentLoadStats` **directly to the metadata store** — it already has
> > the rates in memory, so there is no REST round-trip and no
> > controller-initiated pull. On a fixed sampling interval it compares
> > the current rates to the last ones written and writes **only when a
> > rate changes by more than a significant threshold** (default ±25%)
> > since the last write. A steady-state segment writes once and then goes
> > silent, keeping metadata write volume bounded regardless of traffic.
> >
> > #### `SegmentLoadStats` (new metadata record)
> >
> > Stored at `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load`:
> >
> > ```json
> > {
> >   "msgRateIn": 12000.0,
> >   "bytesRateIn": 64000000.0,
> >   "msgRateOut": 48000.0,
> >   "bytesRateOut": 256000000.0
> > }
> > ```
> >
> > | Field | Source on the owning broker | Meaning for auto split/merge |
> > |-------|------------------------------|---------------------------|
> > | `msgRateIn` / `bytesRateIn` | segment topic `TopicStats` (60s
> > rolling) | ingest load |
> > | `msgRateOut` / `bytesRateOut` | segment topic `TopicStats` |
> > dispatch/fanout load (high for topics with many subscriptions) |
> >
> > The record carries no timestamp of its own: the metadata store's
> > `Stat` for the znode already exposes creation and last-modified
> > timestamps, and the controller uses the **modified timestamp** for
> > windowing. A record that still reads "cold" with an old modified time
> > proves the segment has been cold for `now − modifiedAt` — so
> > split/merge **windows derive from the store's `Stat`** with no
> > per-tick history buffer and no extra field.
> >
> > #### Significant-change threshold
> >
> > To avoid rewriting on every minor wobble, the reporter only writes
> > when a rate has moved by more than
> > `scalableTopicLoadReportRateChangeThreshold` (default 25%) relative to
> > the last value written for that segment. Sampling cadence is
> > `scalableTopicLoadReportInterval` (default 10s). Both are configurable
> > via `broker.conf`.
> >
> > ### Subscription types and what each load type drives
> >
> > Recall from PIP-468 that scalable-topic subscriptions are `STREAM`
> > (controller-managed, 1:1 segment↔consumer assignment; covers both
> > StreamConsumer and CheckpointConsumer) or `QUEUE`
> > (controller-bypassing; every consumer attaches to every segment and
> > the broker round-robins).
> >
> > | Trigger | STREAM subscriptions | QUEUE subscriptions |
> > |---------|----------------------|----------------------|
> > | Consumer-count scale-up | **Yes** — more segments give more 1:1
> > parallelism | **No** — queue consumers share segments; more segments
> > don't add parallelism for them |
> > | Traffic (in/out, msg/bytes) | Yes | **Yes** — queue traffic still
> > loads the segment's broker and counts toward the per-segment rate |
> >
> > So a topic with only QUEUE subscriptions never splits on consumer
> > count, but still splits when any segment's in/out rate crosses
> > threshold.
> >
> > ### The decision: `AutoScalePolicyEvaluator`
> >
> > A pure function with no I/O:
> >
> > ```
> > decide(layout, loadBySegment, streamConsumerCountBySub, policy, now)
> >     → Split(segmentId) | Merge(segmentId1, segmentId2) | None
> > ```
> >
> > It runs in two passes — **split first (short cooldown), then merge
> > (long cooldown)** — and emits at most one action per invocation.
> >
> > #### Pass 1 — SPLIT (fast, lightly coalesced)
> >
> > Splits fire as soon as conditions are met, bounded by `maxSegments`,
> > an in-flight-operation guard, and a **short `splitCooldown` (default 1
> > min)**. The cooldown is deliberately short: it exists only to coalesce
> > a burst of near-simultaneous triggers — e.g. a group of consumers
> > connecting in rapid succession should cause one split, not N — while
> > still letting a genuinely growing topic split again on the next
> > minute.
> >
> > ```
> > if activeSegments >= maxSegments: skip split pass
> > if now - lastSplitAtMs < splitCooldown: skip split pass
> >
> > (a) Consumer-driven:
> >       required = max over STREAM subscriptions of consumerCount   //
> > per-subscription max
> >       if required > activeSegments:
> >           → Split(busiest active segment by msgRateIn)
> >
> > (b) Load-driven (if (a) didn't fire):
> >       candidate segments = active segments where ANY of:
> >         - msgRateIn   > splitMsgRateInThreshold
> >         - bytesRateIn > splitBytesRateInThreshold
> >         - msgRateOut  > splitMsgRateOutThreshold
> >         - bytesRateOut> splitBytesRateOutThreshold
> >       if candidates non-empty:
> >           → Split(most-overloaded candidate); set lastSplitAtMs = now
> > ```
> >
> > The consumer-driven rule (a) is what the **event-driven path**
> > evaluates the moment a consumer registers, so a new consumer gets a
> > segment within seconds (subject to `splitCooldown`). The load-driven
> > rule (b) runs on the periodic tick. Because `msgRateIn` etc. are
> > already 60-second rolling averages on the broker, a value over
> > threshold already represents *sustained* load — no extra split window
> > is needed to filter transient spikes.
> >
> > #### Pass 2 — MERGE (lazy, rate-limited)
> >
> > Merges run only if no split fired this tick, the topic is not within
> > `mergeCooldown` of its last merge, and the result respects
> > `maxDagDepth`.
> >
> > ```
> > if a split fired this tick: skip merge pass
> > if now - lastMergeAtMs < mergeCooldown: skip merge pass
> > if activeSegments <= minSegments: skip merge pass
> >
> > candidate pairs = adjacent ACTIVE segment pairs where BOTH segments satisfy,
> >                   for at least mergeWindow (checked via the store
> > Stat's modified time):
> >     - msgRateIn   < mergeMsgRateInThreshold
> >     - bytesRateIn < mergeBytesRateInThreshold
> >     - msgRateOut  < mergeMsgRateOutThreshold
> >     - bytesRateOut< mergeBytesRateOutThreshold
> >   AND neither segment's lineage is already at maxDagDepth merges
> >
> > if candidate pairs non-empty:
> >     → Merge(coldest pair by combined rate); set lastMergeAtMs = now
> > ```
> >
> > Adjacency is required because the existing `mergeSegments` API only
> > merges hash-range-adjacent active segments.
> >
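A minimal Java sketch of the merge pass, under explicit assumptions: the type names are hypothetical, the active segments are assumed to arrive sorted by hash range so that list neighbours are adjacent, and "combined rate" is assumed to be the summed message rate of the pair (the PIP does not pin this down).

```java
import java.util.List;
import java.util.Optional;

// Hypothetical view of one active segment; none of these names come from the PIP.
record SegmentView(long segmentId, double msgRateIn, double bytesRateIn,
                   double msgRateOut, double bytesRateOut,
                   long loadModifiedAtMs, int mergeDepth) {}

// Merge-side knobs, mirroring the broker.conf properties by meaning only.
record MergeConfig(double msgRateIn, double bytesRateIn, double msgRateOut, double bytesRateOut,
                   long mergeCooldownMs, long mergeWindowMs, int minSegments, int maxDagDepth) {}

record MergePair(SegmentView left, SegmentView right) {
    // Assumption: "combined rate" is the summed message rate of both segments.
    double combinedRate() {
        return left.msgRateIn() + left.msgRateOut() + right.msgRateIn() + right.msgRateOut();
    }
}

class MergePass {
    static boolean eligible(SegmentView s, MergeConfig c, long nowMs) {
        boolean cold = s.msgRateIn() < c.msgRateIn() && s.bytesRateIn() < c.bytesRateIn()
                && s.msgRateOut() < c.msgRateOut() && s.bytesRateOut() < c.bytesRateOut();
        // The load record is rewritten only on a material change, so an old
        // modification time is taken to mean the segment has stayed cold.
        boolean durablyCold = cold && nowMs - s.loadModifiedAtMs() >= c.mergeWindowMs();
        return durablyCold && s.mergeDepth() < c.maxDagDepth();
    }

    /** activeByHashRange must be sorted by hash range so neighbours are adjacent. */
    static Optional<MergePair> pick(List<SegmentView> activeByHashRange, MergeConfig c,
                                    boolean splitFiredThisTick, long nowMs, long lastMergeAtMs) {
        if (splitFiredThisTick
                || nowMs - lastMergeAtMs < c.mergeCooldownMs()
                || activeByHashRange.size() <= c.minSegments()) {
            return Optional.empty();
        }
        MergePair coldest = null;
        for (int i = 0; i + 1 < activeByHashRange.size(); i++) {
            SegmentView a = activeByHashRange.get(i);
            SegmentView b = activeByHashRange.get(i + 1);
            if (eligible(a, c, nowMs) && eligible(b, c, nowMs)) {
                MergePair pair = new MergePair(a, b);
                if (coldest == null || pair.combinedRate() < coldest.combinedRate()) {
                    coldest = pair;
                }
            }
        }
        return Optional.ofNullable(coldest);
    }
}
```

Only one pair is returned per evaluation, which matches the rate-limited character of the merge pass: at most one merge per `mergeCooldown`.
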
> > ### Anti-flip-flop: three independent guards
> >
> > 1. **Threshold gap (hysteresis).** Split thresholds are well above
> > merge thresholds for every metric. The dead-band between them is what
> > prevents a just-merged segment from immediately re-qualifying for a
> > split.
> > 2. **Asymmetric cooldown.** Splits: a short `splitCooldown` (default 1
> > min) that only coalesces bursts. Merges: a longer `mergeCooldown`
> > (default 5 min) plus a `mergeWindow` (default 5 min) during which both
> > segments of the pair must have stayed cold. A pair must be *durably*
> > cold to merge, but a segment can split again within a minute of getting hot.
> > 3. **Max DAG depth on merges.** `maxDagDepth` (default 10) caps how
> > many merges a given lineage can accumulate. Once reached, that lineage
> > stops being a merge candidate — **but load-driven splits still fire.**
> > This bounds the number of split↔merge cycles a hash range can churn
> > through while never blocking a split that throughput requires.
> >
> > > **Design note — direction of the depth cap.** The cap restricts *merges*, 
> > > not splits. Splits are needed for correctness/performance and must always 
> > > be available; merges are the optional efficiency step and are the ones 
> > > that, combined with splits, could oscillate. `dagDepth` therefore counts 
> > > **merges in a segment's lineage**, derived from the existing 
> > > `parentIds`/`childIds` DAG in `ScalableTopicMetadata` — splits do not 
> > > consume depth budget.
> >
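One way to derive the merge count from the parent links is sketched below. It rests on two assumptions that the PIP does not state: a segment with two or more parents is the product of a merge, and a lineage's depth is the largest number of merges along any ancestor path. The `parents` map is a hypothetical stand-in for the `parentIds` data in `ScalableTopicMetadata`.

```java
import java.util.List;
import java.util.Map;

class MergeDepth {
    /**
     * parents maps a segment id to its parent segment ids (absent or empty for a root).
     * memo caches results so shared ancestors are visited once.
     */
    static int of(long segmentId, Map<Long, List<Long>> parents, Map<Long, Integer> memo) {
        Integer cached = memo.get(segmentId);
        if (cached != null) {
            return cached;
        }
        List<Long> ps = parents.getOrDefault(segmentId, List.of());
        int deepestParent = 0;
        for (long p : ps) {
            deepestParent = Math.max(deepestParent, of(p, parents, memo));
        }
        // Assumption: two or more parents means this segment was produced by a merge.
        int depth = deepestParent + (ps.size() >= 2 ? 1 : 0);
        memo.put(segmentId, depth);
        return depth;
    }
}
```

With this definition a split child inherits its parent's depth unchanged, so splits never consume depth budget, while each merge in the ancestry adds one.
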
> > ### Caps
> >
> > | Cap | Default | Effect |
> > |-----|---------|--------|
> > | `maxSegments` | 64 | Splits stop once `activeSegments == maxSegments`. |
> > | `minSegments` | 1 | Merges stop once `activeSegments == minSegments`. |
> > | `maxDagDepth` | 10 | Merges stop for a lineage at the cap; splits unaffected. |
> >
> > ### Manual operations and cooldown
> >
> > - Manual `admin.scalableTopics().splitSegment(...)` **sets
> > `lastSplitAtMs`**, so a manual split also starts the short auto-split
> > cooldown.
> > - Manual `admin.scalableTopics().mergeSegments(...)` **sets
> > `lastMergeAtMs`**, so the operator's manual efficiency action also
> > rate-limits the controller's automatic merges.
> >
> > ### Evaluation triggers
> >
> > The controller leader evaluates auto split/merge from two sources:
> >
> > - **Event-driven (within seconds)** — when a STREAM/CHECKPOINT
> > consumer registers with or unregisters from the controller, it
> > immediately evaluates the consumer-count split rule. No polling:
> > consumer registration already flows through the controller (PIP-468).
> > - **Periodic tick** — a `scheduleAutoScaleTick` (separate from the GC
> > tick from PIP-468), default cadence `scalableTopicAutoScaleInterval =
> > 60s`, evaluates the traffic-driven split rules and the merge pass. Per
> > tick it does one metadata batch-read of the topic's `segments/*/load`
> > records (or maintains a watch cache), evaluates, and dispatches.
> >
> > Both sources call the same `AutoScalePolicyEvaluator`; the
> > event-driven path only needs the consumer-count rule, so it is cheap.
> > Both are cancelled on leadership loss / close.
> >
> > ---
> >
> > ## Public-Facing Changes
> >
> > ### Configuration (`broker.conf`)
> >
> > Auto-scaling is **default-on cluster-wide**; these are the defaults
> > applied to every scalable topic that does not override them.
> >
> > | Property | Default | Description |
> > |----------|---------|-------------|
> > | `scalableTopicAutoScaleEnabled` | `true` | Master switch for auto split/merge. |
> > | `scalableTopicAutoScaleInterval` | `60s` | Periodic (traffic) evaluation cadence. Consumer-count changes are handled event-driven, independent of this. |
> > | `scalableTopicMaxSegments` | `64` | Hard ceiling on active segments. |
> > | `scalableTopicMinSegments` | `1` | Hard floor on active segments. |
> > | `scalableTopicMaxDagDepth` | `10` | Max merges in a lineage before merges are disabled for it. |
> > | `scalableTopicSplitCooldown` | `1m` | Minimum time between automatic splits on a topic (coalesces bursts). |
> > | `scalableTopicMergeCooldown` | `5m` | Minimum time between automatic merges on a topic. |
> > | `scalableTopicMergeWindow` | `5m` | Duration a pair must stay cold before merging. |
> > | `scalableTopicSplitMsgRateInThreshold` | `10000` | msg/s ingest split trigger. |
> > | `scalableTopicSplitBytesRateInThreshold` | `50MB` | bytes/s ingest split trigger. |
> > | `scalableTopicSplitMsgRateOutThreshold` | `50000` | msg/s dispatch split trigger. |
> > | `scalableTopicSplitBytesRateOutThreshold` | `250MB` | bytes/s dispatch split trigger. |
> > | `scalableTopicMergeMsgRateInThreshold` | `1000` | msg/s ingest merge trigger. |
> > | `scalableTopicMergeBytesRateInThreshold` | `5MB` | bytes/s ingest merge trigger. |
> > | `scalableTopicMergeMsgRateOutThreshold` | `5000` | msg/s dispatch merge trigger. |
> > | `scalableTopicMergeBytesRateOutThreshold` | `25MB` | bytes/s dispatch merge trigger. |
> > | `scalableTopicLoadReportInterval` | `10s` | Segment owner sampling interval. |
> > | `scalableTopicLoadReportRateChangeThreshold` | `25%` | Minimum rate change since the last write that triggers a new `SegmentLoadStats` write. |
> >
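The write-suppression rule behind `scalableTopicLoadReportRateChangeThreshold` might look like the sketch below. The exact formula is an assumption: change is measured here relative to the last written value, any single metric crossing the threshold triggers a write, and a move from zero to non-zero counts as a full change. `LoadReportFilter` is a hypothetical name.

```java
class LoadReportFilter {
    /** Relative change of the current value against the last written one. */
    static double relativeChange(double lastWritten, double current) {
        if (lastWritten == 0.0) {
            // Assumption: going from idle to any traffic is a 100% change.
            return current == 0.0 ? 0.0 : 1.0;
        }
        return Math.abs(current - lastWritten) / lastWritten;
    }

    /** True when at least one metric moved by the threshold fraction or more. */
    static boolean shouldWrite(double[] lastWritten, double[] current, double threshold) {
        for (int i = 0; i < lastWritten.length; i++) {
            if (relativeChange(lastWritten[i], current[i]) >= threshold) {
                return true;
            }
        }
        return false;
    }
}
```

The arrays would hold the four rates (`msgRateIn`, `bytesRateIn`, `msgRateOut`, `bytesRateOut`) in a fixed order, sampled every `scalableTopicLoadReportInterval`.
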
> > ### Policy resolution (namespace + topic overrides)
> >
> > Following the existing `autoTopicCreationOverride` pattern, an
> > `AutoScalePolicyOverride` can be set at two levels; resolution is
> > most-specific-wins, falling back to `broker.conf`:
> >
> > 1. **Per-topic** — a new `autoScalePolicy` field on
> > `ScalableTopicMetadata`, set via
> > `admin.scalableTopics().setAutoScalePolicy(topic, policy)` /
> > `getAutoScalePolicy(topic)`.
> > 2. **Per-namespace** — a new `scalableTopicAutoScalePolicy` field on
> > `Policies`, set via
> > `admin.namespaces().setScalableTopicAutoScalePolicy(ns, policy)`.
> >
> > `AutoScalePolicyOverride` carries the same knobs as the broker config
> > (all optional; unset fields fall through). Setting `enabled = false`
> > opts a topic or namespace out entirely.
> >
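The most-specific-wins resolution can be illustrated with a small Java sketch. `PolicyOverride`, `EffectivePolicy` and `PolicyResolver` are hypothetical names, and only two of the knobs are modelled; the real `AutoScalePolicyOverride` would carry all of them.

```java
import java.util.Optional;

// Hypothetical subset of the override: every field is optional, as the PIP describes.
record PolicyOverride(Optional<Boolean> enabled, Optional<Integer> maxSegments) {
    static PolicyOverride none() {
        return new PolicyOverride(Optional.empty(), Optional.empty());
    }
}

// Fully resolved values, with broker.conf supplying the final fallback.
record EffectivePolicy(boolean enabled, int maxSegments) {}

class PolicyResolver {
    static EffectivePolicy resolve(PolicyOverride topic, PolicyOverride namespace,
                                   EffectivePolicy brokerDefaults) {
        // Each field falls through independently: topic, then namespace, then broker.conf.
        boolean enabled = topic.enabled()
                .or(namespace::enabled)
                .orElse(brokerDefaults.enabled());
        int maxSegments = topic.maxSegments()
                .or(namespace::maxSegments)
                .orElse(brokerDefaults.maxSegments());
        return new EffectivePolicy(enabled, maxSegments);
    }
}
```

Resolving per field rather than per object is what lets a topic override only `enabled` while still inheriting a namespace-level `maxSegments`.
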
> > ### Admin Client API
> >
> > ```java
> > interface ScalableTopics {
> >     // ... existing ...
> >     void setAutoScalePolicy(String topic, AutoScalePolicyOverride policy) throws PulsarAdminException;
> >     AutoScalePolicyOverride getAutoScalePolicy(String topic) throws PulsarAdminException;
> >     void removeAutoScalePolicy(String topic) throws PulsarAdminException;
> > }
> >
> > interface Namespaces {
> >     // ... existing ...
> >     void setScalableTopicAutoScalePolicy(String namespace, AutoScalePolicyOverride policy) ...;
> >     AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace) ...;
> >     void removeScalableTopicAutoScalePolicy(String namespace) ...;
> > }
> > ```
> >
> > ### Metadata Store Paths
> >
> > | Path | Content | Writer |
> > |------|---------|--------|
> > | `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load` | `SegmentLoadStats` JSON | segment-owning broker |
> >
> > (`autoScalePolicy` rides inside the existing `ScalableTopicMetadata`
> > blob; the namespace override rides inside `Policies`. No other new
> > paths.)
> >
> > ### Observability
> >
> > - New per-topic metrics: `pulsar_scalable_topic_active_segments`,
> > `pulsar_scalable_topic_auto_splits_total`,
> > `pulsar_scalable_topic_auto_merges_total`,
> > `pulsar_scalable_topic_split_suppressed_max_segments_total`,
> > `pulsar_scalable_topic_merge_suppressed_max_depth_total`.
> > - The existing `ScalableTopicStats` is extended with the most recent
> > `SegmentLoadStats` per segment and the resolved effective policy, so
> > operators can see *why* the controller did or did not act.
> >
> > ---
> >
> > ## Operational Safety
> >
> > The `maxSegments`, `maxDagDepth`, asymmetric cooldown, and
> > threshold-gap guards together bound both the rate and the total amount
> > of structural change a topic can undergo, so enabling auto split/merge
> > cannot cause unbounded segment growth or split/merge storms.
> >
> > Operators who want manual-only control set
> > `scalableTopicAutoScaleEnabled=false` (cluster) or an `enabled=false`
> > override (namespace/topic).
> >
> > > **Compatibility:** scalable topics are a new, as-yet-unreleased feature 
> > > ([PIP-460](pip-460.md)), so there is no backward/forward compatibility to 
> > > consider — `SegmentLoadStats`, the policy fields, and the config knobs 
> > > all ship together with the rest of the scalable-topic feature.
> >
> > ---
> >
> > ## Security Considerations
> >
> > `setAutoScalePolicy` / `getAutoScalePolicy` (topic and namespace
> > variants) require the same admin permissions as the corresponding
> > existing scalable-topic and namespace policy operations.
> > `SegmentLoadStats` is written by brokers via their authenticated
> > internal identity and is not client-writable.
> >
> > ---
> >
> > ## Links
> >
> > - Parent PIP: [PIP-468: Scalable Topic Controller](pip-468.md)
> > - Grand-parent PIP: [PIP-460: Scalable Topics](pip-460.md)
> > - V5 Client API: [PIP-466: New Java Client API (V5)](pip-466.md)
> >
> >
