+1 (binding) -Lari
On 2026/04/17 04:03:36 Matteo Merli wrote: > https://github.com/apache/pulsar/pull/25516 > > > ----- > > # PIP-468: Scalable Topic Controller > > *Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)* > > ## Motivation > > [PIP-460](pip-460.md) introduces scalable topics as a new topic type > in Pulsar, built on a DAG of range segments that can be split and > merged to scale independently of the initial configuration. PIP-460 > defines the overall vision but defers the detailed design of each > component to dedicated sub-PIPs. > > This PIP specifies the **Scalable Topic Controller** — the broker-side > component responsible for: > > 1. **Segment lifecycle management** — creating, terminating, and > deleting the underlying segment topics that back a scalable topic. > 2. **Segment layout coordination** — executing split and merge > operations with correct ordering guarantees so that no messages are > lost and all subscription cursors exist before producers are > redirected. > 3. **Consumer assignment** — coordinating which consumers receive > messages from which segments for a given subscription. > 4. **Leader election** — ensuring exactly one broker acts as the > controller for each scalable topic, with automatic failover. > 5. **DAG watch sessions** — pushing topology updates to connected > clients (producers and consumers) when the segment layout changes. > > Without a well-defined controller, split and merge operations would be > unsafe: producers could write to segments that have no subscription > cursors, consumers could miss messages during topology changes, and > concurrent layout modifications could corrupt the segment DAG. 
> > --- > > ## Design > > ### Architecture Overview > > The controller subsystem consists of four layers: > > ``` > ┌──────────────────────────────────────────────────────────┐ > │ Admin API Layer │ > │ ScalableTopics REST + Segments REST │ > └──────────────┬───────────────────────┬───────────────────┘ > │ │ > ┌──────────────▼───────────────────────▼───────────────────┐ > │ ScalableTopicService │ > │ Per-broker singleton; manages controllers │ > └──────────────────────────┬───────────────────────────────┘ > │ > ┌──────────────────────────▼───────────────────────────────┐ > │ ScalableTopicController (per topic) │ > │ Leader-elected; owns layout, split/merge, consumers │ > │ │ > │ ┌─────────────────┐ ┌───────────────────────────────┐ │ > │ │ SegmentLayout │ │ SubscriptionCoordinator (×N) │ │ > │ │ (immutable DAG) │ │ per-subscription assignments │ │ > │ └─────────────────┘ └───────────────────────────────┘ │ > └──────────────────────────┬───────────────────────────────┘ > │ > ┌──────────────────────────▼───────────────────────────────┐ > │ ScalableTopicResources │ > │ Metadata store access (read/write/watch) │ > └──────────────────────────────────────────────────────────┘ > ``` > > ### Broker Roles and Request Routing > > A scalable topic involves three distinct broker roles. Understanding > which broker handles which operation is critical to the design. > > #### Any broker — DAG watch sessions > > A **DAG watch session** (scalable topic lookup) can be served by **any > broker** in the cluster. The session reads segment metadata from the > shared metadata store and registers a watch for changes. No state is > held on the broker beyond the watch registration. This means producers > and consumers can connect to any broker for topic discovery — the same > way regular topic lookups work today. 
> > When the metadata changes (due to a split or merge performed by the > controller leader), the metadata store notification fires on whichever > broker is serving the watch session, and that broker pushes the > updated DAG to the client. > > #### Controller leader — layout mutations and consumer assignment > > Each scalable topic has exactly one **controller leader** across the > cluster, elected via the metadata store. The controller leader is the > only broker that can: > > - Execute **split** and **merge** operations (the multi-step protocols > described below). > - Accept **consumer registrations** and compute segment-to-consumer > assignments. > - **Notify consumers** of assignment changes after topology updates. > > Clients discover the controller leader's broker URL through the DAG > watch session response (`controller_broker_url` field). StreamConsumer > and CheckpointConsumer clients then connect directly to the controller > leader to register and receive assignments. > > Admin API requests for split and merge are routed to the > `ScalableTopicService` on any broker, which obtains (or creates) a > local `ScalableTopicController` that participates in leader election. > If the local controller is the leader, it executes the operation > directly. If not, the request must be redirected to the leader (or the > controller must first win the election). The split and merge admin > endpoints do not require the request to land on the leader broker — > the `ScalableTopicService` handles this transparently. > > #### Segment-owning brokers — produce and consume > > Individual **segment topics** (`segment://`) are regular persistent > topics from the broker's perspective. They are distributed across the > cluster by the standard Pulsar **load manager** and namespace bundle > assignment — the same mechanism used for `persistent://` topics. Each > segment topic is owned by exactly one broker at any time, determined > by its namespace bundle hash. 
> > The controller leader does not need to own any of the segment topics. > When the controller needs to operate on a segment (create, terminate, > delete, discover subscriptions), it uses the **Segments admin API**, > which routes the request to whichever broker currently owns that > segment's namespace bundle. This decoupling means the controller can > coordinate segments spread across many brokers without requiring > co-location. > > ``` > ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ > │ Broker A │ │ Broker B │ │ Broker C │ > │ │ │ (controller │ │ │ > │ segment-0 │ │ leader) │ │ segment-2 │ > │ segment-1 │ │ │ │ │ > │ │ │ DAG watch │ │ DAG watch │ > │ DAG watch │ │ sessions │ │ sessions │ > │ sessions │ │ │ │ │ > └─────────────┘ └─────────────┘ └─────────────┘ > │ > Split/merge ops > use admin API to > reach segments on > Broker A and C > ``` > > In this example, Broker B is the controller leader but owns no > segments. When it executes a split of segment-0, it calls the Segments > admin API which routes to Broker A (the owner of segment-0). > > ### Components > > #### ScalableTopicService > > A per-broker singleton, created and owned by `BrokerService`. It > manages the lifecycle of all `ScalableTopicController` instances on > the broker. > > **Responsibilities:** > > - **Controller management**: maintains a map of active controllers > keyed by topic name. Creates controllers on demand via > `getOrCreateController()` and releases them on topic unload or broker > shutdown. > - **Topic lifecycle**: handles `createScalableTopic()` and > `deleteScalableTopic()` admin operations, including creating/deleting > the underlying segment topics via the Segments admin API. > - **Delegation**: routes `splitSegment()` and `mergeSegments()` > requests to the appropriate controller. > - **Leader state monitoring**: listens for leader election state > changes and re-attempts election when the current leader is lost. 
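The create-on-demand controller map can be sketched as follows (a minimal sketch with hypothetical names; the real `ScalableTopicService` also wires in metadata resources and leader election):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Minimal sketch of getOrCreateController()/releaseController().
// The generic type and constructor here are hypothetical, not Pulsar classes.
public final class ControllerRegistry<C> {
    private final Map<String, C> controllers = new ConcurrentHashMap<>();
    private final Function<String, C> factory;

    public ControllerRegistry(Function<String, C> factory) {
        this.factory = factory;
    }

    /** One controller instance per topic on this broker, created on demand. */
    public C getOrCreateController(String topicName) {
        return controllers.computeIfAbsent(topicName, factory);
    }

    /** Called on topic unload or broker shutdown. */
    public C releaseController(String topicName) {
        return controllers.remove(topicName);
    }
}
```

`computeIfAbsent` gives the at-most-one-instance-per-topic-per-broker guarantee without explicit locking.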
> > #### ScalableTopicController > > A per-topic coordinator. Only one instance across the cluster is the > **leader** for a given scalable topic, ensured by leader election > through the metadata store. The leader is the only instance that > modifies the segment layout or assigns consumers. > > **State:** > > | Field | Type | Description | > |-------|------|-------------| > | `topicName` | `TopicName` | The `topic://` name of the scalable topic | > | `currentLayout` | `SegmentLayout` | The current immutable snapshot > of the segment DAG | > | `subscriptions` | `Map<String, SubscriptionCoordinator>` | > Per-subscription consumer coordinators | > | `leaderState` | `LeaderElectionState` | Current leader election state | > | `leaderElection` | `LeaderElection<String>` | Metadata store leader > election handle; value is the broker URL | > > **Key operations:** > > - `initialize()` — loads the current metadata from the store, then > attempts leader election. The elected leader stores its broker service > URL so that clients can discover and connect to it. > - `splitSegment(segmentId)` — splits an active segment at its midpoint > (see [Split Protocol](#split-protocol)). > - `mergeSegments(segmentId1, segmentId2)` — merges two adjacent active > segments (see [Merge Protocol](#merge-protocol)). > - `registerConsumer(subscription, consumer)` — registers a consumer > and returns its initial segment assignment. > - `unregisterConsumer(subscription, consumer)` — removes a consumer > and triggers rebalancing. > - `getLeaderBrokerUrl()` — returns the current leader's broker URL, > used by clients to connect to the controller. > > #### SegmentLayout > > An immutable, versioned snapshot of the segment DAG. All mutation > methods (`splitSegment`, `mergeSegments`, `pruneSegment`) return a > **new** `SegmentLayout` instance — the original is never modified. > This ensures safe concurrent reads without locking. 
> > **Fields:** > > | Field | Type | Description | > |-------|------|-------------| > | `epoch` | `long` | Monotonically increasing version; incremented on > every layout change | > | `nextSegmentId` | `long` | Counter for assigning IDs to new segments | > | `allSegments` | `Map<Long, SegmentInfo>` | Complete DAG: all > segments (active + sealed) | > | `activeSegments` | `Map<Long, SegmentInfo>` | Filtered view: only > `ACTIVE` segments | > > **Key operations:** > > - `findActiveSegment(hash)` — returns the active segment whose hash > range contains the given hash value. Used by producers for message > routing. > - `splitSegment(segmentId)` — validates the segment is active, > computes the midpoint of its hash range, creates two child > `SegmentInfo` records covering `[start, mid]` and `[mid+1, end]`, > seals the parent, and returns a new layout with incremented epoch. > - `mergeSegments(id1, id2)` — validates both segments are active and > adjacent (the end of one equals `start - 1` of the other), creates a > merged `SegmentInfo` covering the combined range, seals both parents, > and returns a new layout. > - `getLineage(segmentId)` — returns the full ancestor + descendant > chain for a segment, used for DAG traversal during catch-up reads. > - `toMetadata()` / `fromMetadata()` — converts to/from the persisted > `ScalableTopicMetadata` format. 
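Two of the operations above reduce to plain range arithmetic. A sketch (the `LayoutMath` helper and its method shapes are hypothetical, not the proposed classes):

```java
import java.util.List;

// Sketch of the midpoint split and the producer-side hash lookup
// described above. Ranges are inclusive {start, end} pairs.
public final class LayoutMath {
    /** Split [start, end] at its midpoint into [start, mid] and [mid+1, end]. */
    public static long[] splitRange(long start, long end) {
        if (start >= end) {
            throw new IllegalArgumentException("range too small to split");
        }
        long mid = start + (end - start) / 2;   // overflow-safe midpoint
        return new long[] { start, mid, mid + 1, end };
    }

    /** findActiveSegment: index of the range whose [start, end] contains hash. */
    public static int findActiveSegment(List<long[]> activeRanges, long hash) {
        for (int i = 0; i < activeRanges.size(); i++) {
            long[] r = activeRanges.get(i);
            if (hash >= r[0] && hash <= r[1]) {
                return i;
            }
        }
        return -1;                              // no active segment covers hash
    }
}
```

For example, splitting the root range `[0, 65535]` yields `[0, 32767]` and `[32768, 65535]`, matching the metadata example later in this PIP.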
> > #### SegmentInfo > > A record representing a single segment in the DAG: > > | Field | Type | Description | > |-------|------|-------------| > | `segmentId` | `long` | Unique, monotonically increasing identifier | > | `hashRange` | `HashRange` | The `[start, end]` inclusive hash range > this segment covers | > | `state` | `SegmentState` | `ACTIVE` or `SEALED` | > | `parentIds` | `List<Long>` | Parent segments (empty for root segments) | > | `childIds` | `List<Long>` | Child segments (empty for leaf/active segments) > | > | `createdAtEpoch` | `long` | Layout epoch in which this segment was > created (`0` for the initial segments) | > | `sealedAtEpoch` | `long` | Layout epoch in which this segment was > sealed; only meaningful when `state == SEALED`. `0` when the segment > is `ACTIVE`; this sentinel is unambiguous because sealing always > happens as part of a split or merge, which increments the epoch — so a > segment can never be sealed at epoch `0`. | > > Factory methods: `SegmentInfo.active(id, range, epoch)` and > `SegmentInfo.sealed(...)`. > > #### SubscriptionCoordinator > > Manages segment-to-consumer assignments for a single subscription > within a scalable topic. > > **Assignment strategy:** round-robin. Active segments are sorted by > hash range start; consumers are sorted by name. Segments are > distributed evenly across consumers. When a consumer is added or > removed, or when the layout changes (split/merge), the coordinator > recomputes assignments and pushes updates to affected consumers. > > **Key operations:** > > - `addConsumer(consumer)` — adds a consumer, rebalances, and returns > the full assignment map. > - `removeConsumer(consumer)` — removes a consumer and redistributes > its segments. > - `onLayoutChange(newLayout)` — called after a split or merge; > recomputes all assignments against the new set of active segments. 
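The round-robin strategy can be sketched as follows (a hypothetical standalone helper; the real coordinator works on `SegmentInfo` and `ConsumerRegistration` objects):

```java
import java.util.*;

// Sketch of the round-robin assignment described above: segments sorted
// by hash-range start, consumers sorted by name, segment i goes to
// consumer (i mod N).
public final class RoundRobinAssigner {
    /** segmentIdsByRangeStart must already be sorted by hash range start. */
    public static Map<String, List<Long>> assign(List<Long> segmentIdsByRangeStart,
                                                 List<String> consumerNames) {
        List<String> consumers = new ArrayList<>(consumerNames);
        Collections.sort(consumers);            // deterministic ordering by name
        Map<String, List<Long>> out = new LinkedHashMap<>();
        consumers.forEach(c -> out.put(c, new ArrayList<>()));
        for (int i = 0; i < segmentIdsByRangeStart.size(); i++) {
            String owner = consumers.get(i % consumers.size());
            out.get(owner).add(segmentIdsByRangeStart.get(i));
        }
        return out;
    }
}
```

Because both inputs are sorted before assignment, any two brokers (or the same leader before and after a failover) compute the same assignment for the same membership and layout.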
> 
> #### ConsumerRegistration and ConsumerAssignment
> 
> `ConsumerRegistration` is a record identifying a connected consumer:
> 
> | Field | Type | Description |
> |-------|------|-------------|
> | `consumerId` | `long` | Protocol-level consumer ID |
> | `consumerName` | `String` | Stable name chosen by the client |
> | `cnx` | `TransportCnx` | The connection to push assignment updates |
> 
> `ConsumerAssignment` is the assignment sent to a consumer:
> 
> | Field | Type | Description |
> |-------|------|-------------|
> | `layoutEpoch` | `long` | The layout epoch this assignment is based on |
> | `assignedSegments` | `List<AssignedSegment>` | The segments assigned to this consumer |
> 
> Each `AssignedSegment` contains: `segmentId`, `hashRange`, and
> `underlyingTopicName` (the `segment://` topic name the consumer should
> connect to).
> 
> ### Metadata Store Schema
> 
> Scalable topic metadata is stored in the metadata store under a
> well-known path structure:
> 
> ```
> /topics/{tenant}/{namespace}/{encodedTopicName}
> ```
> 
> The value at this path is a JSON-serialized `ScalableTopicMetadata`.
> The example below shows a topic after two splits (epoch 1 sealed
> segment 0; epoch 2 sealed segment 1):
> 
> ```json
> {
>   "epoch": 2,
>   "nextSegmentId": 5,
>   "segments": {
>     "0": { "segmentId": 0, "hashRange": {"start": 0, "end": 65535},
>            "state": "SEALED", "parentIds": [], "childIds": [1, 2],
>            "createdAtEpoch": 0, "sealedAtEpoch": 1 },
>     "1": { "segmentId": 1, "hashRange": {"start": 0, "end": 32767},
>            "state": "SEALED", "parentIds": [0], "childIds": [3, 4],
>            "createdAtEpoch": 1, "sealedAtEpoch": 2 },
>     "2": { "segmentId": 2, "hashRange": {"start": 32768, "end": 65535},
>            "state": "ACTIVE", "parentIds": [0], "childIds": [],
>            "createdAtEpoch": 1, "sealedAtEpoch": 0 },
>     "3": { "segmentId": 3, "hashRange": {"start": 0, "end": 16383},
>            "state": "ACTIVE", "parentIds": [1], "childIds": [],
>            "createdAtEpoch": 2, "sealedAtEpoch": 0 },
>     "4": { "segmentId": 4, "hashRange": {"start": 16384, "end": 32767},
>            "state": "ACTIVE", "parentIds": [1], "childIds": [],
>            "createdAtEpoch": 2, "sealedAtEpoch": 0 }
>   },
>   "properties": {}
> }
> ```
> 
> The controller leader lock is stored at:
> 
> ```
> /topics/{tenant}/{namespace}/{encodedTopicName}/controller
> ```
> 
> The value is the broker service URL of the elected leader.
> 
> ### DAG Watch Sessions
> 
> As described in [Broker Roles](#broker-roles-and-request-routing), any
> broker can serve a DAG watch session because the segment metadata
> lives in the shared metadata store — no controller leader involvement
> is required. The session is established via the binary protocol:
> 
> 1. **Client sends `CommandScalableTopicLookup`** with a
> client-assigned `sessionId` and the `topic://` topic name.
> 2. **Broker creates a `DagWatchSession`** that:
>    - Loads the current `ScalableTopicMetadata` from the store.
>    - Resolves which broker owns each active segment's `segment://` topic (via topic lookup).
>    - Reads the controller broker URL from the leader lock path.
>    - Registers a metadata store notification listener for changes to the topic's metadata path.
> 3. **Broker sends `CommandScalableTopicUpdate`** with the initial
> `ScalableTopicDAG` containing the full segment DAG, broker addresses,
> and controller URL.
> 4. **On metadata change**, the notification listener fires. The broker
> reloads metadata, re-resolves broker addresses, and pushes an updated
> `CommandScalableTopicUpdate` to the client.
> 5. **Client sends `CommandScalableTopicClose`** to tear down the session.
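The `ScalableTopicMetadata` example above satisfies an invariant that is implicit in the split and merge rules: the `ACTIVE` segments always tile the full hash space with no gaps or overlaps, so `findActiveSegment` succeeds for every hash. A sketch of such a check (the `LayoutInvariant` helper is hypothetical):

```java
import java.util.*;

// Sketch of a layout invariant check: the ACTIVE segments of a scalable
// topic must tile the hash space [min, max] exactly, with no gaps and
// no overlaps. Each long[] is an inclusive {start, end} range.
public final class LayoutInvariant {
    public static boolean tilesHashSpace(List<long[]> activeRanges, long min, long max) {
        List<long[]> sorted = new ArrayList<>(activeRanges);
        sorted.sort(Comparator.comparingLong((long[] r) -> r[0]));
        long expectedStart = min;
        for (long[] r : sorted) {
            if (r[0] != expectedStart || r[1] < r[0]) {
                return false;            // gap, overlap, or inverted range
            }
            expectedStart = r[1] + 1;
        }
        return expectedStart == max + 1; // coverage must end exactly at max
    }
}
```

In the metadata example, the active ranges `[0, 16383]`, `[16384, 32767]`, and `[32768, 65535]` tile `[0, 65535]`; removing any one of them would leave a gap.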
> 
> The `ScalableTopicDAG` protocol message contains:
> 
> | Field | Type | Description |
> |-------|------|-------------|
> | `epoch` | `uint64` | Layout epoch |
> | `segments` | `repeated SegmentInfoProto` | Full segment DAG |
> | `segment_brokers` | `repeated SegmentBrokerAddress` | Broker URL for each active segment |
> | `controller_broker_url` | `string` | Controller leader's broker URL |
> | `controller_broker_url_tls` | `string` | TLS variant |
> 
> ---
> 
> ## Split Protocol
> 
> Splitting a segment is the core scaling-out operation. The protocol is
> carefully ordered to guarantee that **no messages are lost** and **all
> subscription cursors exist before producers are redirected**.
> 
> ### Step-by-step
> 
> ```
>                Controller                  Metadata Store            Broker(s)
>                    │                             │                       │
> 1. Discover        │──── getSubscriptions ──────>│                       │
>    subscriptions   │<─── [sub-A, sub-B] ─────────│                       │
>                    │                             │                       │
> 2. Create          │──── createSegment(child1, ─>│ ─── route ──────────> │ create topic
>    children        │       [sub-A, sub-B])       │                       │   + cursors
>                    │──── createSegment(child2, ─>│ ─── route ──────────> │
>                    │       [sub-A, sub-B])       │                       │
>                    │                             │                       │
> 3. Terminate       │──── terminateSegment ──────>│ ─── route ──────────> │ seal ML
>    parent          │       (parent)              │                       │
>                    │                             │                       │
> 4. Update          │──── CAS update ────────────>│                       │
>    metadata        │       (atomic)              │                       │
>                    │                             │                       │
> 5. Notify          │──── push to consumers ──────────────────────────────> │
>    consumers       │       (rebalance)           │                       │
> ```
> 
> **Step 1: Discover subscriptions.** The controller queries the parent
> segment topic for its list of subscriptions. It first checks locally
> (if the segment is on this broker), falling back to the admin API for
> remote segments.
> 
> **Step 2: Create child segment topics.** Two new `segment://` topics
> are created via the Segments admin API. Each creation request includes
> the list of subscriptions discovered in step 1. The Segments API
> handler creates the persistent topic and initializes subscription
> cursors at the `Earliest` position.
The admin API routes to whichever > broker owns the namespace bundle for each segment, so child segments > may be created on different brokers. > > **Step 3: Terminate parent segment.** The parent segment topic is > terminated via the Segments admin API, which writes a termination > marker to the managed ledger. After termination, producers writing to > the parent receive `TopicTerminated` and must re-route to child > segments. > > **Step 4: Atomic metadata update.** The controller issues a > compare-and-swap (CAS) update to the metadata store. The update > transitions the parent to `SEALED` state with child pointers, and adds > the two new `ACTIVE` child segments. The CAS guarantees atomicity — if > another operation modified the metadata concurrently, the update fails > and must be retried. > > **Step 5: Notify consumers.** All `SubscriptionCoordinator` instances > are notified of the layout change. They recompute segment-to-consumer > assignments and push updates to connected consumers. > > ### Why this ordering matters > > - Steps 1-2 **before** step 4: if we updated metadata first, clients > would discover the new segments immediately. Producers would start > writing to child segments that have no subscription cursors yet, > causing those messages to be missed by consumers. > - Step 3 **before** step 4: terminating the parent before the metadata > update ensures that by the time clients see the new layout, the parent > is already sealed. There is no window where both parent and children > are writable. > - Step 2 creates cursors at `Earliest`: this is safe because the child > topics are brand new (empty). The cursor starts at the beginning, and > the first message written by a redirected producer will be the first > message consumed. > > --- > > ## Merge Protocol > > Merging two adjacent segments is the core scaling-in operation. The > protocol follows the same ordering principle: **create first, update > metadata last**. 
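The "update metadata last" step (step 4 in both protocols) amounts to a compare-and-swap retry loop against the metadata store. A sketch, using a hypothetical store interface rather than the actual `ScalableTopicResources` API:

```java
import java.util.function.UnaryOperator;

// Sketch of the atomic metadata update: the store only accepts the
// write if the version is unchanged; on a conflict the controller
// reloads the current value and reapplies its layout mutation.
public final class CasUpdate {
    public interface VersionedStore {
        long version();                          // current metadata version
        String read();                           // current metadata payload
        boolean compareAndSet(long expectedVersion, String newValue);
    }

    public static void updateWithRetry(VersionedStore store, UnaryOperator<String> mutation) {
        while (true) {
            long v = store.version();
            String updated = mutation.apply(store.read());
            if (store.compareAndSet(v, updated)) {
                return;                          // atomic update succeeded
            }
            // Concurrent layout change detected: reload and retry.
        }
    }
}
```

The mutation function is recomputed on every attempt, which is what makes a retry after a concurrent split or merge safe: it always operates on the freshly read layout.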
> > ### Step-by-step > > **Step 1: Discover subscriptions.** The controller queries both parent > segments for their subscriptions and takes the union, so no > subscription is lost. > > **Step 2: Create merged segment topic.** A single new `segment://` > topic is created with all discovered subscriptions. > > **Step 3: Terminate both parent segments.** Both parents are > terminated via the admin API. This is done sequentially to avoid a > race where producers of one parent are still writing while the other > is already sealed. > > **Step 4: Atomic metadata update.** The CAS update seals both parents > with child pointers to the merged segment, and adds the new `ACTIVE` > merged segment. > > **Step 5: Notify consumers.** Same as split — rebalance and push. > > ### Cross-broker coordination > > Unlike splits, merges inherently involve multiple brokers because the > two parent segments may be owned by different brokers (assigned by the > load manager). The controller leader handles this transparently: all > segment operations (create, terminate, delete) go through the Segments > admin API, which routes each request to the broker that currently owns > the target segment's namespace bundle. The controller does not need to > know or care which broker owns which segment — the admin API routing > handles it. > > --- > > ## Segment Topic Management via Admin API > > Segment topics are managed exclusively through a dedicated REST API at > `/admin/v2/segments`. This is a deliberate design choice: segment > topics use the `segment://` domain and must not be confused with > regular `persistent://` topics. > > ### Endpoints > > | Method | Path | Description | > |--------|------|-------------| > | `PUT` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create a > segment topic. Optional request body with subscription names to > pre-create. | > | `POST` | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | > Terminate (seal) a segment topic. 
| > | `DELETE` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete a > segment topic. | > > The `{descriptor}` is the segment's hash range and ID in the format > `{hexStart}-{hexEnd}-{segmentId}` (e.g., `0000-7fff-1`). > > ### Create Segment > > When creating a segment topic, the handler: > > 1. Constructs the full `segment://` topic name from the path components. > 2. Validates namespace bundle ownership (the request is routed to the > owning broker). > 3. Creates the persistent topic via `BrokerService.getOrCreateTopic()`. > 4. For each subscription in the request body, creates a cursor at the > `Earliest` position. > > ### Terminate Segment > > Terminates the managed ledger backing the segment topic. After > termination, the topic accepts no further writes, and producers > receive `TopicTerminated`. > > ### Delete Segment > > Deletes the segment topic and its managed ledger. Used during scalable > topic deletion to clean up all underlying storage. > > --- > > ## Consumer Assignment > > Consumer assignment behavior depends on the consumer type: > > - **StreamConsumer** (ordered, cumulative ack) — requires coordinated > segment-to-consumer assignment from the controller leader. Each active > segment is owned by exactly one consumer at any time to preserve > per-segment ordering. This is the subject of the rest of this section. > - **CheckpointConsumer** (unmanaged, for connectors) — same as > StreamConsumer from a controller perspective: it registers with the > controller leader and receives an explicit segment assignment. > - **QueueConsumer** (unordered, individual ack) — **does not require > any controller-side assignment**. Because there is no ordering > requirement, every queue consumer attaches directly to **all** > segments of the scalable topic — both `ACTIVE` and `SEALED` — and each > segment-owning broker independently performs round-robin delivery > across the queue consumers attached to that segment. 
New segments > produced by a split are picked up transparently via the DAG watch > session push; sealed segments keep serving messages until their > backlog drains and are then dropped from the set. The controller > leader is not in the data path for queue consumers and no > `SubscriptionCoordinator` is involved for them. > > ### Registration Flow (StreamConsumer / CheckpointConsumer) > > 1. A `StreamConsumer` connects to the controller broker (discovered > via the DAG watch session's `controller_broker_url`). > 2. The consumer sends a registration request with its `subscription` > name and `consumerName`. > 3. The controller's `registerConsumer()` method routes to the > appropriate `SubscriptionCoordinator` (created on first use for that > subscription). > 4. The coordinator adds the consumer, recomputes assignments, and > returns the consumer's `ConsumerAssignment` — a list of `(segmentId, > hashRange, segmentTopicName)` tuples. > 5. The consumer connects to each assigned `segment://` topic and > begins consuming. > > ### Rebalancing > > Rebalancing occurs when: > > - A consumer is added or removed. > - A split or merge changes the set of active segments. > > The rebalancing algorithm is round-robin: > > 1. Collect all active segments, sorted by hash range start. > 2. Collect all consumers, sorted by name (for deterministic ordering). > 3. Assign segments to consumers in round-robin order. > 4. For each consumer whose assignment changed, push the new > `ConsumerAssignment`. > > ### Consumer Session Lifecycle > > A consumer's registration and its segment assignment are treated as a > **persistent session**, not as a transient association tied to a TCP > connection. This is a deliberate departure from the existing Pulsar > consumer model, where consumer liveness is asserted purely by the TCP > connection to the broker. 
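The session survives brief disconnects; a configurable grace period bounds how long. A deterministic sketch of this model (hypothetical helper; a real broker would use scheduled timers rather than explicit timestamps, and would persist registrations to the metadata store rather than a local set):

```java
import java.util.*;

// Sketch of the persistent-session model: registrations are durable,
// while the disconnect clock lives only in memory on the leader.
public final class SessionTracker {
    private final long gracePeriodMillis;
    private final Set<String> registered = new HashSet<>();          // persisted in a real impl
    private final Map<String, Long> disconnectedAt = new HashMap<>(); // in-memory only

    public SessionTracker(long gracePeriodMillis) {
        this.gracePeriodMillis = gracePeriodMillis;
    }

    public void register(String consumerName) {
        registered.add(consumerName);
        disconnectedAt.remove(consumerName);
    }

    /** Connection dropped: start the grace period but keep the session. */
    public void onDisconnect(String consumerName, long nowMillis) {
        if (registered.contains(consumerName)) {
            disconnectedAt.put(consumerName, nowMillis);
        }
    }

    /** Reconnect within the grace period cancels the pending eviction. */
    public void onReconnect(String consumerName) {
        disconnectedAt.remove(consumerName);
    }

    /** Evict sessions whose grace period has expired; returns evicted names. */
    public List<String> evictExpired(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        disconnectedAt.entrySet().removeIf(e -> {
            boolean expired = nowMillis - e.getValue() >= gracePeriodMillis;
            if (expired) {
                registered.remove(e.getKey());
                evicted.add(e.getKey());
            }
            return expired;
        });
        return evicted;
    }

    public boolean hasSession(String consumerName) {
        return registered.contains(consumerName);
    }
}
```

On leader failover, a new leader would rebuild `registered` from the metadata store and treat every entry as freshly disconnected, which reproduces the "fresh grace period" behavior described below.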
> > **Rationale.** Scalable topic consumer assignments have non-trivial > cost: when a consumer disappears, its segments are redistributed among > the remaining consumers, each of which may need to reconnect to > different segment brokers and (for ordered consumers) drain in-flight > messages before the new assignment takes effect. Treating a brief > disconnection as a full consumer loss would cause unnecessary > rebalancing in common scenarios such as a consumer process restart or > a broker restart. > > **What is persisted vs. in-memory.** > > The **session itself is persisted** — the keep-alive state is not. > > - **Persisted in the metadata store (the session):** the > `ConsumerRegistration` — consumer name and its current segment > assignment — keyed by `consumerName` under the subscription path. Once > a consumer successfully registers, this entry is durable and outlives > TCP disconnects, client restarts, and controller leader failovers. The > assignment survives failover without forcing consumers to re-register. > - **In-memory on the controller leader (the keep-alive):** whether > each consumer is currently connected, and the grace-period timer that > runs while it is disconnected. Keep-alive signals are **not** written > to the metadata store; the leader observes the consumer's connection > state directly and tracks the timer in RAM. This avoids a metadata > store write on every liveness tick. > > **Session semantics (steady state).** > > - When a consumer registers, the `SubscriptionCoordinator` writes its > `ConsumerRegistration` and marks it connected in-memory. > - If the consumer's connection drops, the leader does **not** > immediately remove the consumer. It transitions the in-memory state to > "disconnected" and starts a configurable **session grace period** > timer for that consumer. 
> - If the same `consumerName` reconnects within the grace period, the > timer is cancelled and the consumer resumes with the same persisted > assignment — no rebalance, no other consumer disturbed. > - If the grace period expires, the leader deletes the > `ConsumerRegistration` from the metadata store and triggers a > rebalance for the remaining consumers. > > **Behavior on controller leader failover.** > > Because liveness timers are in-memory only, they are lost when the > leader broker crashes. The new leader: > > 1. Loads all `ConsumerRegistration` entries for each subscription from > the metadata store, restoring the prior segment assignment. > 2. Treats every consumer as "just disconnected" and starts a fresh > grace-period timer for each. No timestamps carry across from the > previous leader. > 3. Consumers reconnecting to the new leader within the fresh grace > period resume with the same assignment — seamless from their > perspective. > 4. Consumers that fail to reconnect before the fresh timer expires are > evicted and their segments are redistributed. > > This means a leader failover always gives clients a full grace period > to reconnect, regardless of how long they had already been > disconnected under the old leader. The trade-off is explicit: we keep > the metadata store writes proportional to real membership changes > (register / unregister / rebalance) rather than to liveness ticks. > > **Covered scenarios.** > > - **Consumer process restart.** A client process restarts and > reconnects with the same `consumerName` well within the grace period — > the same segments are reassigned, no redistribution. > - **Controller leader restart.** The leader is re-elected on a > different broker. Consumer entries and assignments are restored from > the metadata store; the new leader starts a fresh grace-period timer > for every consumer so reconnects are transparent. 
> - **Segment-owning broker restart.** Segment topics are reassigned by > the standard Pulsar load manager; the consumer's segment-to-consumer > assignment is unaffected and reconnects to the new owning broker are > transparent at the consumer-assignment level. > > The grace period and related tunables are configurable per broker and > will be specified in a follow-up sub-PIP. > > ### Layout Change Propagation > > When a split or merge completes and the metadata is updated: > > 1. The controller reloads the metadata and creates a new `SegmentLayout`. > 2. `notifySubscriptions()` is called, which iterates all > `SubscriptionCoordinator` instances. > 3. Each coordinator calls `onLayoutChange(newLayout)`, which > recomputes assignments against the new active segments and pushes > updates to affected consumers. > > --- > > ## Leader Election and Failover > > ### Election > > Each `ScalableTopicController` participates in leader election via the > metadata store's `LeaderElection` API. The election path is: > > ``` > /topics/{tenant}/{namespace}/{encodedTopicName}/controller > ``` > > The elected leader writes its broker service URL as the election > value. Clients discover the controller by reading this value > (delivered as part of the DAG watch session response). > > ### Failover > > When the leader broker fails: > > 1. The metadata store detects the session loss and clears the leader lock. > 2. Other brokers with active controllers for the same topic receive a > `NoLeader` state change notification. > 3. The `ScalableTopicService.onLeaderStateChange()` handler re-invokes > `controller.initialize()`, which reloads metadata and re-attempts > election. > 4. The new leader restores in-memory state from the persisted > metadata, including all subscriptions and their consumer registrations > with current assignments. 
> Consumers reconnecting within the session grace period find their
> sessions intact and resume with the same segment assignment without a
> rebalance (see [Consumer Session Lifecycle](#consumer-session-lifecycle)).
>
> #### Non-leader brokers
>
> A broker that loses leadership, or never wins it, retains the
> controller instance, but all mutating operations (`splitSegment`,
> `mergeSegments`, `registerConsumer`) throw `IllegalStateException`
> after a `checkLeader()` guard. The controller remains available for
> read operations such as `getLayout()`.
>
> ---
>
> ## Scalable Topic Lifecycle
>
> ### Creation
>
> ```
> Admin API (PUT /admin/v2/scalable/{tenant}/{namespace}/{topic})
>  └─> ScalableTopicService.createScalableTopic(topic, numInitialSegments)
>       ├─> ScalableTopicController.createInitialMetadata(numInitialSegments)
>       │    └─> Divides [0x0000, 0xFFFF] into N equal ranges
>       │        Creates N SegmentInfo records (ACTIVE, no parents)
>       │        Returns ScalableTopicMetadata with epoch=0
>       ├─> ScalableTopicResources.createScalableTopicAsync(topic, metadata)
>       │    └─> Writes metadata to store at /topics/{t}/{ns}/{topic}
>       └─> For each segment: createSegmentAsync via Segments admin API
>            └─> Creates segment:// topic on owning broker
> ```
>
> ### Deletion
>
> ```
> Admin API (DELETE /admin/v2/scalable/{tenant}/{namespace}/{topic})
>  └─> ScalableTopicService.deleteScalableTopic(topic)
>       ├─> releaseController(topic)
>       │    └─> Closes leader election, removes from controllers map
>       ├─> Load metadata from store
>       ├─> For each segment: deleteSegmentAsync via Segments admin API
>       │    └─> Deletes segment:// topic on owning broker (best-effort)
>       └─> ScalableTopicResources.deleteScalableTopicAsync(topic)
>            └─> Removes metadata from store
> ```
>
> ---
>
> ## Public-Facing Changes
>
> ### Binary Protocol
>
> Three new protocol commands (added to `PulsarApi.proto`):
>
> | Command | Direction | Field ID | Description |
> |---------|-----------|----------|-------------|
> | `CommandScalableTopicLookup` | Client → Broker | 70 | Initiates a DAG watch session |
> | `CommandScalableTopicUpdate` | Broker → Client | 71 | Initial response and subsequent pushed updates |
> | `CommandScalableTopicClose` | Client → Broker | 72 | Closes the DAG watch session |
>
> New protocol messages:
>
> - `ScalableTopicDAG` — the full segment DAG with broker addresses and controller URL.
> - `SegmentInfoProto` — per-segment record in the DAG.
> - `SegmentBrokerAddress` — maps a segment ID to its owning broker.
> - `SegmentState` enum — `ACTIVE` or `SEALED`.
>
> ### Admin API
>
> **Scalable Topics** (`/admin/v2/scalable`):
>
> | Method | Path | Description |
> |--------|------|-------------|
> | `GET` | `/{tenant}/{namespace}` | List scalable topics |
> | `PUT` | `/{tenant}/{namespace}/{topic}` | Create scalable topic |
> | `GET` | `/{tenant}/{namespace}/{topic}` | Get topic metadata |
> | `DELETE` | `/{tenant}/{namespace}/{topic}` | Delete scalable topic |
> | `GET` | `/{tenant}/{namespace}/{topic}/stats` | Get aggregated stats for the scalable topic (segment counts, per-segment rates, per-subscription state, consumer counts) |
> | `PUT` | `/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | Create a subscription on the scalable topic. The controller propagates it to all active segments by issuing `createSubscription` on each `segment://` topic via the Segments admin API. |
> | `DELETE` | `/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | Delete a subscription. The controller unregisters all consumers and deletes the subscription from every segment. |
> | `POST` | `/{tenant}/{namespace}/{topic}/split/{segmentId}` | Split a segment |
> | `POST` | `/{tenant}/{namespace}/{topic}/merge/{segmentId1}/{segmentId2}` | Merge two segments |
>
> **Segments** (`/admin/v2/segments`):
>
> | Method | Path | Description |
> |--------|------|-------------|
> | `PUT` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create segment topic |
> | `POST` | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | Terminate segment |
> | `DELETE` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete segment topic |
>
> ### Admin Client API
>
> New `ScalableTopics` interface on `PulsarAdmin`:
>
> - `listScalableTopics(namespace)` / `listScalableTopicsAsync(namespace)`
> - `createScalableTopic(topic, numSegments)` / `createScalableTopicAsync(...)`
> - `getMetadata(topic)` / `getMetadataAsync(topic)`
> - `getStats(topic)` / `getStatsAsync(topic)`
> - `deleteScalableTopic(topic)` / `deleteScalableTopicAsync(topic)`
> - `createSubscription(topic, subscription)` / `createSubscriptionAsync(...)`
> - `deleteSubscription(topic, subscription)` / `deleteSubscriptionAsync(...)`
> - `splitSegment(topic, segmentId)` / `splitSegmentAsync(...)`
> - `mergeSegments(topic, segmentId1, segmentId2)` / `mergeSegmentsAsync(...)`
> - `createSegment(segmentTopic, subscriptions)` / `createSegmentAsync(...)`
> - `terminateSegment(segmentTopic)` / `terminateSegmentAsync(...)`
> - `deleteSegment(segmentTopic, force)` / `deleteSegmentAsync(...)`
>
> ### Configuration
>
> | Property | Default | Description |
> |----------|---------|-------------|
> | `scalableTopicEnabled` | `true` | Enable scalable topic support on the broker |
>
> Additional configuration for auto-split thresholds and the consumer
> controller session grace period will be specified in follow-up
> sub-PIPs.
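The leadership guard described under "Non-leader brokers" above can be sketched as a minimal, self-contained illustration. Note that `ControllerGuard` and its members are hypothetical names for this sketch, not classes defined by the PIP; the point is only the pattern: mutating operations fail fast behind `checkLeader()`, while reads stay available.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class ControllerGuard {
    private final AtomicBoolean isLeader = new AtomicBoolean(false);
    private final List<String> layout = List.of("segment-0", "segment-1");

    public void becomeLeader() { isLeader.set(true); }
    public void loseLeadership() { isLeader.set(false); }

    // Guard shared by all mutating operations.
    private void checkLeader() {
        if (!isLeader.get()) {
            throw new IllegalStateException("Not the controller leader for this topic");
        }
    }

    /** Mutating operation: rejected unless this broker holds leadership. */
    public void splitSegment(String segmentId) {
        checkLeader();
        // ... perform the split; single-writer safety comes from leader election
    }

    /** Read-only operation: served even without leadership. */
    public List<String> getLayout() {
        return layout;
    }

    public static void main(String[] args) {
        ControllerGuard guard = new ControllerGuard();
        System.out.println(guard.getLayout());   // reads work on any broker
        try {
            guard.splitSegment("segment-0");     // mutation without leadership
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        guard.becomeLeader();
        guard.splitSegment("segment-0");         // now allowed
    }
}
```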
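As a concrete illustration of the Creation flow above, where `createInitialMetadata` divides `[0x0000, 0xFFFF]` into N equal ranges, the computation might look like the following sketch. `InitialRanges`, `Range`, and `divide` are illustrative names, not part of the PIP; it assumes contiguous, inclusive ranges over 65536 hash values.

```java
import java.util.ArrayList;
import java.util.List;

public class InitialRanges {

    /** Inclusive hash range [start, end] backing one initial segment. */
    public record Range(int start, int end) {}

    /** Divide the key space [0x0000, 0xFFFF] into N contiguous, near-equal ranges. */
    public static List<Range> divide(int numSegments) {
        final long keySpace = 0x10000L; // 65536 distinct hash values
        List<Range> ranges = new ArrayList<>();
        for (int i = 0; i < numSegments; i++) {
            // Long arithmetic avoids overflow; successive ranges tile the key space.
            int start = (int) (keySpace * i / numSegments);
            int end = (int) (keySpace * (i + 1) / numSegments) - 1;
            ranges.add(new Range(start, end));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // For N=4: [0x0000, 0x3FFF], [0x4000, 0x7FFF], [0x8000, 0xBFFF], [0xC000, 0xFFFF]
        for (Range r : divide(4)) {
            System.out.printf("[0x%04X, 0x%04X]%n", r.start(), r.end());
        }
    }
}
```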
> ### Metadata Store Paths
>
> | Path | Content |
> |------|---------|
> | `/topics/{tenant}/{namespace}/{topic}` | `ScalableTopicMetadata` JSON — segment DAG and global topic state |
> | `/topics/{tenant}/{namespace}/{topic}/controller` | Leader broker URL (ephemeral) |
> | `/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | `SubscriptionMetadata` — subscription-level config and the persisted set of consumer registrations |
> | `/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/consumers/{consumerName}` | `ConsumerRegistration` — the durable session: consumer name and its current segment assignment. Keep-alive state (connected/disconnected, grace-period timer) is in-memory on the controller leader only and is **not** persisted here. |
>
> ---
>
> ## Backward & Forward Compatibility
>
> ### Upgrade
>
> The controller is a new component with no interaction with existing
> partitioned or non-partitioned topics. Enabling `scalableTopicEnabled`
> activates the new code paths. Existing topics are unaffected.
>
> ### Downgrade / Rollback
>
> Scalable topic metadata and segment data remain in the metadata store
> and BookKeeper. Rolling back to a version without scalable topic
> support leaves this data intact but inaccessible. Upgrading again
> restores access.
>
> ### Client Compatibility
>
> The existing Pulsar client SDK rejects `topic://` and `segment://`
> domains with a clear error message directing users to the V5 client
> SDK. This check is enforced in `PulsarClientImpl` for producer,
> consumer, and reader creation paths.
>
> ---
>
> ## Security Considerations
>
> The controller follows the same tenant/namespace/topic authorization
> model as existing Pulsar topics:
>
> - Admin API operations require topic-level admin permissions.
> - DAG watch sessions require topic-level lookup permissions.
> - Consumer registration requires topic-level consume permissions.
> - Segment operations are internal and authenticated via the broker's
>   internal admin client.
>
> The controller leader lock in the metadata store is accessible only to
> authenticated brokers.
>
> ---
>
> ## Links
>
> - Parent PIP: [PIP-460: Scalable Topics](pip-460.md)
> - V5 Client API: [PIP-466: New Java Client API (V5)](pip-466.md)
>
> --
> Matteo Merli
> <[email protected]>
