+1 (non-binding)

Lari Hotari <[email protected]> wrote on Tue, Apr 21, 2026 at 00:44:
> +1 (binding)
>
> -Lari
>
> On 2026/04/17 04:03:36 Matteo Merli wrote:
> > Matteo Merli <[email protected]>
> > Mon, Apr 13, 11:11 PM
> > to Dev
> >
> > https://github.com/apache/pulsar/pull/25516
> >
> > -----

# PIP-468: Scalable Topic Controller

*Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*

## Motivation

[PIP-460](pip-460.md) introduces scalable topics as a new topic type in Pulsar, built on a DAG of range segments that can be split and merged to scale independently of the initial configuration. PIP-460 defines the overall vision but defers the detailed design of each component to dedicated sub-PIPs.

This PIP specifies the **Scalable Topic Controller** — the broker-side component responsible for:

1. **Segment lifecycle management** — creating, terminating, and deleting the underlying segment topics that back a scalable topic.
2. **Segment layout coordination** — executing split and merge operations with correct ordering guarantees so that no messages are lost and all subscription cursors exist before producers are redirected.
3. **Consumer assignment** — coordinating which consumers receive messages from which segments for a given subscription.
4. **Leader election** — ensuring exactly one broker acts as the controller for each scalable topic, with automatic failover.
5. **DAG watch sessions** — pushing topology updates to connected clients (producers and consumers) when the segment layout changes.

Without a well-defined controller, split and merge operations would be unsafe: producers could write to segments that have no subscription cursors, consumers could miss messages during topology changes, and concurrent layout modifications could corrupt the segment DAG.

---

## Design

### Architecture Overview

The controller subsystem consists of four layers:

```
┌──────────────────────────────────────────────────────────┐
│                     Admin API Layer                      │
│           ScalableTopics REST + Segments REST            │
└──────────────┬───────────────────────┬───────────────────┘
               │                       │
┌──────────────▼───────────────────────▼───────────────────┐
│                   ScalableTopicService                   │
│        Per-broker singleton; manages controllers         │
└──────────────────────────┬───────────────────────────────┘
                           │
┌──────────────────────────▼───────────────────────────────┐
│           ScalableTopicController (per topic)            │
│   Leader-elected; owns layout, split/merge, consumers    │
│                                                          │
│  ┌─────────────────┐  ┌───────────────────────────────┐  │
│  │ SegmentLayout   │  │ SubscriptionCoordinator (×N)  │  │
│  │ (immutable DAG) │  │ per-subscription assignments  │  │
│  └─────────────────┘  └───────────────────────────────┘  │
└──────────────────────────┬───────────────────────────────┘
                           │
┌──────────────────────────▼───────────────────────────────┐
│                  ScalableTopicResources                  │
│         Metadata store access (read/write/watch)         │
└──────────────────────────────────────────────────────────┘
```

### Broker Roles and Request Routing

A scalable topic involves three distinct broker roles. Understanding which broker handles which operation is critical to the design.

#### Any broker — DAG watch sessions

A **DAG watch session** (scalable topic lookup) can be served by **any broker** in the cluster. The session reads segment metadata from the shared metadata store and registers a watch for changes. No state is held on the broker beyond the watch registration. This means producers and consumers can connect to any broker for topic discovery — the same way regular topic lookups work today.

When the metadata changes (due to a split or merge performed by the controller leader), the metadata store notification fires on whichever broker is serving the watch session, and that broker pushes the updated DAG to the client.

#### Controller leader — layout mutations and consumer assignment

Each scalable topic has exactly one **controller leader** across the cluster, elected via the metadata store. The controller leader is the only broker that can:

- Execute **split** and **merge** operations (the multi-step protocols described below).
- Accept **consumer registrations** and compute segment-to-consumer assignments.
- **Notify consumers** of assignment changes after topology updates.

Clients discover the controller leader's broker URL through the DAG watch session response (`controller_broker_url` field). StreamConsumer and CheckpointConsumer clients then connect directly to the controller leader to register and receive assignments.

Admin API requests for split and merge are routed to the `ScalableTopicService` on any broker, which obtains (or creates) a local `ScalableTopicController` that participates in leader election. If the local controller is the leader, it executes the operation directly. If not, the request must be redirected to the leader (or the controller must first win the election). The split and merge admin endpoints do not require the request to land on the leader broker — the `ScalableTopicService` handles this transparently.

#### Segment-owning brokers — produce and consume

Individual **segment topics** (`segment://`) are regular persistent topics from the broker's perspective. They are distributed across the cluster by the standard Pulsar **load manager** and namespace bundle assignment — the same mechanism used for `persistent://` topics.
Each segment topic is owned by exactly one broker at any time, determined by its namespace bundle hash.

The controller leader does not need to own any of the segment topics. When the controller needs to operate on a segment (create, terminate, delete, discover subscriptions), it uses the **Segments admin API**, which routes the request to whichever broker currently owns that segment's namespace bundle. This decoupling means the controller can coordinate segments spread across many brokers without requiring co-location.

```
┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│ Broker A    │  │ Broker B    │  │ Broker C    │
│             │  │ (controller │  │             │
│ segment-0   │  │ leader)     │  │ segment-2   │
│ segment-1   │  │             │  │             │
│             │  │ DAG watch   │  │ DAG watch   │
│ DAG watch   │  │ sessions    │  │ sessions    │
│ sessions    │  │             │  │             │
└─────────────┘  └─────────────┘  └─────────────┘
                        │
                 Split/merge ops
                 use admin API to
                 reach segments on
                 Broker A and C
```

In this example, Broker B is the controller leader but owns no segments. When it executes a split of segment-0, it calls the Segments admin API which routes to Broker A (the owner of segment-0).

### Components

#### ScalableTopicService

A per-broker singleton, created and owned by `BrokerService`. It manages the lifecycle of all `ScalableTopicController` instances on the broker.

**Responsibilities:**

- **Controller management**: maintains a map of active controllers keyed by topic name. Creates controllers on demand via `getOrCreateController()` and releases them on topic unload or broker shutdown.
- **Topic lifecycle**: handles `createScalableTopic()` and `deleteScalableTopic()` admin operations, including creating/deleting the underlying segment topics via the Segments admin API.
- **Delegation**: routes `splitSegment()` and `mergeSegments()` requests to the appropriate controller.
- **Leader state monitoring**: listens for leader election state changes and re-attempts election when the current leader is lost.

#### ScalableTopicController

A per-topic coordinator. Only one instance across the cluster is the **leader** for a given scalable topic, ensured by leader election through the metadata store. The leader is the only instance that modifies the segment layout or assigns consumers.

**State:**

| Field | Type | Description |
|-------|------|-------------|
| `topicName` | `TopicName` | The `topic://` name of the scalable topic |
| `currentLayout` | `SegmentLayout` | The current immutable snapshot of the segment DAG |
| `subscriptions` | `Map<String, SubscriptionCoordinator>` | Per-subscription consumer coordinators |
| `leaderState` | `LeaderElectionState` | Current leader election state |
| `leaderElection` | `LeaderElection<String>` | Metadata store leader election handle; value is the broker URL |

**Key operations:**

- `initialize()` — loads the current metadata from the store, then attempts leader election. The elected leader stores its broker service URL so that clients can discover and connect to it.
- `splitSegment(segmentId)` — splits an active segment at its midpoint (see [Split Protocol](#split-protocol)).
- `mergeSegments(segmentId1, segmentId2)` — merges two adjacent active segments (see [Merge Protocol](#merge-protocol)).
- `registerConsumer(subscription, consumer)` — registers a consumer and returns its initial segment assignment.
- `unregisterConsumer(subscription, consumer)` — removes a consumer and triggers rebalancing.
- `getLeaderBrokerUrl()` — returns the current leader's broker URL, used by clients to connect to the controller.
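
The **Controller management** responsibility can be illustrated with a minimal Java sketch. It keeps one controller per topic in a `ConcurrentHashMap` and relies on `computeIfAbsent`, so concurrent requests for the same topic on one broker share a single instance. `ControllerRegistrySketch` and `ControllerStub` are hypothetical names for this example only. The real `ScalableTopicService` also wires each controller into leader election, which is omitted here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily simplified stand-in for a per-topic ScalableTopicController.
final class ControllerStub {
    final String topicName;
    volatile boolean closed;

    ControllerStub(String topicName) {
        this.topicName = topicName;
    }

    void close() {
        // A real controller would also close its leader election handle here.
        closed = true;
    }
}

// Sketch of the per-broker controller map (not the actual ScalableTopicService).
final class ControllerRegistrySketch {
    private final Map<String, ControllerStub> controllers = new ConcurrentHashMap<>();

    // computeIfAbsent is atomic on ConcurrentHashMap: at most one controller per topic.
    ControllerStub getOrCreateController(String topicName) {
        return controllers.computeIfAbsent(topicName, ControllerStub::new);
    }

    // Called on topic unload, topic deletion, or broker shutdown.
    void releaseController(String topicName) {
        ControllerStub controller = controllers.remove(topicName);
        if (controller != null) {
            controller.close();
        }
    }

    int size() {
        return controllers.size();
    }
}
```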

#### SegmentLayout

An immutable, versioned snapshot of the segment DAG. All mutation methods (`splitSegment`, `mergeSegments`, `pruneSegment`) return a **new** `SegmentLayout` instance — the original is never modified. This ensures safe concurrent reads without locking.

**Fields:**

| Field | Type | Description |
|-------|------|-------------|
| `epoch` | `long` | Monotonically increasing version; incremented on every layout change |
| `nextSegmentId` | `long` | Counter for assigning IDs to new segments |
| `allSegments` | `Map<Long, SegmentInfo>` | Complete DAG: all segments (active + sealed) |
| `activeSegments` | `Map<Long, SegmentInfo>` | Filtered view: only `ACTIVE` segments |

**Key operations:**

- `findActiveSegment(hash)` — returns the active segment whose hash range contains the given hash value. Used by producers for message routing.
- `splitSegment(segmentId)` — validates that the segment is active, computes the midpoint of its hash range, creates two child `SegmentInfo` records covering `[start, mid]` and `[mid+1, end]`, seals the parent, and returns a new layout with an incremented epoch.
- `mergeSegments(id1, id2)` — validates that both segments are active and adjacent (one segment's `end` equals the other's `start - 1`), creates a merged `SegmentInfo` covering the combined range, seals both parents, and returns a new layout.
- `getLineage(segmentId)` — returns the full ancestor + descendant chain for a segment, used for DAG traversal during catch-up reads.
- `toMetadata()` / `fromMetadata()` — converts to/from the persisted `ScalableTopicMetadata` format.
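
The copy-on-write layout can be sketched in Java. This is a minimal sketch under a few assumptions: hash values are plain `int`s in `[0, 0xFFFF]`, and `HashRange`, `Seg` and `LayoutSketch` are simplified, hypothetical stand-ins, not the PIP's actual classes. Each mutation copies the segment map, seals the parent(s), adds the new `ACTIVE` segment(s), and returns a new instance with `epoch + 1`. Applied twice, the midpoint split reproduces the hash ranges shown in the metadata example later in this document.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Inclusive hash range [start, end]; hypothetical stand-in for HashRange.
record HashRange(int start, int end) {
    boolean contains(int hash) {
        return hash >= start && hash <= end;
    }
}

// Simplified SegmentInfo: the ACTIVE/SEALED state is reduced to a boolean.
record Seg(long id, HashRange range, boolean active, List<Long> parentIds, List<Long> childIds) {
    Seg sealedWithChildren(List<Long> children) {
        return new Seg(id, range, false, parentIds, children);
    }
}

// Immutable layout: mutations never touch this instance, they return a new one.
final class LayoutSketch {
    final long epoch;
    final long nextSegmentId;
    final Map<Long, Seg> all; // every segment, active and sealed

    LayoutSketch(long epoch, long nextSegmentId, Map<Long, Seg> all) {
        this.epoch = epoch;
        this.nextSegmentId = nextSegmentId;
        this.all = Collections.unmodifiableMap(new TreeMap<>(all));
    }

    // Producer routing: the active segment whose range contains the hash.
    Seg findActiveSegment(int hash) {
        for (Seg s : all.values()) {
            if (s.active() && s.range().contains(hash)) {
                return s;
            }
        }
        throw new IllegalStateException("no active segment covers hash " + hash);
    }

    private Seg requireActive(long id) {
        Seg s = all.get(id);
        if (s == null || !s.active()) {
            throw new IllegalArgumentException("segment " + id + " is not active");
        }
        return s;
    }

    LayoutSketch splitSegment(long id) {
        Seg parent = requireActive(id);
        HashRange r = parent.range();
        if (r.start() == r.end()) {
            throw new IllegalArgumentException("range of segment " + id + " cannot be split");
        }
        int mid = r.start() + (r.end() - r.start()) / 2;
        long left = nextSegmentId;
        long right = nextSegmentId + 1;
        Map<Long, Seg> next = new TreeMap<>(all);
        next.put(id, parent.sealedWithChildren(List.of(left, right)));
        next.put(left, new Seg(left, new HashRange(r.start(), mid), true, List.of(id), List.of()));
        next.put(right, new Seg(right, new HashRange(mid + 1, r.end()), true, List.of(id), List.of()));
        return new LayoutSketch(epoch + 1, nextSegmentId + 2, next);
    }

    LayoutSketch mergeSegments(long id1, long id2) {
        Seg a = requireActive(id1);
        Seg b = requireActive(id2);
        if (a.range().start() > b.range().start()) { // order the pair by range start
            Seg tmp = a;
            a = b;
            b = tmp;
        }
        if (a.range().end() != b.range().start() - 1) {
            throw new IllegalArgumentException("segments " + id1 + " and " + id2 + " are not adjacent");
        }
        long merged = nextSegmentId;
        Map<Long, Seg> next = new TreeMap<>(all);
        next.put(a.id(), a.sealedWithChildren(List.of(merged)));
        next.put(b.id(), b.sealedWithChildren(List.of(merged)));
        next.put(merged, new Seg(merged, new HashRange(a.range().start(), b.range().end()),
                true, List.of(a.id(), b.id()), List.of()));
        return new LayoutSketch(epoch + 1, nextSegmentId + 1, next);
    }
}
```

The linear scan in `findActiveSegment` keeps the sketch short. A real implementation could index active segments by range start (for example a `TreeMap` queried with `floorEntry`) to route each message in logarithmic time.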

#### SegmentInfo

A record representing a single segment in the DAG:

| Field | Type | Description |
|-------|------|-------------|
| `segmentId` | `long` | Unique, monotonically increasing identifier |
| `hashRange` | `HashRange` | The `[start, end]` inclusive hash range this segment covers |
| `state` | `SegmentState` | `ACTIVE` or `SEALED` |
| `parentIds` | `List<Long>` | Parent segments (empty for root segments) |
| `childIds` | `List<Long>` | Child segments (empty for leaf/active segments) |
| `createdAtEpoch` | `long` | Layout epoch in which this segment was created (`0` for the initial segments) |
| `sealedAtEpoch` | `long` | Layout epoch in which this segment was sealed; only meaningful when `state == SEALED`. `0` when the segment is `ACTIVE`; this sentinel is unambiguous because sealing always happens as part of a split or merge, which increments the epoch — so a segment can never be sealed at epoch `0`. |

Factory methods: `SegmentInfo.active(id, range, epoch)` and `SegmentInfo.sealed(...)`.

#### SubscriptionCoordinator

Manages segment-to-consumer assignments for a single subscription within a scalable topic.

**Assignment strategy:** round-robin. Active segments are sorted by hash range start; consumers are sorted by name. Segments are then dealt to consumers in turn, so per-consumer segment counts differ by at most one. When a consumer is added or removed, or when the layout changes (split/merge), the coordinator recomputes assignments and pushes updates to affected consumers.

**Key operations:**

- `addConsumer(consumer)` — adds a consumer, rebalances, and returns the full assignment map.
- `removeConsumer(consumer)` — removes a consumer and redistributes its segments.
- `onLayoutChange(newLayout)` — called after a split or merge; recomputes all assignments against the new set of active segments.

#### ConsumerRegistration and ConsumerAssignment

`ConsumerRegistration` is a record identifying a connected consumer:

| Field | Type | Description |
|-------|------|-------------|
| `consumerId` | `long` | Protocol-level consumer ID |
| `consumerName` | `String` | Stable name chosen by the client |
| `cnx` | `TransportCnx` | The connection to push assignment updates |

`ConsumerAssignment` is the assignment sent to a consumer:

| Field | Type | Description |
|-------|------|-------------|
| `layoutEpoch` | `long` | The layout epoch this assignment is based on |
| `assignedSegments` | `List<AssignedSegment>` | The segments assigned to this consumer |

Each `AssignedSegment` contains: `segmentId`, `hashRange`, and `underlyingTopicName` (the `segment://` topic name the consumer should connect to).

### Metadata Store Schema

Scalable topic metadata is stored in the metadata store under a well-known path structure:

```
/topics/{tenant}/{namespace}/{encodedTopicName}
```

The value at this path is a JSON-serialized `ScalableTopicMetadata`:

```json
{
  "epoch": 3,
  "nextSegmentId": 5,
  "segments": {
    "0": { "segmentId": 0, "hashRange": {"start": 0, "end": 65535},
           "state": "SEALED", "parentIds": [], "childIds": [1, 2],
           "createdAtEpoch": 0, "sealedAtEpoch": 1 },
    "1": { "segmentId": 1, "hashRange": {"start": 0, "end": 32767},
           "state": "SEALED", "parentIds": [0], "childIds": [3, 4],
           "createdAtEpoch": 1, "sealedAtEpoch": 3 },
    "2": { "segmentId": 2, "hashRange": {"start": 32768, "end": 65535},
           "state": "ACTIVE", "parentIds": [0], "childIds": [],
           "createdAtEpoch": 1, "sealedAtEpoch": 0 },
    "3": { "segmentId": 3, "hashRange": {"start": 0, "end": 16383},
           "state": "ACTIVE", "parentIds": [1], "childIds": [],
           "createdAtEpoch": 3, "sealedAtEpoch": 0 },
    "4": { "segmentId": 4, "hashRange": {"start": 16384, "end": 32767},
           "state": "ACTIVE", "parentIds": [1], "childIds": [],
           "createdAtEpoch": 3, "sealedAtEpoch": 0 }
  },
  "properties": {}
}
```

The controller leader lock is stored at:

```
/topics/{tenant}/{namespace}/{encodedTopicName}/controller
```

The value is the broker service URL of the elected leader.

### DAG Watch Sessions

As described in [Broker Roles](#broker-roles-and-request-routing), any broker can serve a DAG watch session because the segment metadata lives in the shared metadata store — no controller leader involvement is required. The session is established via the binary protocol:

1. **Client sends `CommandScalableTopicLookup`** with a client-assigned `sessionId` and the `topic://` topic name.
2. **Broker creates a `DagWatchSession`** that:
   - Loads the current `ScalableTopicMetadata` from the store.
   - Resolves which broker owns each active segment's `segment://` topic (via topic lookup).
   - Reads the controller broker URL from the leader lock path.
   - Registers a metadata store notification listener for changes to the topic's metadata path.
3. **Broker sends `CommandScalableTopicUpdate`** with the initial `ScalableTopicDAG` containing the full segment DAG, broker addresses, and controller URL.
4. **On metadata change**, the notification listener fires. The broker reloads metadata, re-resolves broker addresses, and pushes an updated `CommandScalableTopicUpdate` to the client.
5. **Client sends `CommandScalableTopicClose`** to tear down the session.
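
The watch-session flow can be sketched in Java against two stand-ins. `WatchableStore` is a hypothetical in-memory store, and a `Consumer<String>` plays the client connection. The real broker would use the metadata store's notification mechanism and send `CommandScalableTopicUpdate` frames; here the DAG is reduced to a string. The sketch makes one ordering detail explicit, and it is an assumption rather than something the PIP states: the listener is registered before the first read. That way a change landing between the two steps is not missed, at the cost of an occasional duplicate push.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for the shared metadata store and its change notifications.
final class WatchableStore {
    private final Map<String, String> data = new ConcurrentHashMap<>();
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    String get(String path) {
        return data.get(path);
    }

    void put(String path, String value) {
        data.put(path, value);
        listeners.forEach(l -> l.accept(path)); // notify listeners with the changed path
    }

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void removeListener(Consumer<String> listener) {
        listeners.remove(listener);
    }
}

// Holds nothing but the listener registration, so any broker could serve it.
final class DagWatchSessionSketch {
    private final WatchableStore store;
    private final String topicPath;
    private final Consumer<String> client; // stands in for sending CommandScalableTopicUpdate
    private final Consumer<String> listener;

    DagWatchSessionSketch(WatchableStore store, String topicPath, Consumer<String> client) {
        this.store = store;
        this.topicPath = topicPath;
        this.client = client;
        this.listener = changedPath -> {
            if (changedPath.equals(topicPath)) {
                pushCurrentDag(); // reload and push on every change to the topic's metadata
            }
        };
    }

    void open() {
        store.addListener(listener); // register first, then send the initial DAG
        pushCurrentDag();
    }

    void close() {
        store.removeListener(listener); // corresponds to CommandScalableTopicClose
    }

    private void pushCurrentDag() {
        String dag = store.get(topicPath);
        String controllerUrl = store.get(topicPath + "/controller");
        client.accept(dag + " controller=" + controllerUrl);
    }
}
```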

The `ScalableTopicDAG` protocol message contains:

| Field | Type | Description |
|-------|------|-------------|
| `epoch` | `uint64` | Layout epoch |
| `segments` | `repeated SegmentInfoProto` | Full segment DAG |
| `segment_brokers` | `repeated SegmentBrokerAddress` | Broker URL for each active segment |
| `controller_broker_url` | `string` | Controller leader's broker URL |
| `controller_broker_url_tls` | `string` | TLS variant |

---

## Split Protocol

Splitting a segment is the core scaling-out operation. The protocol is carefully ordered to guarantee that **no messages are lost** and **all subscription cursors exist before producers are redirected**.

### Step-by-step

```
              Controller                  Metadata Store            Broker(s)
                  │                              │                      │
 1. Discover      │──── getSubscriptions ───────>│                      │
    subscriptions │<─── [sub-A, sub-B] ──────────│                      │
                  │                              │                      │
 2. Create        │──── createSegment(child1, ──>│ ─── route ──────────>│
    children      │     [sub-A, sub-B])          │                      │ create topic
                  │──── createSegment(child2, ──>│ ─── route ──────────>│ + cursors
                  │     [sub-A, sub-B])          │                      │
                  │                              │                      │
 3. Terminate     │──── terminateSegment ───────>│ ─── route ──────────>│
    parent        │     (parent)                 │                      │ seal ML
                  │                              │                      │
 4. Update        │──── CAS update ─────────────>│                      │
    metadata      │     (atomic)                 │                      │
                  │                              │                      │
 5. Notify        │──── push to consumers ─────────────────────────────>│
    consumers     │     (rebalance)              │                      │
```

**Step 1: Discover subscriptions.** The controller queries the parent segment topic for its list of subscriptions. It first checks locally (if the segment is on this broker), falling back to the admin API for remote segments.

**Step 2: Create child segment topics.** Two new `segment://` topics are created via the Segments admin API. Each creation request includes the list of subscriptions discovered in step 1.
The Segments API handler creates the persistent topic and initializes subscription cursors at the `Earliest` position. The admin API routes to whichever broker owns the namespace bundle for each segment, so child segments may be created on different brokers.

**Step 3: Terminate parent segment.** The parent segment topic is terminated via the Segments admin API, which writes a termination marker to the managed ledger. After termination, producers writing to the parent receive `TopicTerminated` and must re-route to child segments.

**Step 4: Atomic metadata update.** The controller issues a compare-and-swap (CAS) update to the metadata store. The update transitions the parent to `SEALED` state with child pointers, and adds the two new `ACTIVE` child segments. The CAS guarantees atomicity — if another operation modified the metadata concurrently, the update fails and must be retried.

**Step 5: Notify consumers.** All `SubscriptionCoordinator` instances are notified of the layout change. They recompute segment-to-consumer assignments and push updates to connected consumers.

### Why this ordering matters

- Steps 1-2 **before** step 4: if we updated metadata first, clients would discover the new segments immediately. Producers would start writing to child segments that have no subscription cursors yet, causing those messages to be missed by consumers.
- Step 3 **before** step 4: terminating the parent before the metadata update ensures that by the time clients see the new layout, the parent is already sealed. There is no window where both parent and children are writable.
- Step 2 creates cursors at `Earliest`: this is safe because the child topics are brand new (empty). The cursor starts at the beginning, and the first message written by a redirected producer will be the first message consumed.
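
The ordering rules of the split protocol map naturally onto a chain of `CompletableFuture` stages. Each step starts only after the previous one succeeds, and any failure (for example a lost CAS race) skips the remaining steps. `SplitOps` is a hypothetical facade over the Segments admin API and the metadata store, and `casUpdateLayout` stands in for a versioned, compare-and-swap write. The retry on CAS conflict that the PIP calls for is left to the caller in this sketch.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical async operations used by the split protocol.
interface SplitOps {
    CompletableFuture<List<String>> getSubscriptions(String segment);

    CompletableFuture<Void> createSegment(String segment, List<String> subscriptions);

    CompletableFuture<Void> terminateSegment(String segment);

    // Expected to fail if the stored layout version no longer equals expectedVersion.
    CompletableFuture<Void> casUpdateLayout(long expectedVersion, String newLayout);

    void notifyConsumers(String newLayout);
}

final class SplitProtocolSketch {
    static CompletableFuture<Void> split(SplitOps ops, String parent, String child1, String child2,
                                         long expectedVersion, String newLayout) {
        return ops.getSubscriptions(parent)                        // 1. discover subscriptions
            .thenCompose(subs -> CompletableFuture.allOf(          // 2. create both children with
                ops.createSegment(child1, subs),                   //    a cursor for every
                ops.createSegment(child2, subs)))                  //    discovered subscription
            .thenCompose(ignored -> ops.terminateSegment(parent))  // 3. seal the parent
            .thenCompose(ignored ->
                ops.casUpdateLayout(expectedVersion, newLayout))   // 4. publish the new layout
            .thenRun(() -> ops.notifyConsumers(newLayout));        // 5. rebalance consumers
    }
}
```

Creating the two children in parallel with `allOf` is a choice made for this sketch. The PIP only requires that both children, with their cursors, exist before the parent is terminated and the layout is published.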

---

## Merge Protocol

Merging two adjacent segments is the core scaling-in operation. The protocol follows the same ordering principle: **create first, update metadata last**.

### Step-by-step

**Step 1: Discover subscriptions.** The controller queries both parent segments for their subscriptions and takes the union, so no subscription is lost.

**Step 2: Create merged segment topic.** A single new `segment://` topic is created with all discovered subscriptions.

**Step 3: Terminate both parent segments.** Both parents are terminated via the admin API. This is done sequentially to avoid a race where producers of one parent are still writing while the other is already sealed.

**Step 4: Atomic metadata update.** The CAS update seals both parents with child pointers to the merged segment, and adds the new `ACTIVE` merged segment.

**Step 5: Notify consumers.** Same as split — rebalance and push.

### Cross-broker coordination

Unlike a split, which starts from a single parent segment, a merge may span multiple brokers before any new segment is created: the two parent segments may be owned by different brokers (assigned by the load manager). The controller leader handles this transparently: all segment operations (create, terminate, delete) go through the Segments admin API, which routes each request to the broker that currently owns the target segment's namespace bundle. The controller does not need to know or care which broker owns which segment — the admin API routing handles it.

---

## Segment Topic Management via Admin API

Segment topics are managed exclusively through a dedicated REST API at `/admin/v2/segments`. This is a deliberate design choice: segment topics use the `segment://` domain and must not be confused with regular `persistent://` topics.
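
Paths under this API end in a segment descriptor of the form `{hexStart}-{hexEnd}-{segmentId}` (for example `0000-7fff-1`). The Java sketch below formats and parses that descriptor and builds the admin path. `SegmentDescriptor` is a hypothetical helper. The zero-padded, four-digit lowercase hex is inferred from the single example rather than specified by the PIP.

```java
// Hypothetical helper for the {hexStart}-{hexEnd}-{segmentId} descriptor.
record SegmentDescriptor(int start, int end, long segmentId) {

    // Renders e.g. start=0x0000, end=0x7fff, segmentId=1 as "0000-7fff-1".
    String format() {
        return String.format("%04x-%04x-%d", start, end, segmentId);
    }

    static SegmentDescriptor parse(String descriptor) {
        String[] parts = descriptor.split("-");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected {hexStart}-{hexEnd}-{segmentId}: " + descriptor);
        }
        return new SegmentDescriptor(
            Integer.parseInt(parts[0], 16),
            Integer.parseInt(parts[1], 16),
            Long.parseLong(parts[2]));
    }

    // Path used by the create (PUT) and delete (DELETE) endpoints;
    // terminate (POST) appends "/terminate".
    String adminPath(String tenant, String namespace, String topic) {
        return "/admin/v2/segments/" + tenant + "/" + namespace + "/" + topic + "/" + format();
    }
}
```

With the 16-bit hash space used elsewhere in this PIP, `0000-7fff-1` corresponds to segment 1 covering `[0, 32767]` in the metadata example.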

### Endpoints

| Method | Path | Description |
|--------|------|-------------|
| `PUT` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create a segment topic. Optional request body with subscription names to pre-create. |
| `POST` | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | Terminate (seal) a segment topic. |
| `DELETE` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete a segment topic. |

The `{descriptor}` is the segment's hash range and ID in the format `{hexStart}-{hexEnd}-{segmentId}` (e.g., `0000-7fff-1`).

### Create Segment

When creating a segment topic, the handler:

1. Constructs the full `segment://` topic name from the path components.
2. Validates namespace bundle ownership (the request is routed to the owning broker).
3. Creates the persistent topic via `BrokerService.getOrCreateTopic()`.
4. For each subscription in the request body, creates a cursor at the `Earliest` position.

### Terminate Segment

Terminates the managed ledger backing the segment topic. After termination, the topic accepts no further writes, and producers receive `TopicTerminated`.

### Delete Segment

Deletes the segment topic and its managed ledger. Used during scalable topic deletion to clean up all underlying storage.

---

## Consumer Assignment

Consumer assignment behavior depends on the consumer type:

- **StreamConsumer** (ordered, cumulative ack) — requires coordinated segment-to-consumer assignment from the controller leader. Each active segment is owned by exactly one consumer at any time to preserve per-segment ordering. This is the subject of the rest of this section.
- **CheckpointConsumer** (unmanaged, for connectors) — same as StreamConsumer from a controller perspective: it registers with the controller leader and receives an explicit segment assignment.
- **QueueConsumer** (unordered, individual ack) — **does not require any controller-side assignment**. Because there is no ordering requirement, every queue consumer attaches directly to **all** segments of the scalable topic — both `ACTIVE` and `SEALED` — and each segment-owning broker independently performs round-robin delivery across the queue consumers attached to that segment. New segments produced by a split are picked up transparently via the DAG watch session push; sealed segments keep serving messages until their backlog drains and are then dropped from the set. The controller leader is not in the data path for queue consumers and no `SubscriptionCoordinator` is involved for them.

### Registration Flow (StreamConsumer / CheckpointConsumer)

1. A `StreamConsumer` connects to the controller broker (discovered via the DAG watch session's `controller_broker_url`).
2. The consumer sends a registration request with its `subscription` name and `consumerName`.
3. The controller's `registerConsumer()` method routes to the appropriate `SubscriptionCoordinator` (created on first use for that subscription).
4. The coordinator adds the consumer, recomputes assignments, and returns the consumer's `ConsumerAssignment` — a list of `(segmentId, hashRange, segmentTopicName)` tuples.
5. The consumer connects to each assigned `segment://` topic and begins consuming.

### Rebalancing

Rebalancing occurs when:

- A consumer is added or removed.
- A split or merge changes the set of active segments.

The rebalancing algorithm is round-robin:

1. Collect all active segments, sorted by hash range start.
2. Collect all consumers, sorted by name (for deterministic ordering).
3. Assign segments to consumers in round-robin order.
4. For each consumer whose assignment changed, push the new `ConsumerAssignment`.
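
The four rebalancing steps can be written as a pure function plus a diff, which makes them easy to test in isolation. In this Java sketch, `RoundRobinSketch` is a hypothetical name. Segments are given as a map from segment ID to hash-range start, and consumers are identified by `consumerName` only.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class RoundRobinSketch {
    // segmentStarts: active segment id -> start of its hash range.
    // Returns consumerName -> assigned segment ids, in hash-range order.
    static Map<String, List<Long>> assign(Map<Long, Integer> segmentStarts, Set<String> consumerNames) {
        List<Long> segments = new ArrayList<>(segmentStarts.keySet());
        segments.sort(Comparator.comparingInt(id -> segmentStarts.get(id)));   // step 1
        List<String> consumers = new ArrayList<>(new TreeSet<>(consumerNames)); // step 2

        Map<String, List<Long>> result = new TreeMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return result; // no consumers, nothing to assign
        }
        for (int i = 0; i < segments.size(); i++) {                              // step 3
            result.get(consumers.get(i % consumers.size())).add(segments.get(i));
        }
        return result;
    }

    // Step 4: consumers whose assignment differs and therefore need a push.
    static Set<String> changed(Map<String, List<Long>> before, Map<String, List<Long>> after) {
        Set<String> changed = new TreeSet<>();
        for (Map.Entry<String, List<Long>> e : after.entrySet()) {
            if (!Objects.equals(before.get(e.getKey()), e.getValue())) {
                changed.add(e.getKey());
            }
        }
        return changed;
    }
}
```

Take the active segments 3, 4 and 2 from the metadata example. Consumers `c1` and `c2` receive `[3, 2]` and `[4]`. When `c3` joins, the result is `[3]`, `[4]` and `[2]`. Because assignment depends on each consumer's position in the sorted order, adding `c3` also changes `c1`'s assignment. Step 4 therefore pushes to every consumer whose list differs, not only to the one that joined.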

### Consumer Session Lifecycle

A consumer's registration and its segment assignment are treated as a **persistent session**, not as a transient association tied to a TCP connection. This is a deliberate departure from the existing Pulsar consumer model, where consumer liveness is asserted purely by the TCP connection to the broker.

**Rationale.** Scalable topic consumer assignments have non-trivial cost: when a consumer disappears, its segments are redistributed among the remaining consumers, each of which may need to reconnect to different segment brokers and (for ordered consumers) drain in-flight messages before the new assignment takes effect. Treating a brief disconnection as a full consumer loss would cause unnecessary rebalancing in common scenarios such as a consumer process restart or a broker restart.

**What is persisted vs. in-memory.**

The **session itself is persisted** — the keep-alive state is not.

- **Persisted in the metadata store (the session):** the `ConsumerRegistration` — consumer name and its current segment assignment — keyed by `consumerName` under the subscription path. Once a consumer successfully registers, this entry is durable and outlives TCP disconnects, client restarts, and controller leader failovers. The assignment survives failover without forcing consumers to re-register.
- **In-memory on the controller leader (the keep-alive):** whether each consumer is currently connected, and the grace-period timer that runs while it is disconnected. Keep-alive signals are **not** written to the metadata store; the leader observes the consumer's connection state directly and tracks the timer in RAM. This avoids a metadata store write on every liveness tick.

**Session semantics (steady state).**

- When a consumer registers, the `SubscriptionCoordinator` writes its `ConsumerRegistration` and marks it connected in-memory.
- If the consumer's connection drops, the leader does **not** immediately remove the consumer. It transitions the in-memory state to "disconnected" and starts a configurable **session grace period** timer for that consumer.
- If the same `consumerName` reconnects within the grace period, the timer is cancelled and the consumer resumes with the same persisted assignment — no rebalance, no other consumer disturbed.
- If the grace period expires, the leader deletes the `ConsumerRegistration` from the metadata store and triggers a rebalance for the remaining consumers.

**Behavior on controller leader failover.**

Because liveness timers are in-memory only, they are lost when the leader broker crashes. The new leader:

1. Loads all `ConsumerRegistration` entries for each subscription from the metadata store, restoring the prior segment assignment.
2. Treats every consumer as "just disconnected" and starts a fresh grace-period timer for each. No timestamps carry across from the previous leader.
3. Consumers reconnecting to the new leader within the fresh grace period resume with the same assignment — seamless from their perspective.
4. Consumers that fail to reconnect before the fresh timer expires are evicted and their segments are redistributed.

This means a leader failover always gives clients a full grace period to reconnect, regardless of how long they had already been disconnected under the old leader. The trade-off is explicit: we keep the metadata store writes proportional to real membership changes (register / unregister / rebalance) rather than to liveness ticks.
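
The in-memory keep-alive rules above have three parts: start a grace period on disconnect, cancel it when the same `consumerName` reconnects, and on failover start a fresh period for every restored consumer. They can be sketched in Java with a deadline map that the leader sweeps periodically. One scheduled timer per consumer would fit the description equally well. `SessionGraceSketch` is hypothetical, and time is passed in explicitly to keep the example deterministic. The grace period is a constructor parameter because the real tunable is deferred to a follow-up sub-PIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the leader's in-memory keep-alive state. Nothing here is
// persisted; the ConsumerRegistration entries live in the metadata store.
final class SessionGraceSketch {
    private final long graceMillis;
    // Disconnected consumers -> time (ms) at which their grace period ends.
    private final Map<String, Long> deadlines = new TreeMap<>();

    SessionGraceSketch(long graceMillis) {
        this.graceMillis = graceMillis;
    }

    // Connection dropped: do not evict, start the grace period instead.
    synchronized void onDisconnected(String consumerName, long nowMillis) {
        deadlines.put(consumerName, nowMillis + graceMillis);
    }

    // Same consumerName reconnected. True means its grace period had not yet been
    // swept, so the persisted assignment is reused and no rebalance happens.
    synchronized boolean onReconnected(String consumerName) {
        return deadlines.remove(consumerName) != null;
    }

    // Periodic sweep: returns consumers whose grace period has ended. The caller would
    // delete their ConsumerRegistration entries and rebalance the remaining consumers.
    synchronized List<String> expireDue(long nowMillis) {
        List<String> expired = new ArrayList<>();
        deadlines.entrySet().removeIf(e -> {
            boolean due = e.getValue() <= nowMillis;
            if (due) {
                expired.add(e.getKey());
            }
            return due;
        });
        return expired;
    }

    // Failover: timers from the old leader are gone, so every consumer restored from
    // the metadata store gets a fresh, full grace period.
    synchronized void onBecameLeader(Iterable<String> restoredConsumers, long nowMillis) {
        for (String consumerName : restoredConsumers) {
            onDisconnected(consumerName, nowMillis);
        }
    }
}
```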

**Covered scenarios.**

- **Consumer process restart.** A client process restarts and reconnects with the same `consumerName` well within the grace period — the same segments are reassigned, no redistribution.
- **Controller leader restart.** The leader is re-elected on a different broker. Consumer entries and assignments are restored from the metadata store; the new leader starts a fresh grace-period timer for every consumer so reconnects are transparent.
- **Segment-owning broker restart.** Segment topics are reassigned by the standard Pulsar load manager; the consumer's segment-to-consumer assignment is unaffected and reconnects to the new owning broker are transparent at the consumer-assignment level.

The grace period and related tunables are configurable per broker and will be specified in a follow-up sub-PIP.

### Layout Change Propagation

When a split or merge completes and the metadata is updated:

1. The controller reloads the metadata and creates a new `SegmentLayout`.
2. `notifySubscriptions()` is called, which iterates all `SubscriptionCoordinator` instances.
3. Each coordinator calls `onLayoutChange(newLayout)`, which recomputes assignments against the new active segments and pushes updates to affected consumers.

---

## Leader Election and Failover

### Election

Each `ScalableTopicController` participates in leader election via the metadata store's `LeaderElection` API. The election path is:

```
/topics/{tenant}/{namespace}/{encodedTopicName}/controller
```

The elected leader writes its broker service URL as the election value. Clients discover the controller by reading this value (delivered as part of the DAG watch session response).

### Failover

When the leader broker fails:

1. The metadata store detects the session loss and clears the leader lock.
> > 2. Other brokers with active controllers for the same topic receive a `NoLeader` state change notification.
> > 3. The `ScalableTopicService.onLeaderStateChange()` handler re-invokes `controller.initialize()`, which reloads metadata and re-attempts election.
> > 4. The new leader restores its in-memory state from the persisted metadata, including all subscriptions and their consumer registrations with current assignments. Consumers that reconnect within the session grace period find their sessions intact and resume with the same segment assignment without a rebalance (see [Consumer Session Lifecycle](#consumer-session-lifecycle)).
> >
> > ### Non-leader brokers
> >
> > A broker that loses leadership, or never wins it, retains the controller instance, but all mutating operations (`splitSegment`, `mergeSegments`, `registerConsumer`) fail a `checkLeader()` guard and throw `IllegalStateException`. The controller remains available for read operations such as `getLayout()`.
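
To make the `checkLeader()` guard and the layout fan-out concrete, here is a minimal Java sketch under stated assumptions: `LayoutSnapshot`, `LayoutChangeListener`, `ControllerSketch`, `setLeader` and `onLayoutCommitted` are hypothetical stand-ins for `SegmentLayout`, `SubscriptionCoordinator` and the controller's internals, and the metadata-store reload is assumed to have already happened. Only `checkLeader()`, `getLayout()`, `notifySubscriptions()` and `onLayoutChange(...)` are names taken from the text.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for the immutable SegmentLayout snapshot. */
final class LayoutSnapshot {
    final long epoch;
    final List<Long> activeSegmentIds;

    LayoutSnapshot(long epoch, List<Long> activeSegmentIds) {
        this.epoch = epoch;
        this.activeSegmentIds = List.copyOf(activeSegmentIds);
    }
}

/** Hypothetical callback implemented by each per-subscription coordinator. */
interface LayoutChangeListener {
    void onLayoutChange(LayoutSnapshot newLayout);
}

/** Hypothetical controller skeleton: leader guard plus layout fan-out only. */
class ControllerSketch {
    private volatile boolean leader;
    private volatile LayoutSnapshot layout;
    private final List<LayoutChangeListener> coordinators = new CopyOnWriteArrayList<>();

    ControllerSketch(LayoutSnapshot initialLayout) {
        this.layout = initialLayout;
    }

    /** Would be driven by the metadata store's leader-election state callbacks. */
    void setLeader(boolean isLeader) {
        this.leader = isLeader;
    }

    void addCoordinator(LayoutChangeListener coordinator) {
        coordinators.add(coordinator);
    }

    /** Read path: served on every broker, leader or not. */
    LayoutSnapshot getLayout() {
        return layout;
    }

    /** Guard run first by every mutating operation (split, merge, register). */
    private void checkLeader() {
        if (!leader) {
            throw new IllegalStateException("not the controller leader for this topic");
        }
    }

    /**
     * Invoked once a split or merge has been committed to the metadata store:
     * swap in the reloaded immutable layout, then notify every subscription.
     */
    void onLayoutCommitted(LayoutSnapshot reloadedLayout) {
        checkLeader();
        layout = reloadedLayout;
        notifySubscriptions();
    }

    private void notifySubscriptions() {
        LayoutSnapshot current = layout;
        for (LayoutChangeListener coordinator : coordinators) {
            // Each coordinator recomputes assignments against the new active
            // segments and pushes updates to the affected consumers.
            coordinator.onLayoutChange(current);
        }
    }
}
```

Because the layout is immutable and published through a `volatile` field, the read path (`getLayout()`) needs no lock on non-leader brokers; a broker that loses leadership only has to flip the flag to start rejecting mutations.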
> >
> > ---
> >
> > ## Scalable Topic Lifecycle
> >
> > ### Creation
> >
> > ```
> > Admin API (PUT /admin/v2/scalable/{tenant}/{namespace}/{topic})
> > └─> ScalableTopicService.createScalableTopic(topic, numInitialSegments)
> >     ├─> ScalableTopicController.createInitialMetadata(numInitialSegments)
> >     │   └─> Divides [0x0000, 0xFFFF] into N equal ranges
> >     │       Creates N SegmentInfo records (ACTIVE, no parents)
> >     │       Returns ScalableTopicMetadata with epoch=0
> >     ├─> ScalableTopicResources.createScalableTopicAsync(topic, metadata)
> >     │   └─> Writes metadata to store at /topics/{t}/{ns}/{topic}
> >     └─> For each segment: createSegmentAsync via Segments admin API
> >         └─> Creates segment:// topic on owning broker
> > ```
> >
> > ### Deletion
> >
> > ```
> > Admin API (DELETE /admin/v2/scalable/{tenant}/{namespace}/{topic})
> > └─> ScalableTopicService.deleteScalableTopic(topic)
> >     ├─> releaseController(topic)
> >     │   └─> Closes leader election, removes from controllers map
> >     ├─> Load metadata from store
> >     ├─> For each segment: deleteSegmentAsync via Segments admin API
> >     │   └─> Deletes segment:// topic on owning broker (best-effort)
> >     └─> ScalableTopicResources.deleteScalableTopicAsync(topic)
> >         └─> Removes metadata from store
> > ```
> >
> > ---
> >
> > ## Public-Facing Changes
> >
> > ### Binary Protocol
> >
> > Three new protocol commands (added to `PulsarApi.proto`):
> >
> > | Command | Direction | Field ID | Description |
> > |---------|-----------|----------|-------------|
> > | `CommandScalableTopicLookup` | Client → Broker | 70 | Initiates a DAG watch session |
> > | `CommandScalableTopicUpdate` | Broker → Client | 71 | Initial response and subsequent pushed updates |
> > | `CommandScalableTopicClose` | Client → Broker | 72 | Closes the DAG watch session |
> >
> > New protocol messages:
> >
> > - `ScalableTopicDAG`: the full segment DAG with broker addresses and controller URL.
> > - `SegmentInfoProto`: per-segment record in the DAG.
> > - `SegmentBrokerAddress`: maps a segment ID to its owning broker.
> > - `SegmentState` enum: `ACTIVE` or `SEALED`.
> >
> > ### Admin API
> >
> > **Scalable Topics** (`/admin/v2/scalable`):
> >
> > | Method | Path | Description |
> > |--------|------|-------------|
> > | `GET` | `/{tenant}/{namespace}` | List scalable topics |
> > | `PUT` | `/{tenant}/{namespace}/{topic}` | Create scalable topic |
> > | `GET` | `/{tenant}/{namespace}/{topic}` | Get topic metadata |
> > | `DELETE` | `/{tenant}/{namespace}/{topic}` | Delete scalable topic |
> > | `GET` | `/{tenant}/{namespace}/{topic}/stats` | Get aggregated stats for the scalable topic (segment counts, per-segment rates, per-subscription state, consumer counts) |
> > | `PUT` | `/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | Create a subscription on the scalable topic. The controller propagates it to all active segments by issuing `createSubscription` on each `segment://` topic via the Segments admin API. |
> > | `DELETE` | `/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | Delete a subscription. The controller unregisters all consumers and deletes the subscription from every segment. |
> > | `POST` | `/{tenant}/{namespace}/{topic}/split/{segmentId}` | Split a segment |
> > | `POST` | `/{tenant}/{namespace}/{topic}/merge/{segmentId1}/{segmentId2}` | Merge two segments |
> >
> > **Segments** (`/admin/v2/segments`):
> >
> > | Method | Path | Description |
> > |--------|------|-------------|
> > | `PUT` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create segment topic |
> > | `POST` | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | Terminate segment |
> > | `DELETE` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete segment topic |
> >
> > ### Admin Client API
> >
> > New `ScalableTopics` interface on `PulsarAdmin`:
> >
> > - `listScalableTopics(namespace)` / `listScalableTopicsAsync(namespace)`
> > - `createScalableTopic(topic, numSegments)` / `createScalableTopicAsync(...)`
> > - `getMetadata(topic)` / `getMetadataAsync(topic)`
> > - `getStats(topic)` / `getStatsAsync(topic)`
> > - `deleteScalableTopic(topic)` / `deleteScalableTopicAsync(topic)`
> > - `createSubscription(topic, subscription)` / `createSubscriptionAsync(...)`
> > - `deleteSubscription(topic, subscription)` / `deleteSubscriptionAsync(...)`
> > - `splitSegment(topic, segmentId)` / `splitSegmentAsync(...)`
> > - `mergeSegments(topic, segmentId1, segmentId2)` / `mergeSegmentsAsync(...)`
> > - `createSegment(segmentTopic, subscriptions)` / `createSegmentAsync(...)`
> > - `terminateSegment(segmentTopic)` / `terminateSegmentAsync(...)`
> > - `deleteSegment(segmentTopic, force)` / `deleteSegmentAsync(...)`
> >
> > ### Configuration
> >
> > | Property | Default | Description |
> > |----------|---------|-------------|
> > | `scalableTopicEnabled` | `true` | Enable scalable topic support on the broker |
> >
> > Additional configuration for auto-split thresholds and the consumer session grace period will be specified in follow-up sub-PIPs.
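
The subscription endpoints in the Admin API table fan a single request out to many `segment://` topics. The Java sketch below shows one way to express that fan-out with `CompletableFuture.allOf`, so the scalable-topic operation completes only once every per-segment call has completed, and fails if any of them fails. `SegmentSubscriptionAdmin`, `SubscriptionFanOut` and their methods are hypothetical: the real calls would go through the Segments admin API (the table names a per-segment `createSubscription`), and their exact signatures are an assumption here. Retries and partial-failure cleanup are deliberately left out.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical facade over the per-segment subscription calls. */
interface SegmentSubscriptionAdmin {
    CompletableFuture<Void> createSubscription(String segmentTopic, String subscription);

    CompletableFuture<Void> deleteSubscription(String segmentTopic, String subscription);
}

/** Hypothetical helper: one scalable-topic request -> one call per segment. */
final class SubscriptionFanOut {
    private SubscriptionFanOut() {
    }

    /** Creates the subscription on every active segment topic. */
    static CompletableFuture<Void> createOnSegments(SegmentSubscriptionAdmin admin,
                                                    List<String> activeSegmentTopics,
                                                    String subscription) {
        return fanOut(activeSegmentTopics, topic -> admin.createSubscription(topic, subscription));
    }

    /** Deletes the subscription from every segment topic (consumers already unregistered). */
    static CompletableFuture<Void> deleteOnSegments(SegmentSubscriptionAdmin admin,
                                                    List<String> segmentTopics,
                                                    String subscription) {
        return fanOut(segmentTopics, topic -> admin.deleteSubscription(topic, subscription));
    }

    private static CompletableFuture<Void> fanOut(List<String> segmentTopics,
                                                  Function<String, CompletableFuture<Void>> perSegmentCall) {
        // Issue all per-segment calls concurrently.
        CompletableFuture<?>[] calls = segmentTopics.stream()
                .map(perSegmentCall)
                .toArray(CompletableFuture[]::new);
        // allOf completes after every call completes, and completes
        // exceptionally if any of them failed.
        return CompletableFuture.allOf(calls);
    }
}
```

Note that `allOf` does not undo segments that already succeeded; whether the controller retries or cleans up after a partial failure is not specified in this section.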
> >
> > ### Metadata Store Paths
> >
> > | Path | Content |
> > |------|---------|
> > | `/topics/{tenant}/{namespace}/{topic}` | `ScalableTopicMetadata` JSON: segment DAG and global topic state |
> > | `/topics/{tenant}/{namespace}/{topic}/controller` | Leader broker URL (ephemeral) |
> > | `/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | `SubscriptionMetadata`: subscription-level config and the persisted set of consumer registrations |
> > | `/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/consumers/{consumerName}` | `ConsumerRegistration`: the durable session, i.e. the consumer name and its current segment assignment. Keep-alive state (connected/disconnected, grace-period timer) is held in memory on the controller leader only and is **not** persisted here. |
> >
> > ---
> >
> > ## Backward & Forward Compatibility
> >
> > ### Upgrade
> >
> > The controller is a new component with no interaction with existing partitioned or non-partitioned topics. Enabling `scalableTopicEnabled` activates the new code paths. Existing topics are unaffected.
> >
> > ### Downgrade / Rollback
> >
> > Scalable topic metadata and segment data remain in the metadata store and BookKeeper. Rolling back to a version without scalable topic support leaves this data intact but inaccessible. Upgrading again restores access.
> >
> > ### Client Compatibility
> >
> > The existing Pulsar client SDK rejects the `topic://` and `segment://` domains with a clear error message directing users to the V5 client SDK. This check is enforced in `PulsarClientImpl` for the producer, consumer, and reader creation paths.
> >
> > ---
> >
> > ## Security Considerations
> >
> > The controller follows the same tenant/namespace/topic authorization model as existing Pulsar topics:
> >
> > - Admin API operations require topic-level admin permissions.
> > - DAG watch sessions require topic-level lookup permissions.
> > - Consumer registration requires topic-level consume permissions.
> > - Segment operations are internal and authenticated via the broker's internal admin client.
> >
> > The controller leader lock in the metadata store is accessible only to authenticated brokers.
> >
> > ---
> >
> > ## Links
> >
> > - Parent PIP: [PIP-460: Scalable Topics](pip-460.md)
> > - V5 Client API: [PIP-466: New Java Client API (V5)](pip-466.md)
> >
> > --
> > Matteo Merli
> > <[email protected]>
