https://github.com/apache/pulsar/pull/25938


------


# PIP-483: Scalable Topic Auto Split/Merge

*Sub-PIP of [PIP-468: Scalable Topic Controller](pip-468.md)*

## Motivation

[PIP-468](pip-468.md) gives the Scalable Topic Controller the ability
to **split** and **merge** segments, but only on explicit operator
request via the admin API. An operator has to watch the topic, decide
it is hot, and issue a split — and later notice it has gone cold and
issue a merge. This is the same operational toil that partition-count
management imposes on classic Pulsar topics, which scalable topics
were meant to eliminate.

This PIP adds an **auto-scaling policy** to the controller: the
controller leader observes per-segment load and per-subscription
consumer pressure, and autonomously splits hot segments and merges
cold ones, within hard caps that prevent runaway growth and
split/merge flip-flopping.

The design is built around three principles that came out of the
design discussion:

1. **Splits are fast; merges are lazy.** A split protects throughput
and latency under load, so it fires quickly — with only a short
cooldown to coalesce bursts of near-simultaneous triggers (e.g. a
group of consumers connecting in rapid succession). A merge is purely
an efficiency reclaim, so it can wait, be rate-limited, and be skipped
when in doubt.
2. **The controller reacts, it does not poll.** New stream/checkpoint
consumers register directly with the controller, so consumer-count
changes are handled event-driven within seconds. Load data is *pushed*
into the metadata store by each segment's owning broker (only when it
changes materially) and read by the controller leader. The controller
never fans out RPCs to segment owners.
3. **The decision is a pure function.** Given a snapshot of load +
layout + policy, the split/merge decision is deterministic and
unit-testable in isolation from all I/O.

---

## Goals

- Automatically increase segment count when a topic is under
ingest/dispatch load or has more stream/checkpoint consumers than
segments.
- Automatically decrease segment count when load subsides, reclaiming
broker resources.
- Bound growth (`maxSegments`) and bound split↔merge churn
(`maxDagDepth`, asymmetric cooldown).
- Default-on cluster-wide, with per-namespace and per-topic overrides,
following Pulsar's existing policy-resolution conventions.

### Non-goals

- **Broker placement / rebalancing.** Which broker owns a segment's
bundle is the load balancer's job; this PIP only changes *how many*
segments exist.
- **Key-aware or non-midpoint splits.** Splits use the existing
midpoint-split mechanism from PIP-468.
- **Cross-topic global optimization.** Each topic's controller decides
independently.

---

## Design

### Overview

```
┌────────────────────────────────────────────────────────────────┐
│ Segment-owning broker (per active segment)                      │
│   SegmentLoadReporter                                            │
│   - samples the segment topic's TopicStats                      │
│   - writes SegmentLoadStats to metadata ONLY on material change │
└───────────────────────────────┬────────────────────────────────┘
                                 │ (metadata store, push-on-change)
                                 ▼
┌────────────────────────────────────────────────────────────────┐
│ Controller leader (per scalable topic)                          │
│                                                                  │
│  Event-driven — within seconds:                                 │
│    on STREAM/CHECKPOINT consumer register/unregister            │
│      (consumers already register with the controller — no poll) │
│      → evaluate the consumer-count split rule immediately       │
│                                                                  │
│  Periodic AutoScaleTick — traffic, default 60s:                 │
│    1. read SegmentLoadStats for all active segments             │
│    2. AutoScalePolicyEvaluator.decide(...) → Split|Merge|None   │
│    3. dispatch to existing splitSegment / mergeSegments         │
└────────────────────────────────────────────────────────────────┘
```

The two trigger sources reflect their different latency needs: a new
consumer should get its own segment **within seconds**, so it is
handled the instant the consumer registers with the controller;
traffic shifts up or down over **a minute or more**, so they are
evaluated on a slower periodic tick. The only new persistent state is
`SegmentLoadStats`. The split/merge *mechanics* are entirely reused
from PIP-468.

### Load reporting: push-to-metadata, not pull-per-tick

Each segment's owning broker runs a **`SegmentLoadReporter`** for
every ACTIVE `segment://` topic it hosts. The broker writes
`SegmentLoadStats` **directly to the metadata store** — it already has
the rates in memory, so there is no REST round-trip and no
controller-initiated pull. On a fixed sampling interval it compares
the current rates to the last ones written and writes **only when a
rate changes by more than a significant threshold** (default ±25%)
since the last write. A steady-state segment writes once and then goes
silent, keeping metadata write volume bounded regardless of traffic.

#### `SegmentLoadStats` (new metadata record)

Stored at `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load`:

```json
{
  "msgRateIn": 12000.0,
  "bytesRateIn": 64000000.0,
  "msgRateOut": 48000.0,
  "bytesRateOut": 256000000.0
}
```

| Field | Source on the owning broker | Meaning for auto split/merge |
|-------|------------------------------|---------------------------|
| `msgRateIn` / `bytesRateIn` | segment topic `TopicStats` (60s rolling) | ingest load |
| `msgRateOut` / `bytesRateOut` | segment topic `TopicStats` | dispatch/fanout load (high for topics with many subscriptions) |

The record carries no timestamp of its own: the metadata store's
`Stat` for the record already exposes creation and last-modified
timestamps, and the controller uses the **modified timestamp** for
windowing. Because the reporter rewrites the record on any material
change, a record that still reads "cold" with an old modified time
shows that the segment has stayed cold (to within the change
threshold) for `now − modifiedAt`. The **merge window therefore
derives from the store's `Stat`**, with no per-tick history buffer and
no extra field.

#### Significant-change threshold

To avoid rewriting on every minor wobble, the reporter only writes
when a rate has moved by more than
`scalableTopicLoadReportRateChangeThreshold` (default 25%) relative to
the last value written for that segment. Sampling cadence is
`scalableTopicLoadReportInterval` (default 10s). Both are configurable
via `broker.conf`.
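
The write-suppression rule can be sketched as a small stateful check.
This is a minimal illustration, not the actual `SegmentLoadReporter`:
the class name `LoadChangeDetector` and its method are hypothetical,
and the handling of a zero baseline (any non-zero rate counts as a
material change) is an assumption the text does not specify.

```java
/** Illustrative sketch only; names are hypothetical, not part of the PIP. */
final class LoadChangeDetector {
    private final double threshold;   // relative change, e.g. 0.25 for 25%
    private double[] lastWritten;     // null until the first write

    LoadChangeDetector(double threshold) {
        this.threshold = threshold;
    }

    /**
     * Returns true if this sample should be written to the metadata store.
     * The baseline only moves when a write happens, so slow drift is
     * measured against the last value written, not the last sample.
     */
    boolean shouldWrite(double msgRateIn, double bytesRateIn,
                        double msgRateOut, double bytesRateOut) {
        double[] current = {msgRateIn, bytesRateIn, msgRateOut, bytesRateOut};
        if (lastWritten == null || anyMaterialChange(lastWritten, current)) {
            lastWritten = current;
            return true;
        }
        return false;
    }

    private boolean anyMaterialChange(double[] previous, double[] current) {
        for (int i = 0; i < previous.length; i++) {
            double base = previous[i];
            // Assumption: with a zero baseline, any non-zero rate is material.
            boolean changed = base == 0.0
                    ? current[i] != 0.0
                    : Math.abs(current[i] - base) / base > threshold;
            if (changed) {
                return true;
            }
        }
        return false;
    }
}
```

With a 25% threshold and a baseline of 1000 msg/s, a sample of 1100
is suppressed while a sample of 1300 is written and becomes the new
baseline.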

### Subscription types and what each load type drives

Recall from PIP-468 that scalable-topic subscriptions are `STREAM`
(controller-managed, 1:1 segment↔consumer assignment; covers both
StreamConsumer and CheckpointConsumer) or `QUEUE`
(controller-bypassing; every consumer attaches to every segment and
the broker round-robins).

| Trigger | STREAM subscriptions | QUEUE subscriptions |
|---------|----------------------|----------------------|
| Consumer-count scale-up | **Yes**: more segments give more 1:1 parallelism | **No**: queue consumers share segments; more segments don't add parallelism for them |
| Traffic (in/out, msg/bytes) | Yes | **Yes**: queue traffic still loads the segment's broker and counts toward the per-segment rate |

So a topic with only QUEUE subscriptions never splits on consumer
count, but still splits when any segment's in/out rate crosses
threshold.

### The decision: `AutoScalePolicyEvaluator`

A pure function with no I/O:

```
decide(layout, loadBySegment, streamConsumerCountBySub, policy, now)
    → Split(segmentId) | Merge(segmentId1, segmentId2) | None
```

It runs in two passes — **split first (short cooldown), then merge
(long cooldown)** — and emits at most one action per invocation.

#### Pass 1 — SPLIT (fast, lightly coalesced)

Splits fire as soon as conditions are met, bounded by `maxSegments`,
an in-flight-operation guard, and a **short `splitCooldown` (default 1
min)**. The cooldown is deliberately short: it exists only to coalesce
a burst of near-simultaneous triggers — e.g. a group of consumers
connecting in rapid succession should cause one split, not N — while
still letting a genuinely growing topic split again on the next
minute.

```
if activeSegments >= maxSegments: skip split pass
if now - lastSplitAtMs < splitCooldown: skip split pass

(a) Consumer-driven:
      required = max over STREAM subscriptions of consumerCount   // per-subscription max
      if required > activeSegments:
          → Split(busiest active segment by msgRateIn); set lastSplitAtMs = now

(b) Load-driven (if (a) didn't fire):
      candidate segments = active segments where ANY of:
        - msgRateIn   > splitMsgRateInThreshold
        - bytesRateIn > splitBytesRateInThreshold
        - msgRateOut  > splitMsgRateOutThreshold
        - bytesRateOut> splitBytesRateOutThreshold
      if candidates non-empty:
          → Split(most-overloaded candidate); set lastSplitAtMs = now
```

The consumer-driven rule (a) is what the **event-driven path**
evaluates the moment a consumer registers, so a new consumer gets a
segment within seconds (subject to `splitCooldown`). The load-driven
rule (b) runs on the periodic tick. Because `msgRateIn` etc. are
already 60-second rolling averages on the broker, a value over
threshold already represents *sustained* load — no extra split window
is needed to filter transient spikes.
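
The split pass can be sketched as a side-effect-free Java method. This
is an illustration under stated assumptions, not the proposed
`AutoScalePolicyEvaluator`: the `SplitPass`, `Load`, and `Policy`
types are hypothetical, and "most-overloaded" is interpreted here as
the highest ratio of any rate to its split threshold, which the text
does not define.

```java
import java.util.Map;
import java.util.Optional;

/** Illustrative sketch only; all type and method names are hypothetical. */
final class SplitPass {
    record Load(double msgRateIn, double bytesRateIn, double msgRateOut, double bytesRateOut) {}

    /** splitThresholds reuses Load to carry one threshold per metric. */
    record Policy(int maxSegments, long splitCooldownMs, Load splitThresholds) {}

    /** Assumption: overload = highest rate/threshold ratio; above 1.0 means over threshold. */
    static double overload(Load load, Load t) {
        return Math.max(
                Math.max(load.msgRateIn() / t.msgRateIn(), load.bytesRateIn() / t.bytesRateIn()),
                Math.max(load.msgRateOut() / t.msgRateOut(), load.bytesRateOut() / t.bytesRateOut()));
    }

    /** Returns the id of the segment to split, or empty if no split should fire. */
    static Optional<Long> decide(Map<Long, Load> activeLoads,
                                 Map<String, Integer> streamConsumersBySub,
                                 Policy policy, long lastSplitAtMs, long nowMs) {
        int active = activeLoads.size();
        if (active >= policy.maxSegments()) {
            return Optional.empty();
        }
        if (nowMs - lastSplitAtMs < policy.splitCooldownMs()) {
            return Optional.empty();
        }
        // (a) Consumer-driven: largest consumer count across STREAM subscriptions.
        int required = 0;
        for (int count : streamConsumersBySub.values()) {
            required = Math.max(required, count);
        }
        boolean consumerDriven = required > active;

        Long best = null;
        double bestScore = Double.NEGATIVE_INFINITY;
        for (Map.Entry<Long, Load> e : activeLoads.entrySet()) {
            Load load = e.getValue();
            // (a) ranks by msgRateIn; (b) ranks by overload and requires it to exceed 1.0.
            double score = consumerDriven ? load.msgRateIn() : overload(load, policy.splitThresholds());
            boolean eligible = consumerDriven || score > 1.0;
            if (eligible && score > bestScore) {
                best = e.getKey();
                bestScore = score;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

In this sketch the function stays pure: the caller is expected to
record `lastSplitAtMs = now` when a split is returned.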

#### Pass 2 — MERGE (lazy, rate-limited)

Merges run only if no split fired this tick, the topic is not within
`mergeCooldown` of its last merge, the topic has more than
`minSegments` active segments, and neither segment's lineage has
reached `maxDagDepth`.

```
if a split fired this tick: skip merge pass
if now - lastMergeAtMs < mergeCooldown: skip merge pass
if activeSegments <= minSegments: skip merge pass

candidate pairs = adjacent ACTIVE segment pairs where BOTH segments satisfy,
                  for at least mergeWindow (checked via the store Stat's modified time):
    - msgRateIn   < mergeMsgRateInThreshold
    - bytesRateIn < mergeBytesRateInThreshold
    - msgRateOut  < mergeMsgRateOutThreshold
    - bytesRateOut< mergeBytesRateOutThreshold
  AND neither segment's lineage is already at maxDagDepth merges

if candidate pairs non-empty:
    → Merge(coldest pair by combined rate); set lastMergeAtMs = now
```

Adjacency is required because the existing `mergeSegments` API only
merges hash-range-adjacent active segments.
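
A sketch of the merge pass, again as a pure function. The `MergePass`
types are hypothetical; the input list is assumed to be the ACTIVE
segments in hash-range order, `dagDepth` is assumed to be precomputed
per segment, and "combined rate" is interpreted as the sum of
`msgRateIn` and `msgRateOut` across the pair, which the text leaves
open.

```java
import java.util.List;
import java.util.Optional;

/** Illustrative sketch only; all type and method names are hypothetical. */
final class MergePass {
    record Segment(long id, double msgRateIn, double bytesRateIn,
                   double msgRateOut, double bytesRateOut,
                   long loadModifiedAtMs, int dagDepth) {
        /** Assumption: "combined rate" means message rate in plus message rate out. */
        double combinedRate() {
            return msgRateIn + msgRateOut;
        }
    }

    record Policy(int minSegments, int maxDagDepth, long mergeCooldownMs, long mergeWindowMs,
                  double msgRateIn, double bytesRateIn, double msgRateOut, double bytesRateOut) {}

    record Pair(long left, long right) {}

    /** Below every merge threshold, unchanged for mergeWindow, and under the depth cap. */
    static boolean eligible(Segment s, Policy p, long nowMs) {
        return s.msgRateIn() < p.msgRateIn()
                && s.bytesRateIn() < p.bytesRateIn()
                && s.msgRateOut() < p.msgRateOut()
                && s.bytesRateOut() < p.bytesRateOut()
                && nowMs - s.loadModifiedAtMs() >= p.mergeWindowMs()
                && s.dagDepth() < p.maxDagDepth();
    }

    /** Returns the coldest adjacent eligible pair, or empty if no merge should fire. */
    static Optional<Pair> decide(List<Segment> activeInRangeOrder, Policy p,
                                 long lastMergeAtMs, long nowMs) {
        if (nowMs - lastMergeAtMs < p.mergeCooldownMs()) {
            return Optional.empty();
        }
        if (activeInRangeOrder.size() <= p.minSegments()) {
            return Optional.empty();
        }
        Pair best = null;
        double bestRate = Double.POSITIVE_INFINITY;
        for (int i = 0; i + 1 < activeInRangeOrder.size(); i++) {
            Segment a = activeInRangeOrder.get(i);
            Segment b = activeInRangeOrder.get(i + 1);
            if (!eligible(a, p, nowMs) || !eligible(b, p, nowMs)) {
                continue;
            }
            double rate = a.combinedRate() + b.combinedRate();
            if (rate < bestRate) {
                best = new Pair(a.id(), b.id());
                bestRate = rate;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

The caller would skip this pass entirely when a split fired in the
same tick, and record `lastMergeAtMs = now` when a pair is returned.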

### Anti-flip-flop: three independent guards

1. **Threshold gap (hysteresis).** Split thresholds are well above
merge thresholds for every metric. The dead-band between them is what
prevents a just-merged segment from immediately re-qualifying for a
split.
2. **Asymmetric cooldown.** Splits: a short `splitCooldown` (default 1
min) that only coalesces bursts. Merges: a longer `mergeCooldown`
(default 5 min) plus a `mergeWindow` (default 5 min) during which the
segment must have stayed cold. A pair must be *durably* cold to merge,
but a segment can split again within a minute of getting hot.
3. **Max DAG depth on merges.** `maxDagDepth` (default 10) caps how
many merges a given lineage can accumulate. Once reached, that lineage
stops being a merge candidate — **but load-driven splits still fire.**
This bounds the number of split↔merge cycles a hash range can churn
through while never blocking a split that throughput requires.

> **Design note — direction of the depth cap.** The cap restricts *merges*, not 
> splits. Splits are needed for correctness/performance and must always be 
> available; merges are the optional efficiency step and are the ones that, 
> combined with splits, could oscillate. `dagDepth` therefore counts **merges 
> in a segment's lineage**, derived from the existing `parentIds`/`childIds` 
> DAG in `ScalableTopicMetadata` — splits do not consume depth budget.
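
One way to derive a merge count from the parent links is sketched
below. It rests on two assumptions that the text does not state: a
segment with two or more parents is the result of a merge, and a
lineage's depth is the number of merges along its deepest ancestry
path. The `DagDepth` class and the `parentIds` map shape are
hypothetical stand-ins for the data in `ScalableTopicMetadata`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; the class and map layout are hypothetical. */
final class DagDepth {
    /**
     * Counts merges along the deepest ancestry path of a segment.
     * parentIds maps a segment id to its parents; roots have no entry.
     */
    static int mergeDepth(long segmentId, Map<Long, List<Long>> parentIds) {
        return mergeDepth(segmentId, parentIds, new HashMap<>());
    }

    private static int mergeDepth(long id, Map<Long, List<Long>> parentIds,
                                  Map<Long, Integer> memo) {
        Integer cached = memo.get(id);
        if (cached != null) {
            return cached;
        }
        List<Long> parents = parentIds.getOrDefault(id, List.of());
        int deepestParent = 0;
        for (long parent : parents) {
            deepestParent = Math.max(deepestParent, mergeDepth(parent, parentIds, memo));
        }
        // Assumption: two or more parents means this segment came from a merge.
        // A split child has exactly one parent and adds no depth.
        int depth = deepestParent + (parents.size() >= 2 ? 1 : 0);
        memo.put(id, depth);
        return depth;
    }
}
```

For a lineage that goes split, merge, split, merge, the final segment
has depth 2 under these assumptions, and the intermediate split
children inherit depth 1 from their merged parent.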

### Caps

| Cap | Default | Effect |
|-----|---------|--------|
| `maxSegments` | 64 | Splits stop once `activeSegments == maxSegments`. |
| `minSegments` | 1 | Merges stop once `activeSegments == minSegments`. |
| `maxDagDepth` | 10 | Merges stop for a lineage at the cap; splits unaffected. |

### Manual operations and cooldown

- Manual `admin.scalableTopics().splitSegment(...)` **sets
`lastSplitAtMs`**, so a manual split also starts the short auto-split
cooldown.
- Manual `admin.scalableTopics().mergeSegments(...)` **sets
`lastMergeAtMs`**, so the operator's manual efficiency action also
rate-limits the controller's automatic merges.

### Evaluation triggers

The controller leader evaluates auto split/merge from two sources:

- **Event-driven (within seconds)** — when a STREAM/CHECKPOINT
consumer registers with or unregisters from the controller, it
immediately evaluates the consumer-count split rule. No polling:
consumer registration already flows through the controller (PIP-468).
- **Periodic tick** — a `scheduleAutoScaleTick` (separate from the GC
tick from PIP-468), default cadence `scalableTopicAutoScaleInterval =
60s`, evaluates the traffic-driven split rules and the merge pass. Per
tick it does one metadata batch-read of the topic's `segments/*/load`
records (or maintains a watch cache), evaluates, and dispatches.

Both sources call the same `AutoScalePolicyEvaluator`; the
event-driven path only needs the consumer-count rule, so it is cheap.
Both are cancelled on leadership loss / close.
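
The two trigger sources could be wired together roughly as follows.
This is a sketch, not the controller's actual scheduling code: the
`AutoScaleTriggers` class is hypothetical, the two `Runnable`s stand
in for the consumer-count rule and the full evaluation, and running
the event-driven path inline under the same lock as the tick is an
assumed way to keep evaluations from overlapping.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch only; names and locking strategy are assumptions. */
final class AutoScaleTriggers implements AutoCloseable {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "auto-scale-tick");
                t.setDaemon(true);
                return t;
            });
    private final Runnable consumerCountRule;
    private final Runnable fullEvaluation;
    private ScheduledFuture<?> tick;
    private boolean closed;

    AutoScaleTriggers(Runnable consumerCountRule, Runnable fullEvaluation) {
        this.consumerCountRule = consumerCountRule;
        this.fullEvaluation = fullEvaluation;
    }

    /** Starts the periodic tick; intended to be called on becoming leader. */
    synchronized void start(long intervalMs) {
        if (closed || tick != null) {
            return;
        }
        tick = executor.scheduleAtFixedRate(this::runTick, intervalMs, intervalMs,
                TimeUnit.MILLISECONDS);
    }

    /** Event-driven path: evaluates the consumer-count rule immediately. */
    synchronized void onStreamConsumerChange() {
        if (!closed) {
            consumerCountRule.run();
        }
    }

    private synchronized void runTick() {
        if (!closed) {
            fullEvaluation.run();
        }
    }

    synchronized boolean isActive() {
        return tick != null && !tick.isCancelled();
    }

    /** Cancels both sources; intended for leadership loss or close. */
    @Override
    public synchronized void close() {
        closed = true;
        if (tick != null) {
            tick.cancel(false);
        }
        executor.shutdownNow();
    }
}
```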

---

## Public-Facing Changes

### Configuration (`broker.conf`)

Auto-scaling is **default-on cluster-wide**; these are the defaults
applied to every scalable topic that does not override them.

| Property | Default | Description |
|----------|---------|-------------|
| `scalableTopicAutoScaleEnabled` | `true` | Master switch for auto split/merge. |
| `scalableTopicAutoScaleInterval` | `60s` | Periodic (traffic) evaluation cadence. Consumer-count changes are handled event-driven, independent of this. |
| `scalableTopicMaxSegments` | `64` | Hard ceiling on active segments. |
| `scalableTopicMinSegments` | `1` | Hard floor on active segments. |
| `scalableTopicMaxDagDepth` | `10` | Max merges in a lineage before merges are disabled for it. |
| `scalableTopicSplitCooldown` | `1m` | Minimum time between automatic splits on a topic (coalesces bursts). |
| `scalableTopicMergeCooldown` | `5m` | Minimum time between automatic merges on a topic. |
| `scalableTopicMergeWindow` | `5m` | Duration a pair must stay cold before merging. |
| `scalableTopicSplitMsgRateInThreshold` | `10000` | msg/s ingest split trigger. |
| `scalableTopicSplitBytesRateInThreshold` | `50MB` | bytes/s ingest split trigger. |
| `scalableTopicSplitMsgRateOutThreshold` | `50000` | msg/s dispatch split trigger. |
| `scalableTopicSplitBytesRateOutThreshold` | `250MB` | bytes/s dispatch split trigger. |
| `scalableTopicMergeMsgRateInThreshold` | `1000` | msg/s ingest merge trigger. |
| `scalableTopicMergeBytesRateInThreshold` | `5MB` | bytes/s ingest merge trigger. |
| `scalableTopicMergeMsgRateOutThreshold` | `5000` | msg/s dispatch merge trigger. |
| `scalableTopicMergeBytesRateOutThreshold` | `25MB` | bytes/s dispatch merge trigger. |
| `scalableTopicLoadReportInterval` | `10s` | Segment owner sampling interval. |
| `scalableTopicLoadReportRateChangeThreshold` | `25%` | Minimum rate change since the last write that triggers a new `SegmentLoadStats` write. |

### Policy resolution (namespace + topic overrides)

Following the existing `autoTopicCreationOverride` pattern, an
`AutoScalePolicyOverride` can be set at two levels; resolution is
most-specific-wins, falling back to `broker.conf`:

1. **Per-topic** — a new `autoScalePolicy` field on
`ScalableTopicMetadata`, set via
`admin.scalableTopics().setAutoScalePolicy(topic, policy)` /
`getAutoScalePolicy(topic)`.
2. **Per-namespace** — a new `scalableTopicAutoScalePolicy` field on
`Policies`, set via
`admin.namespaces().setScalableTopicAutoScalePolicy(ns, policy)`.

`AutoScalePolicyOverride` carries the same knobs as the broker config
(all optional; unset fields fall through). Setting `enabled = false`
opts a topic or namespace out entirely.
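
Most-specific-wins resolution with optional fields can be sketched as
below. The `PolicyOverride` and `Effective` records are hypothetical
and carry only three of the knobs for brevity; they are not the
proposed `AutoScalePolicyOverride` class. A `null` field is assumed to
mean "unset, fall through to the next level".

```java
/** Illustrative sketch only; record shapes are hypothetical and abbreviated. */
final class AutoScalePolicyResolver {
    /** Override at one level; a null field means "unset, fall through". */
    record PolicyOverride(Boolean enabled, Integer maxSegments, Long mergeCooldownMs) {
        /** Used when a level has no override at all. */
        static final PolicyOverride NONE = new PolicyOverride(null, null, null);
    }

    /** Fully resolved values; also used to carry the broker.conf defaults. */
    record Effective(boolean enabled, int maxSegments, long mergeCooldownMs) {}

    /** Resolves each field independently: topic, then namespace, then broker default. */
    static Effective resolve(PolicyOverride topic, PolicyOverride namespace,
                             Effective brokerDefaults) {
        return new Effective(
                pick(topic.enabled(), namespace.enabled(), brokerDefaults.enabled()),
                pick(topic.maxSegments(), namespace.maxSegments(), brokerDefaults.maxSegments()),
                pick(topic.mergeCooldownMs(), namespace.mergeCooldownMs(),
                        brokerDefaults.mergeCooldownMs()));
    }

    private static <T> T pick(T topicValue, T namespaceValue, T brokerDefault) {
        if (topicValue != null) {
            return topicValue;
        }
        if (namespaceValue != null) {
            return namespaceValue;
        }
        return brokerDefault;
    }
}
```

Because each field resolves on its own, a topic can set only
`enabled = false` while still inheriting `maxSegments` from its
namespace.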

### Admin Client API

```java
interface ScalableTopics {
    // ... existing ...
    void setAutoScalePolicy(String topic, AutoScalePolicyOverride policy) throws PulsarAdminException;
    AutoScalePolicyOverride getAutoScalePolicy(String topic) throws PulsarAdminException;
    void removeAutoScalePolicy(String topic) throws PulsarAdminException;
}

interface Namespaces {
    // ... existing ...
    void setScalableTopicAutoScalePolicy(String namespace, AutoScalePolicyOverride policy) ...;
    AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace) ...;
    void removeScalableTopicAutoScalePolicy(String namespace) ...;
}
```

### Metadata Store Paths

| Path | Content | Writer |
|------|---------|--------|
| `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load` | `SegmentLoadStats` JSON | segment-owning broker |

(`autoScalePolicy` rides inside the existing `ScalableTopicMetadata`
blob; the namespace override rides inside `Policies`. No other new
paths.)

### Observability

- New per-topic metrics: `pulsar_scalable_topic_active_segments`,
`pulsar_scalable_topic_auto_splits_total`,
`pulsar_scalable_topic_auto_merges_total`,
`pulsar_scalable_topic_split_suppressed_max_segments_total`,
`pulsar_scalable_topic_merge_suppressed_max_depth_total`.
- The existing `ScalableTopicStats` is extended with the most recent
`SegmentLoadStats` per segment and the resolved effective policy, so
operators can see *why* the controller did or did not act.

---

## Operational Safety

The `maxSegments`, `maxDagDepth`, asymmetric cooldown, and
threshold-gap guards together bound both the rate and the total amount
of structural change a topic can undergo, so enabling auto split/merge
cannot cause unbounded segment growth or split/merge storms.

Operators who want manual-only control set
`scalableTopicAutoScaleEnabled=false` (cluster) or an `enabled=false`
override (namespace/topic).

> **Compatibility:** scalable topics are a new, as-yet-unreleased feature 
> ([PIP-460](pip-460.md)), so there is no backward/forward compatibility to 
> consider — `SegmentLoadStats`, the policy fields, and the config knobs all 
> ship together with the rest of the scalable-topic feature.

---

## Security Considerations

`setAutoScalePolicy` / `getAutoScalePolicy` / `removeAutoScalePolicy`
and their namespace variants require the same admin permissions as the
corresponding existing scalable-topic and namespace policy operations.
`SegmentLoadStats` is written by brokers via their authenticated
internal identity and is not client-writable.

---

## Links

- Parent PIP: [PIP-468: Scalable Topic Controller](pip-468.md)
- Grand-parent PIP: [PIP-460: Scalable Topics](pip-460.md)
- V5 Client API: [PIP-466: New Java Client API (V5)](pip-466.md)

--
Matteo Merli