https://github.com/apache/pulsar/pull/25516
-----
# PIP-468: Scalable Topic Controller
*Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*
## Motivation
[PIP-460](pip-460.md) introduces scalable topics as a new topic type
in Pulsar, built on a DAG of range segments that can be split and
merged to scale independently of the initial configuration. PIP-460
defines the overall vision but defers the detailed design of each
component to dedicated sub-PIPs.
This PIP specifies the **Scalable Topic Controller** — the broker-side
component responsible for:
1. **Segment lifecycle management** — creating, terminating, and
deleting the underlying segment topics that back a scalable topic.
2. **Segment layout coordination** — executing split and merge
operations with correct ordering guarantees so that no messages are
lost and all subscription cursors exist before producers are
redirected.
3. **Consumer assignment** — coordinating which consumers receive
messages from which segments for a given subscription.
4. **Leader election** — ensuring exactly one broker acts as the
controller for each scalable topic, with automatic failover.
5. **DAG watch sessions** — pushing topology updates to connected
clients (producers and consumers) when the segment layout changes.
Without a well-defined controller, split and merge operations would be
unsafe: producers could write to segments that have no subscription
cursors, consumers could miss messages during topology changes, and
concurrent layout modifications could corrupt the segment DAG.
---
## Design
### Architecture Overview
The controller subsystem consists of four layers:
```
┌──────────────────────────────────────────────────────────┐
│ Admin API Layer │
│ ScalableTopics REST + Segments REST │
└──────────────┬───────────────────────┬───────────────────┘
│ │
┌──────────────▼───────────────────────▼───────────────────┐
│ ScalableTopicService │
│ Per-broker singleton; manages controllers │
└──────────────────────────┬───────────────────────────────┘
│
┌──────────────────────────▼───────────────────────────────┐
│ ScalableTopicController (per topic) │
│ Leader-elected; owns layout, split/merge, consumers │
│ │
│ ┌─────────────────┐ ┌───────────────────────────────┐ │
│ │ SegmentLayout │ │ SubscriptionCoordinator (×N) │ │
│ │ (immutable DAG) │ │ per-subscription assignments │ │
│ └─────────────────┘ └───────────────────────────────┘ │
└──────────────────────────┬───────────────────────────────┘
│
┌──────────────────────────▼───────────────────────────────┐
│ ScalableTopicResources │
│ Metadata store access (read/write/watch) │
└──────────────────────────────────────────────────────────┘
```
### Broker Roles and Request Routing
A scalable topic involves three distinct broker roles. Understanding
which broker handles which operation is critical to the design.
#### Any broker — DAG watch sessions
A **DAG watch session** (scalable topic lookup) can be served by **any
broker** in the cluster. The session reads segment metadata from the
shared metadata store and registers a watch for changes. No state is
held on the broker beyond the watch registration. This means producers
and consumers can connect to any broker for topic discovery — the same
way regular topic lookups work today.
When the metadata changes (due to a split or merge performed by the
controller leader), the metadata store notification fires on whichever
broker is serving the watch session, and that broker pushes the
updated DAG to the client.
#### Controller leader — layout mutations and consumer assignment
Each scalable topic has exactly one **controller leader** across the
cluster, elected via the metadata store. The controller leader is the
only broker that can:
- Execute **split** and **merge** operations (the multi-step protocols
described below).
- Accept **consumer registrations** and compute segment-to-consumer assignments.
- **Notify consumers** of assignment changes after topology updates.
Clients discover the controller leader's broker URL through the DAG
watch session response (`controller_broker_url` field). StreamConsumer
and CheckpointConsumer clients then connect directly to the controller
leader to register and receive assignments.
Admin API requests for split and merge may land on any broker. The
receiving broker's `ScalableTopicService` obtains (or creates) a local
`ScalableTopicController` that participates in leader election. If the
local controller is the leader, it executes the operation directly;
otherwise the service redirects the request to the current leader (or
waits for the local controller to win the election). Callers therefore
never need to target the leader broker: the `ScalableTopicService`
handles the routing transparently.
#### Segment-owning brokers — produce and consume
Individual **segment topics** (`segment://`) are regular persistent
topics from the broker's perspective. They are distributed across the
cluster by the standard Pulsar **load manager** and namespace bundle
assignment — the same mechanism used for `persistent://` topics. Each
segment topic is owned by exactly one broker at any time, determined
by its namespace bundle hash.
The controller leader does not need to own any of the segment topics.
When the controller needs to operate on a segment (create, terminate,
delete, discover subscriptions), it uses the **Segments admin API**,
which routes the request to whichever broker currently owns that
segment's namespace bundle. This decoupling means the controller can
coordinate segments spread across many brokers without requiring
co-location.
```
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Broker A │ │ Broker B │ │ Broker C │
│ │ │ (controller │ │ │
│ segment-0 │ │ leader) │ │ segment-2 │
│ segment-1 │ │ │ │ │
│ │ │ DAG watch │ │ DAG watch │
│ DAG watch │ │ sessions │ │ sessions │
│ sessions │ │ │ │ │
└─────────────┘ └─────────────┘ └─────────────┘
│
Split/merge ops
use admin API to
reach segments on
Broker A and C
```
In this example, Broker B is the controller leader but owns no
segments. When it executes a split of segment-0, it calls the Segments
admin API which routes to Broker A (the owner of segment-0).
### Components
#### ScalableTopicService
A per-broker singleton, created and owned by `BrokerService`. It
manages the lifecycle of all `ScalableTopicController` instances on
the broker.
**Responsibilities:**
- **Controller management**: maintains a map of active controllers
keyed by topic name. Creates controllers on demand via
`getOrCreateController()` and releases them on topic unload or broker
shutdown.
- **Topic lifecycle**: handles `createScalableTopic()` and
`deleteScalableTopic()` admin operations, including creating/deleting
the underlying segment topics via the Segments admin API.
- **Delegation**: routes `splitSegment()` and `mergeSegments()`
requests to the appropriate controller.
- **Leader state monitoring**: listens for leader election state
changes and re-attempts election when the current leader is lost.
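A minimal sketch of the service skeleton implied by these
responsibilities; only `ScalableTopicService`, `getOrCreateController()`,
and the controller lifecycle come from this PIP, while the stub
controller and field names are illustrative:
```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stub standing in for the controller described in the next section.
class ScalableTopicController {
    ScalableTopicController(String topicName) {}
    void initialize() {}
    void close() {}
}

// Illustrative skeleton of the per-broker singleton.
class ScalableTopicService {
    // Active controllers on this broker, keyed by scalable topic name.
    private final Map<String, ScalableTopicController> controllers =
            new ConcurrentHashMap<>();

    ScalableTopicController getOrCreateController(String topicName) {
        // computeIfAbsent guarantees one controller per topic even under
        // concurrent admin requests landing on this broker.
        return controllers.computeIfAbsent(topicName, name -> {
            ScalableTopicController c = new ScalableTopicController(name);
            c.initialize(); // load metadata, then attempt leader election
            return c;
        });
    }

    void releaseController(String topicName) {
        // Called on topic unload / broker shutdown: resign leadership and
        // drop the instance so a later request re-creates it cleanly.
        ScalableTopicController c = controllers.remove(topicName);
        if (c != null) {
            c.close();
        }
    }
}
```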
#### ScalableTopicController
A per-topic coordinator. Only one instance across the cluster is the
**leader** for a given scalable topic, ensured by leader election
through the metadata store. The leader is the only instance that
modifies the segment layout or assigns consumers.
**State:**
| Field | Type | Description |
|-------|------|-------------|
| `topicName` | `TopicName` | The `topic://` name of the scalable topic |
| `currentLayout` | `SegmentLayout` | The current immutable snapshot of the segment DAG |
| `subscriptions` | `Map<String, SubscriptionCoordinator>` | Per-subscription consumer coordinators |
| `leaderState` | `LeaderElectionState` | Current leader election state |
| `leaderElection` | `LeaderElection<String>` | Metadata store leader election handle; value is the broker URL |
**Key operations:**
- `initialize()` — loads the current metadata from the store, then
attempts leader election. The elected leader stores its broker service
URL so that clients can discover and connect to it.
- `splitSegment(segmentId)` — splits an active segment at its midpoint
(see [Split Protocol](#split-protocol)).
- `mergeSegments(segmentId1, segmentId2)` — merges two adjacent active
segments (see [Merge Protocol](#merge-protocol)).
- `registerConsumer(subscription, consumer)` — registers a consumer
and returns its initial segment assignment.
- `unregisterConsumer(subscription, consumer)` — removes a consumer
and triggers rebalancing.
- `getLeaderBrokerUrl()` — returns the current leader's broker URL,
used by clients to connect to the controller.
#### SegmentLayout
An immutable, versioned snapshot of the segment DAG. All mutation
methods (`splitSegment`, `mergeSegments`, `pruneSegment`) return a
**new** `SegmentLayout` instance — the original is never modified.
This ensures safe concurrent reads without locking.
**Fields:**
| Field | Type | Description |
|-------|------|-------------|
| `epoch` | `long` | Monotonically increasing version; incremented on every layout change |
| `nextSegmentId` | `long` | Counter for assigning IDs to new segments |
| `allSegments` | `Map<Long, SegmentInfo>` | Complete DAG: all segments (active + sealed) |
| `activeSegments` | `Map<Long, SegmentInfo>` | Filtered view: only `ACTIVE` segments |
**Key operations:**
- `findActiveSegment(hash)` — returns the active segment whose hash
range contains the given hash value. Used by producers for message
routing.
- `splitSegment(segmentId)` — validates the segment is active,
computes the midpoint of its hash range, creates two child
`SegmentInfo` records covering `[start, mid]` and `[mid+1, end]`,
seals the parent, and returns a new layout with incremented epoch.
- `mergeSegments(id1, id2)` — validates both segments are active and
adjacent (the end of one equals `start - 1` of the other), creates a
merged `SegmentInfo` covering the combined range, seals both parents,
and returns a new layout.
- `getLineage(segmentId)` — returns the full ancestor + descendant
chain for a segment, used for DAG traversal during catch-up reads.
- `toMetadata()` / `fromMetadata()` — converts to/from the persisted
`ScalableTopicMetadata` format.
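A self-contained sketch of the copy-on-write split described above.
Field and method names follow this PIP; the simplified records (the full
`SegmentInfo` is specified in the next section) and the validation
details are assumptions:
```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

enum SegmentState { ACTIVE, SEALED }
record HashRange(int start, int end) {}
record SegmentInfo(long segmentId, HashRange hashRange, SegmentState state,
                   List<Long> parentIds, List<Long> childIds,
                   long createdAtEpoch, long sealedAtEpoch) {}

final class SegmentLayout {
    final long epoch;
    final long nextSegmentId;
    final Map<Long, SegmentInfo> allSegments; // never mutated after construction

    SegmentLayout(long epoch, long nextSegmentId, Map<Long, SegmentInfo> segments) {
        this.epoch = epoch;
        this.nextSegmentId = nextSegmentId;
        this.allSegments = Map.copyOf(segments);
    }

    /** Returns a NEW layout with an incremented epoch; the receiver is untouched. */
    SegmentLayout splitSegment(long segmentId) {
        SegmentInfo parent = allSegments.get(segmentId);
        if (parent == null || parent.state() != SegmentState.ACTIVE) {
            throw new IllegalArgumentException("Segment not active: " + segmentId);
        }
        HashRange r = parent.hashRange();
        int mid = r.start() + (r.end() - r.start()) / 2;
        long newEpoch = epoch + 1;
        long leftId = nextSegmentId;
        long rightId = nextSegmentId + 1;

        Map<Long, SegmentInfo> next = new HashMap<>(allSegments);
        // Seal the parent and record its children.
        next.put(segmentId, new SegmentInfo(segmentId, r, SegmentState.SEALED,
                parent.parentIds(), List.of(leftId, rightId),
                parent.createdAtEpoch(), newEpoch));
        // Children cover [start, mid] and [mid+1, end].
        next.put(leftId, new SegmentInfo(leftId, new HashRange(r.start(), mid),
                SegmentState.ACTIVE, List.of(segmentId), List.of(), newEpoch, 0));
        next.put(rightId, new SegmentInfo(rightId, new HashRange(mid + 1, r.end()),
                SegmentState.ACTIVE, List.of(segmentId), List.of(), newEpoch, 0));
        return new SegmentLayout(newEpoch, nextSegmentId + 2, next);
    }
}
```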
#### SegmentInfo
A record representing a single segment in the DAG:
| Field | Type | Description |
|-------|------|-------------|
| `segmentId` | `long` | Unique, monotonically increasing identifier |
| `hashRange` | `HashRange` | The `[start, end]` inclusive hash range this segment covers |
| `state` | `SegmentState` | `ACTIVE` or `SEALED` |
| `parentIds` | `List<Long>` | Parent segments (empty for root segments) |
| `childIds` | `List<Long>` | Child segments (empty for leaf/active segments) |
| `createdAtEpoch` | `long` | Layout epoch in which this segment was created (`0` for the initial segments) |
| `sealedAtEpoch` | `long` | Layout epoch in which this segment was sealed; only meaningful when `state == SEALED`. `0` when the segment is `ACTIVE`; this sentinel is unambiguous because sealing always happens as part of a split or merge, which increments the epoch, so a segment can never be sealed at epoch `0`. |
Factory methods: `SegmentInfo.active(id, range, epoch)` and
`SegmentInfo.sealed(...)`.
#### SubscriptionCoordinator
Manages segment-to-consumer assignments for a single subscription
within a scalable topic.
**Assignment strategy:** round-robin. Active segments are sorted by
hash range start; consumers are sorted by name. Segments are
distributed evenly across consumers. When a consumer is added or
removed, or when the layout changes (split/merge), the coordinator
recomputes assignments and pushes updates to affected consumers.
**Key operations:**
- `addConsumer(consumer)` — adds a consumer, rebalances, and returns
the full assignment map.
- `removeConsumer(consumer)` — removes a consumer and redistributes
its segments.
- `onLayoutChange(newLayout)` — called after a split or merge;
recomputes all assignments against the new set of active segments.
#### ConsumerRegistration and ConsumerAssignment
`ConsumerRegistration` is a record identifying a connected consumer:
| Field | Type | Description |
|-------|------|-------------|
| `consumerId` | `long` | Protocol-level consumer ID |
| `consumerName` | `String` | Stable name chosen by the client |
| `cnx` | `TransportCnx` | The connection to push assignment updates |
`ConsumerAssignment` is the assignment sent to a consumer:
| Field | Type | Description |
|-------|------|-------------|
| `layoutEpoch` | `long` | The layout epoch this assignment is based on |
| `assignedSegments` | `List<AssignedSegment>` | The segments assigned to this consumer |
Each `AssignedSegment` contains: `segmentId`, `hashRange`, and
`underlyingTopicName` (the `segment://` topic name the consumer should
connect to).
### Metadata Store Schema
Scalable topic metadata is stored in the metadata store under a
well-known path structure:
```
/topics/{tenant}/{namespace}/{encodedTopicName}
```
The value at this path is a JSON-serialized `ScalableTopicMetadata`:
```json
{
  "epoch": 3,
  "nextSegmentId": 5,
  "segments": {
    "0": { "segmentId": 0, "hashRange": {"start": 0, "end": 65535},
           "state": "SEALED", "parentIds": [], "childIds": [1, 2],
           "createdAtEpoch": 0, "sealedAtEpoch": 1 },
    "1": { "segmentId": 1, "hashRange": {"start": 0, "end": 32767},
           "state": "SEALED", "parentIds": [0], "childIds": [3, 4],
           "createdAtEpoch": 1, "sealedAtEpoch": 3 },
    "2": { "segmentId": 2, "hashRange": {"start": 32768, "end": 65535},
           "state": "ACTIVE", "parentIds": [0], "childIds": [],
           "createdAtEpoch": 1, "sealedAtEpoch": 0 },
    "3": { "segmentId": 3, "hashRange": {"start": 0, "end": 16383},
           "state": "ACTIVE", "parentIds": [1], "childIds": [],
           "createdAtEpoch": 3, "sealedAtEpoch": 0 },
    "4": { "segmentId": 4, "hashRange": {"start": 16384, "end": 32767},
           "state": "ACTIVE", "parentIds": [1], "childIds": [],
           "createdAtEpoch": 3, "sealedAtEpoch": 0 }
  },
  "properties": {}
}
```
The controller leader lock is stored at:
```
/topics/{tenant}/{namespace}/{encodedTopicName}/controller
```
The value is the broker service URL of the elected leader.
### DAG Watch Sessions
As described in [Broker Roles](#broker-roles-and-request-routing), any
broker can serve a DAG watch session because the segment metadata
lives in the shared metadata store — no controller leader involvement
is required. The session is established via the binary protocol:
1. **Client sends `CommandScalableTopicLookup`** with a
client-assigned `sessionId` and the `topic://` topic name.
2. **Broker creates a `DagWatchSession`** that:
- Loads the current `ScalableTopicMetadata` from the store.
- Resolves which broker owns each active segment's `segment://`
topic (via topic lookup).
- Reads the controller broker URL from the leader lock path.
- Registers a metadata store notification listener for changes to
the topic's metadata path.
3. **Broker sends `CommandScalableTopicUpdate`** with the initial
`ScalableTopicDAG` containing the full segment DAG, broker addresses,
and controller URL.
4. **On metadata change**, the notification listener fires. The broker
reloads metadata, re-resolves broker addresses, and pushes an updated
`CommandScalableTopicUpdate` to the client.
5. **Client sends `CommandScalableTopicClose`** to tear down the session.
The `ScalableTopicDAG` protocol message contains:
| Field | Type | Description |
|-------|------|-------------|
| `epoch` | `uint64` | Layout epoch |
| `segments` | `repeated SegmentInfoProto` | Full segment DAG |
| `segment_brokers` | `repeated SegmentBrokerAddress` | Broker URL for each active segment |
| `controller_broker_url` | `string` | Controller leader's broker URL |
| `controller_broker_url_tls` | `string` | TLS variant |
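A hypothetical sketch of the session flow in steps 2-4. Only the
command and message names come from this PIP; `MetadataWatcher`, the
simplified `ScalableTopicDag` record, and the push callback are
stand-ins for broker internals, not real Pulsar APIs:
```java
import java.util.function.Consumer;

// Illustrative notification listener abstraction.
interface MetadataWatcher {
    void watch(String path, Runnable onChange); // fires on every update
}

// Stand-in for the ScalableTopicDAG message (segments, broker addresses,
// controller URL); only the epoch is modeled here.
record ScalableTopicDag(long epoch) {}

class DagWatchSession {
    private final long sessionId;                 // client-assigned (step 1)
    private final String metadataPath;            // the topic's metadata node
    private final MetadataWatcher watcher;
    private final Consumer<ScalableTopicDag> pushUpdate; // CommandScalableTopicUpdate

    DagWatchSession(long sessionId, String metadataPath,
                    MetadataWatcher watcher, Consumer<ScalableTopicDag> pushUpdate) {
        this.sessionId = sessionId;
        this.metadataPath = metadataPath;
        this.watcher = watcher;
        this.pushUpdate = pushUpdate;
    }

    void start() {
        pushCurrentDag();                                  // step 3: initial DAG
        watcher.watch(metadataPath, this::pushCurrentDag); // step 4: on change
    }

    private void pushCurrentDag() {
        // Reload ScalableTopicMetadata, re-resolve each active segment's
        // owning broker and the controller URL, then push to the client.
        pushUpdate.accept(loadAndResolve());
    }

    private ScalableTopicDag loadAndResolve() {
        return new ScalableTopicDag(0L); // placeholder for the real resolution
    }
}
```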
---
## Split Protocol
Splitting a segment is the core scaling-out operation. The protocol is
carefully ordered to guarantee that **no messages are lost** and **all
subscription cursors exist before producers are redirected**.
### Step-by-step
```
Controller Metadata Store Broker(s)
│ │ │
1. Discover │──── getSubscriptions ───────>│ │
subscriptions│<─── [sub-A, sub-B] ─────────│ │
│ │ │
2. Create    │──── createSegment(child1, ──>│ ─── route ────────> │ create topic
   children  │     [sub-A, sub-B])          │                     │ + cursors
             │                              │                     │
             │──── createSegment(child2, ──>│ ─── route ────────> │ create topic
             │     [sub-A, sub-B])          │                     │ + cursors
│ │ │
3. Terminate │──── terminateSegment ───────>│ ─── route ────────> │
parent │ (parent) │ │ seal ML
│ │ │
4. Update │──── CAS update ─────────────>│ │
metadata │ (atomic) │ │
│ │ │
5. Notify │──── push to consumers ──────────────────────────> │
consumers │ (rebalance) │ │
```
**Step 1: Discover subscriptions.** The controller queries the parent
segment topic for its list of subscriptions. It first checks locally
(if the segment is on this broker), falling back to the admin API for
remote segments.
**Step 2: Create child segment topics.** Two new `segment://` topics
are created via the Segments admin API. Each creation request includes
the list of subscriptions discovered in step 1. The Segments API
handler creates the persistent topic and initializes subscription
cursors at the `Earliest` position. The admin API routes to whichever
broker owns the namespace bundle for each segment, so child segments
may be created on different brokers.
**Step 3: Terminate parent segment.** The parent segment topic is
terminated via the Segments admin API, which writes a termination
marker to the managed ledger. After termination, producers writing to
the parent receive `TopicTerminated` and must re-route to child
segments.
**Step 4: Atomic metadata update.** The controller issues a
compare-and-swap (CAS) update to the metadata store. The update
transitions the parent to `SEALED` state with child pointers, and adds
the two new `ACTIVE` child segments. The CAS guarantees atomicity — if
another operation modified the metadata concurrently, the update fails
and must be retried.
**Step 5: Notify consumers.** All `SubscriptionCoordinator` instances
are notified of the layout change. They recompute segment-to-consumer
assignments and push updates to connected consumers.
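The following condensed sketch captures the five steps and their
ordering, which the next subsection justifies. Every call here is an
abstract placeholder rather than a real Pulsar API:
```java
import java.util.List;

abstract class SplitOrchestrator {

    final void split(long parentId) {
        // Step 1: discover subscriptions on the parent segment.
        List<String> subs = discoverSubscriptions(parentId);

        // Step 2: create both children, pre-creating all cursors at
        // Earliest, BEFORE any client can discover the new segments.
        long leftChild = createChildSegment(parentId, true, subs);
        long rightChild = createChildSegment(parentId, false, subs);

        // Step 3: seal the parent; producers now receive TopicTerminated.
        terminateSegment(parentId);

        // Step 4: single CAS metadata update publishing the new layout.
        // On a CAS conflict the operation is retried from a fresh read.
        casUpdateLayout(parentId, leftChild, rightChild);

        // Step 5: rebalance every subscription against the new layout.
        notifySubscriptions();
    }

    abstract List<String> discoverSubscriptions(long segmentId);
    abstract long createChildSegment(long parentId, boolean lowerHalf, List<String> subs);
    abstract void terminateSegment(long segmentId);
    abstract void casUpdateLayout(long parentId, long leftChildId, long rightChildId);
    abstract void notifySubscriptions();
}
```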
### Why this ordering matters
- Steps 1-2 **before** step 4: if we updated metadata first, clients
would discover the new segments immediately. Producers would start
writing to child segments that have no subscription cursors yet,
causing those messages to be missed by consumers.
- Step 3 **before** step 4: terminating the parent before the metadata
update ensures that by the time clients see the new layout, the parent
is already sealed. There is no window where both parent and children
are writable.
- Step 2 creates cursors at `Earliest`: this is safe because the child
topics are brand new (empty). The cursor starts at the beginning, and
the first message written by a redirected producer will be the first
message consumed.
---
## Merge Protocol
Merging two adjacent segments is the core scaling-in operation. The
protocol follows the same ordering principle: **create first, update
metadata last**.
### Step-by-step
**Step 1: Discover subscriptions.** The controller queries both parent
segments for their subscriptions and takes the union, so no
subscription is lost.
**Step 2: Create merged segment topic.** A single new `segment://`
topic is created with all discovered subscriptions.
**Step 3: Terminate both parent segments.** Both parents are
terminated via the admin API. This is done sequentially to avoid a
race where producers of one parent are still writing while the other
is already sealed.
**Step 4: Atomic metadata update.** The CAS update seals both parents
with child pointers to the merged segment, and adds the new `ACTIVE`
merged segment.
**Step 5: Notify consumers.** Same as split — rebalance and push.
### Cross-broker coordination
Unlike splits, merges inherently involve multiple brokers because the
two parent segments may be owned by different brokers (assigned by the
load manager). The controller leader handles this transparently: all
segment operations (create, terminate, delete) go through the Segments
admin API, which routes each request to the broker that currently owns
the target segment's namespace bundle. The controller does not need to
know or care which broker owns which segment — the admin API routing
handles it.
---
## Segment Topic Management via Admin API
Segment topics are managed exclusively through a dedicated REST API at
`/admin/v2/segments`. This is a deliberate design choice: segment
topics use the `segment://` domain and must not be confused with
regular `persistent://` topics.
### Endpoints
| Method | Path | Description |
|--------|------|-------------|
| `PUT` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create a segment topic. Optional request body with subscription names to pre-create. |
| `POST` | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | Terminate (seal) a segment topic. |
| `DELETE` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete a segment topic. |
The `{descriptor}` is the segment's hash range and ID in the format
`{hexStart}-{hexEnd}-{segmentId}` (e.g., `0000-7fff-1`).
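For illustration, the descriptor can be produced with standard
zero-padded hex formatting; the helper name is ours, not part of the
API:
```java
class SegmentDescriptors {
    // Zero-padded lowercase hex bounds plus the decimal segment ID.
    static String descriptor(int hashStart, int hashEnd, long segmentId) {
        return String.format("%04x-%04x-%d", hashStart, hashEnd, segmentId);
    }
    // descriptor(0x0000, 0x7fff, 1) -> "0000-7fff-1"
}
```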
### Create Segment
When creating a segment topic, the handler:
1. Constructs the full `segment://` topic name from the path components.
2. Validates namespace bundle ownership (the request is routed to the
owning broker).
3. Creates the persistent topic via `BrokerService.getOrCreateTopic()`.
4. For each subscription in the request body, creates a cursor at the
`Earliest` position.
### Terminate Segment
Terminates the managed ledger backing the segment topic. After
termination, the topic accepts no further writes, and producers
receive `TopicTerminated`.
### Delete Segment
Deletes the segment topic and its managed ledger. Used during scalable
topic deletion to clean up all underlying storage.
---
## Consumer Assignment
Consumer assignment behavior depends on the consumer type:
- **StreamConsumer** (ordered, cumulative ack) — requires coordinated
segment-to-consumer assignment from the controller leader. Each active
segment is owned by exactly one consumer at any time to preserve
per-segment ordering. This is the subject of the rest of this section.
- **CheckpointConsumer** (unmanaged, for connectors) — same as
StreamConsumer from a controller perspective: it registers with the
controller leader and receives an explicit segment assignment.
- **QueueConsumer** (unordered, individual ack) — **does not require
any controller-side assignment**. Because there is no ordering
requirement, every queue consumer attaches directly to **all**
segments of the scalable topic — both `ACTIVE` and `SEALED` — and each
segment-owning broker independently performs round-robin delivery
across the queue consumers attached to that segment. New segments
produced by a split are picked up transparently via the DAG watch
session push; sealed segments keep serving messages until their
backlog drains and are then dropped from the set. The controller
leader is not in the data path for queue consumers and no
`SubscriptionCoordinator` is involved for them.
### Registration Flow (StreamConsumer / CheckpointConsumer)
1. A `StreamConsumer` connects to the controller broker (discovered
via the DAG watch session's `controller_broker_url`).
2. The consumer sends a registration request with its `subscription`
name and `consumerName`.
3. The controller's `registerConsumer()` method routes to the
appropriate `SubscriptionCoordinator` (created on first use for that
subscription).
4. The coordinator adds the consumer, recomputes assignments, and
returns the consumer's `ConsumerAssignment` — a list of `(segmentId,
hashRange, segmentTopicName)` tuples.
5. The consumer connects to each assigned `segment://` topic and
begins consuming.
### Rebalancing
Rebalancing occurs when:
- A consumer is added or removed.
- A split or merge changes the set of active segments.
The rebalancing algorithm is round-robin:
1. Collect all active segments, sorted by hash range start.
2. Collect all consumers, sorted by name (for deterministic ordering).
3. Assign segments to consumers in round-robin order.
4. For each consumer whose assignment changed, push the new
`ConsumerAssignment`.
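A self-contained sketch of this algorithm, with types simplified to
longs and strings; the real coordinator operates on `SegmentInfo` and
`ConsumerRegistration` records:
```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RoundRobinAssigner {
    record Seg(long id, int rangeStart) {}

    static Map<String, List<Long>> rebalance(List<Seg> activeSegments,
                                             List<String> consumerNames) {
        // Step 1: active segments sorted by hash-range start.
        List<Seg> segments = new ArrayList<>(activeSegments);
        segments.sort(Comparator.comparingInt(Seg::rangeStart));

        // Step 2: consumers sorted by name for deterministic ordering.
        List<String> consumers = new ArrayList<>(consumerNames);
        Collections.sort(consumers);

        // Step 3: round-robin distribution.
        Map<String, List<Long>> assignment = new LinkedHashMap<>();
        consumers.forEach(c -> assignment.put(c, new ArrayList<>()));
        for (int i = 0; !consumers.isEmpty() && i < segments.size(); i++) {
            assignment.get(consumers.get(i % consumers.size()))
                      .add(segments.get(i).id());
        }
        // Step 4 (not shown): diff against the previous map and push a new
        // ConsumerAssignment only to consumers whose list changed.
        return assignment;
    }
}
```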
### Consumer Session Lifecycle
A consumer's registration and its segment assignment are treated as a
**persistent session**, not as a transient association tied to a TCP
connection. This is a deliberate departure from the existing Pulsar
consumer model, where consumer liveness is asserted purely by the TCP
connection to the broker.
**Rationale.** Scalable topic consumer assignments have non-trivial
cost: when a consumer disappears, its segments are redistributed among
the remaining consumers, each of which may need to reconnect to
different segment brokers and (for ordered consumers) drain in-flight
messages before the new assignment takes effect. Treating a brief
disconnection as a full consumer loss would cause unnecessary
rebalancing in common scenarios such as a consumer process restart or
a broker restart.
**What is persisted vs. in-memory.**
The **session itself is persisted** — the keep-alive state is not.
- **Persisted in the metadata store (the session):** the
`ConsumerRegistration` — consumer name and its current segment
assignment — keyed by `consumerName` under the subscription path. Once
a consumer successfully registers, this entry is durable and outlives
TCP disconnects, client restarts, and controller leader failovers. The
assignment survives failover without forcing consumers to re-register.
- **In-memory on the controller leader (the keep-alive):** whether
each consumer is currently connected, and the grace-period timer that
runs while it is disconnected. Keep-alive signals are **not** written
to the metadata store; the leader observes the consumer's connection
state directly and tracks the timer in RAM. This avoids a metadata
store write on every liveness tick.
**Session semantics (steady state).**
- When a consumer registers, the `SubscriptionCoordinator` writes its
`ConsumerRegistration` and marks it connected in-memory.
- If the consumer's connection drops, the leader does **not**
immediately remove the consumer. It transitions the in-memory state to
"disconnected" and starts a configurable **session grace period**
timer for that consumer.
- If the same `consumerName` reconnects within the grace period, the
timer is cancelled and the consumer resumes with the same persisted
assignment — no rebalance, no other consumer disturbed.
- If the grace period expires, the leader deletes the
`ConsumerRegistration` from the metadata store and triggers a
rebalance for the remaining consumers.
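A sketch of the in-memory keep-alive mechanics described above, using a
standard `ScheduledExecutorService` (an implementation assumption, not
mandated by this PIP); persistence and rebalancing are abstract
placeholders:
```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

abstract class SessionTracker {
    private final ScheduledExecutorService scheduler;
    private final long gracePeriodMillis;
    // consumerName -> pending eviction task while disconnected
    private final Map<String, ScheduledFuture<?>> evictionTimers =
            new ConcurrentHashMap<>();

    SessionTracker(ScheduledExecutorService scheduler, long gracePeriodMillis) {
        this.scheduler = scheduler;
        this.gracePeriodMillis = gracePeriodMillis;
    }

    void onConsumerConnected(String consumerName) {
        // A reconnect within the grace period cancels the eviction: the
        // persisted assignment stays, no other consumer is disturbed.
        ScheduledFuture<?> timer = evictionTimers.remove(consumerName);
        if (timer != null) {
            timer.cancel(false);
        }
    }

    void onConsumerDisconnected(String consumerName) {
        // Do NOT remove the registration yet; just arm the grace timer.
        evictionTimers.put(consumerName, scheduler.schedule(() -> {
            evictionTimers.remove(consumerName);
            deleteRegistration(consumerName); // metadata store delete
            rebalanceRemainingConsumers();
        }, gracePeriodMillis, TimeUnit.MILLISECONDS));
    }

    abstract void deleteRegistration(String consumerName);
    abstract void rebalanceRemainingConsumers();
}
```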
**Behavior on controller leader failover.**
Because liveness timers are in-memory only, they are lost when the
leader broker crashes. The new leader:
1. Loads all `ConsumerRegistration` entries for each subscription from
the metadata store, restoring the prior segment assignment.
2. Treats every consumer as "just disconnected" and starts a fresh
grace-period timer for each. No timestamps carry across from the
previous leader.
3. Consumers reconnecting to the new leader within the fresh grace
period resume with the same assignment — seamless from their
perspective.
4. Consumers that fail to reconnect before the fresh timer expires are
evicted and their segments are redistributed.
This means a leader failover always gives clients a full grace period
to reconnect, regardless of how long they had already been
disconnected under the old leader. The trade-off is explicit: we keep
the metadata store writes proportional to real membership changes
(register / unregister / rebalance) rather than to liveness ticks.
**Covered scenarios.**
- **Consumer process restart.** A client process restarts and
reconnects with the same `consumerName` well within the grace period —
the same segments are reassigned, no redistribution.
- **Controller leader restart.** The leader is re-elected on a
different broker. Consumer entries and assignments are restored from
the metadata store; the new leader starts a fresh grace-period timer
for every consumer so reconnects are transparent.
- **Segment-owning broker restart.** Segment topics are reassigned by
the standard Pulsar load manager; the consumer's segment-to-consumer
assignment is unaffected and reconnects to the new owning broker are
transparent at the consumer-assignment level.
The grace period and related tunables are configurable per broker and
will be specified in a follow-up sub-PIP.
### Layout Change Propagation
When a split or merge completes and the metadata is updated:
1. The controller reloads the metadata and creates a new `SegmentLayout`.
2. `notifySubscriptions()` is called, which iterates all
`SubscriptionCoordinator` instances.
3. Each coordinator calls `onLayoutChange(newLayout)`, which
recomputes assignments against the new active segments and pushes
updates to affected consumers.
---
## Leader Election and Failover
### Election
Each `ScalableTopicController` participates in leader election via the
metadata store's `LeaderElection` API. The election path is:
```
/topics/{tenant}/{namespace}/{encodedTopicName}/controller
```
The elected leader writes its broker service URL as the election
value. Clients discover the controller by reading this value
(delivered as part of the DAG watch session response).
### Failover
When the leader broker fails:
1. The metadata store detects the session loss and clears the leader lock.
2. Other brokers with active controllers for the same topic receive a
`NoLeader` state change notification.
3. The `ScalableTopicService.onLeaderStateChange()` handler re-invokes
`controller.initialize()`, which reloads metadata and re-attempts
election.
4. The new leader restores in-memory state from the persisted
metadata, including all subscriptions and their consumer registrations
with current assignments. Consumers reconnecting within the session
grace period find their sessions intact and resume with the same
segment assignment without a rebalance (see [Consumer Session
Lifecycle](#consumer-session-lifecycle)).
### Non-leader brokers
A broker that loses leadership or never wins it retains the controller
instance but all mutating operations (`splitSegment`, `mergeSegments`,
`registerConsumer`) throw `IllegalStateException` after a
`checkLeader()` guard. The controller remains available for read
operations like `getLayout()`.
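A minimal sketch of the guard; `NoLeader` appears in this PIP, while
the `Leading`/`Following` values mirror Pulsar's `LeaderElectionState`
enum and the class wrapper is assumed:
```java
enum LeaderElectionState { NoLeader, Leading, Following }

class ControllerGuard {
    private volatile LeaderElectionState leaderState = LeaderElectionState.NoLeader;

    void checkLeader() {
        if (leaderState != LeaderElectionState.Leading) {
            throw new IllegalStateException("Not the controller leader for this topic");
        }
    }

    // Mutating operations call the guard first, e.g.:
    void splitSegment(long segmentId) {
        checkLeader();
        // ... leader-only split protocol ...
    }
}
```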
---
## Scalable Topic Lifecycle
### Creation
```
Admin API (PUT /admin/v2/scalable/{tenant}/{namespace}/{topic})
└─> ScalableTopicService.createScalableTopic(topic, numInitialSegments)
├─> ScalableTopicController.createInitialMetadata(numInitialSegments)
│ └─> Divides [0x0000, 0xFFFF] into N equal ranges
│ Creates N SegmentInfo records (ACTIVE, no parents)
│ Returns ScalableTopicMetadata with epoch=0
├─> ScalableTopicResources.createScalableTopicAsync(topic, metadata)
│ └─> Writes metadata to store at /topics/{t}/{ns}/{topic}
└─> For each segment: createSegmentAsync via Segments admin API
└─> Creates segment:// topic on owning broker
```
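A self-contained sketch of the range division performed by
`createInitialMetadata`; spreading the leftover slots when N does not
divide 65536 evenly is an assumption:
```java
import java.util.ArrayList;
import java.util.List;

class InitialLayout {
    // int[] pairs stand in for the real HashRange/SegmentInfo types.
    static List<int[]> initialRanges(int numSegments) {
        final int totalSlots = 0x10000;           // hash values 0x0000..0xFFFF
        int base = totalSlots / numSegments;
        int remainder = totalSlots % numSegments; // spread leftover slots
        List<int[]> ranges = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < numSegments; i++) {
            int size = base + (i < remainder ? 1 : 0);
            ranges.add(new int[]{start, start + size - 1}); // inclusive bounds
            start += size;
        }
        // numSegments=4 -> [0,16383] [16384,32767] [32768,49151] [49152,65535]
        return ranges;
    }
}
```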
### Deletion
```
Admin API (DELETE /admin/v2/scalable/{tenant}/{namespace}/{topic})
└─> ScalableTopicService.deleteScalableTopic(topic)
├─> releaseController(topic)
│ └─> Closes leader election, removes from controllers map
├─> Load metadata from store
├─> For each segment: deleteSegmentAsync via Segments admin API
│ └─> Deletes segment:// topic on owning broker (best-effort)
└─> ScalableTopicResources.deleteScalableTopicAsync(topic)
└─> Removes metadata from store
```
---
## Public-Facing Changes
### Binary Protocol
Three new protocol commands (added to `PulsarApi.proto`):
| Command | Direction | Field ID | Description |
|---------|-----------|----------|-------------|
| `CommandScalableTopicLookup` | Client → Broker | 70 | Initiates a DAG watch session |
| `CommandScalableTopicUpdate` | Broker → Client | 71 | Initial response and subsequent pushed updates |
| `CommandScalableTopicClose` | Client → Broker | 72 | Closes the DAG watch session |
New protocol messages:
- `ScalableTopicDAG` — the full segment DAG with broker addresses and
controller URL.
- `SegmentInfoProto` — per-segment record in the DAG.
- `SegmentBrokerAddress` — maps a segment ID to its owning broker.
- `SegmentState` enum — `ACTIVE` or `SEALED`.
### Admin API
**Scalable Topics** (`/admin/v2/scalable`):
| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/{tenant}/{namespace}` | List scalable topics |
| `PUT` | `/{tenant}/{namespace}/{topic}` | Create scalable topic |
| `GET` | `/{tenant}/{namespace}/{topic}` | Get topic metadata |
| `DELETE` | `/{tenant}/{namespace}/{topic}` | Delete scalable topic |
| `GET` | `/{tenant}/{namespace}/{topic}/stats` | Get aggregated stats for the scalable topic (segment counts, per-segment rates, per-subscription state, consumer counts) |
| `PUT` | `/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | Create a subscription on the scalable topic. The controller propagates it to all active segments by issuing `createSubscription` on each `segment://` topic via the Segments admin API. |
| `DELETE` | `/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | Delete a subscription. The controller unregisters all consumers and deletes the subscription from every segment. |
| `POST` | `/{tenant}/{namespace}/{topic}/split/{segmentId}` | Split a segment |
| `POST` | `/{tenant}/{namespace}/{topic}/merge/{segmentId1}/{segmentId2}` | Merge two segments |
**Segments** (`/admin/v2/segments`):
| Method | Path | Description |
|--------|------|-------------|
| `PUT` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create segment topic |
| `POST` | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | Terminate segment |
| `DELETE` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete segment topic |
### Admin Client API
New `ScalableTopics` interface on `PulsarAdmin`:
- `listScalableTopics(namespace)` / `listScalableTopicsAsync(namespace)`
- `createScalableTopic(topic, numSegments)` / `createScalableTopicAsync(...)`
- `getMetadata(topic)` / `getMetadataAsync(topic)`
- `getStats(topic)` / `getStatsAsync(topic)`
- `deleteScalableTopic(topic)` / `deleteScalableTopicAsync(topic)`
- `createSubscription(topic, subscription)` / `createSubscriptionAsync(...)`
- `deleteSubscription(topic, subscription)` / `deleteSubscriptionAsync(...)`
- `splitSegment(topic, segmentId)` / `splitSegmentAsync(...)`
- `mergeSegments(topic, segmentId1, segmentId2)` / `mergeSegmentsAsync(...)`
- `createSegment(segmentTopic, subscriptions)` / `createSegmentAsync(...)`
- `terminateSegment(segmentTopic)` / `terminateSegmentAsync(...)`
- `deleteSegment(segmentTopic, force)` / `deleteSegmentAsync(...)`
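A hypothetical usage example of the proposed interface; the
`scalableTopics()` accessor and the exact signatures follow the list
above but are not final until the implementation lands:
```java
import org.apache.pulsar.client.admin.PulsarAdmin;

public class ScalableTopicAdminExample {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build()) {
            String topic = "topic://public/default/orders";

            // Create a scalable topic with 4 initial segments (ids 0-3).
            admin.scalableTopics().createScalableTopic(topic, 4);
            admin.scalableTopics().createSubscription(topic, "my-sub");

            // Scale out: split segment 2 at its hash-range midpoint,
            // producing children 4 and 5 (illustrative IDs).
            admin.scalableTopics().splitSegment(topic, 2);

            // Scale in: merge the two adjacent children back together.
            admin.scalableTopics().mergeSegments(topic, 4, 5);
        }
    }
}
```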
### Configuration
| Property | Default | Description |
|----------|---------|-------------|
| `scalableTopicEnabled` | `true` | Enable scalable topic support on the broker |
Additional configuration for auto-split thresholds and consumer
controller session grace period will be specified in follow-up
sub-PIPs.
### Metadata Store Paths
| Path | Content |
|------|---------|
| `/topics/{tenant}/{namespace}/{topic}` | `ScalableTopicMetadata` JSON — segment DAG and global topic state |
| `/topics/{tenant}/{namespace}/{topic}/controller` | Leader broker URL (ephemeral) |
| `/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | `SubscriptionMetadata` — subscription-level config and the persisted set of consumer registrations |
| `/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/consumers/{consumerName}` | `ConsumerRegistration` — the durable session: consumer name and its current segment assignment. Keep-alive state (connected/disconnected, grace-period timer) is in-memory on the controller leader only and is **not** persisted here. |
---
## Backward & Forward Compatibility
### Upgrade
The controller is a new component with no interaction with existing
partitioned or non-partitioned topics. Enabling `scalableTopicEnabled`
activates the new code paths. Existing topics are unaffected.
### Downgrade / Rollback
Scalable topic metadata and segment data remain in the metadata store
and BookKeeper. Rolling back to a version without scalable topic
support leaves this data intact but inaccessible. Upgrading again
restores access.
### Client Compatibility
The existing Pulsar client SDK rejects `topic://` and `segment://`
domains with a clear error message directing users to the V5 client
SDK. This check is enforced in `PulsarClientImpl` for producer,
consumer, and reader creation paths.
---
## Security Considerations
The controller follows the same tenant/namespace/topic authorization
model as existing Pulsar topics:
- Admin API operations require topic-level admin permissions.
- DAG watch sessions require topic-level lookup permissions.
- Consumer registration requires topic-level consume permissions.
- Segment operations are internal and authenticated via the broker's
internal admin client.
The controller leader lock in the metadata store is accessible only to
authenticated brokers.
---
## Links
- Parent PIP: [PIP-460: Scalable Topics](pip-460.md)
- V5 Client API: [PIP-466: New Java Client API (V5)](pip-466.md)
- Mailing List discussion thread: TBD
--
Matteo Merli
<[email protected]>