+1 (binding)

-Lari

On 2026/04/17 04:03:36 Matteo Merli wrote:
> https://github.com/apache/pulsar/pull/25516
> 
> 
> -----
> 
> # PIP-468: Scalable Topic Controller
> 
> *Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*
> 
> ## Motivation
> 
> [PIP-460](pip-460.md) introduces scalable topics as a new topic type
> in Pulsar, built on a DAG of range segments that can be split and
> merged to scale independently of the initial configuration. PIP-460
> defines the overall vision but defers the detailed design of each
> component to dedicated sub-PIPs.
> 
> This PIP specifies the **Scalable Topic Controller** — the broker-side
> component responsible for:
> 
> 1. **Segment lifecycle management** — creating, terminating, and
> deleting the underlying segment topics that back a scalable topic.
> 2. **Segment layout coordination** — executing split and merge
> operations with correct ordering guarantees so that no messages are
> lost and all subscription cursors exist before producers are
> redirected.
> 3. **Consumer assignment** — coordinating which consumers receive
> messages from which segments for a given subscription.
> 4. **Leader election** — ensuring exactly one broker acts as the
> controller for each scalable topic, with automatic failover.
> 5. **DAG watch sessions** — pushing topology updates to connected
> clients (producers and consumers) when the segment layout changes.
> 
> Without a well-defined controller, split and merge operations would be
> unsafe: producers could write to segments that have no subscription
> cursors, consumers could miss messages during topology changes, and
> concurrent layout modifications could corrupt the segment DAG.
> 
> ---
> 
> ## Design
> 
> ### Architecture Overview
> 
> The controller subsystem consists of four layers:
> 
> ```
> ┌──────────────────────────────────────────────────────────┐
> │                    Admin API Layer                        │
> │         ScalableTopics REST  +  Segments REST             │
> └──────────────┬───────────────────────┬───────────────────┘
>                │                       │
> ┌──────────────▼───────────────────────▼───────────────────┐
> │                  ScalableTopicService                     │
> │         Per-broker singleton; manages controllers         │
> └──────────────────────────┬───────────────────────────────┘
>                            │
> ┌──────────────────────────▼───────────────────────────────┐
> │              ScalableTopicController (per topic)          │
> │   Leader-elected; owns layout, split/merge, consumers     │
> │                                                           │
> │   ┌─────────────────┐  ┌───────────────────────────────┐ │
> │   │  SegmentLayout  │  │  SubscriptionCoordinator (×N) │ │
> │   │ (immutable DAG) │  │  per-subscription assignments │ │
> │   └─────────────────┘  └───────────────────────────────┘ │
> └──────────────────────────┬───────────────────────────────┘
>                            │
> ┌──────────────────────────▼───────────────────────────────┐
> │              ScalableTopicResources                       │
> │         Metadata store access (read/write/watch)          │
> └──────────────────────────────────────────────────────────┘
> ```
> 
> ### Broker Roles and Request Routing
> 
> A scalable topic involves three distinct broker roles. Understanding
> which broker handles which operation is critical to the design.
> 
> #### Any broker — DAG watch sessions
> 
> A **DAG watch session** (scalable topic lookup) can be served by **any
> broker** in the cluster. The session reads segment metadata from the
> shared metadata store and registers a watch for changes. No state is
> held on the broker beyond the watch registration. This means producers
> and consumers can connect to any broker for topic discovery — the same
> way regular topic lookups work today.
> 
> When the metadata changes (due to a split or merge performed by the
> controller leader), the metadata store notification fires on whichever
> broker is serving the watch session, and that broker pushes the
> updated DAG to the client.
> 
> #### Controller leader — layout mutations and consumer assignment
> 
> Each scalable topic has exactly one **controller leader** across the
> cluster, elected via the metadata store. The controller leader is the
> only broker that can:
> 
> - Execute **split** and **merge** operations (the multi-step protocols
> described below).
> - Accept **consumer registrations** and compute segment-to-consumer 
> assignments.
> - **Notify consumers** of assignment changes after topology updates.
> 
> Clients discover the controller leader's broker URL through the DAG
> watch session response (`controller_broker_url` field). StreamConsumer
> and CheckpointConsumer clients then connect directly to the controller
> leader to register and receive assignments.
> 
> Admin API requests for split and merge can land on any broker: they
> are handled by that broker's `ScalableTopicService`, which obtains (or
> creates) a local `ScalableTopicController` that participates in leader
> election. If the local controller is the leader, it executes the
> operation directly; otherwise the service redirects the request to the
> leader (or waits for the local controller to win the election). Either
> way, callers never need to target the leader broker — the
> `ScalableTopicService` handles the routing transparently.
> 
> #### Segment-owning brokers — produce and consume
> 
> Individual **segment topics** (`segment://`) are regular persistent
> topics from the broker's perspective. They are distributed across the
> cluster by the standard Pulsar **load manager** and namespace bundle
> assignment — the same mechanism used for `persistent://` topics. Each
> segment topic is owned by exactly one broker at any time, determined
> by its namespace bundle hash.
> 
> The controller leader does not need to own any of the segment topics.
> When the controller needs to operate on a segment (create, terminate,
> delete, discover subscriptions), it uses the **Segments admin API**,
> which routes the request to whichever broker currently owns that
> segment's namespace bundle. This decoupling means the controller can
> coordinate segments spread across many brokers without requiring
> co-location.
> 
> ```
> ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
> │  Broker A    │     │  Broker B    │     │  Broker C    │
> │              │     │  (controller │     │              │
> │  segment-0   │     │   leader)    │     │  segment-2   │
> │  segment-1   │     │              │     │              │
> │              │     │  DAG watch   │     │  DAG watch   │
> │  DAG watch   │     │  sessions    │     │  sessions    │
> │  sessions    │     │              │     │              │
> └──────────────┘     └──────────────┘     └──────────────┘
>                            │
>                     Split/merge ops
>                     use admin API to
>                     reach segments on
>                     Broker A and C
> ```
> 
> In this example, Broker B is the controller leader but owns no
> segments. When it executes a split of segment-0, it calls the Segments
> admin API which routes to Broker A (the owner of segment-0).
> 
> ### Components
> 
> #### ScalableTopicService
> 
> A per-broker singleton, created and owned by `BrokerService`. It
> manages the lifecycle of all `ScalableTopicController` instances on
> the broker.
> 
> **Responsibilities:**
> 
> - **Controller management**: maintains a map of active controllers
> keyed by topic name. Creates controllers on demand via
> `getOrCreateController()` and releases them on topic unload or broker
> shutdown.
> - **Topic lifecycle**: handles `createScalableTopic()` and
> `deleteScalableTopic()` admin operations, including creating/deleting
> the underlying segment topics via the Segments admin API.
> - **Delegation**: routes `splitSegment()` and `mergeSegments()`
> requests to the appropriate controller.
> - **Leader state monitoring**: listens for leader election state
> changes and re-attempts election when the current leader is lost.
> 
> #### ScalableTopicController
> 
> A per-topic coordinator. Only one instance across the cluster is the
> **leader** for a given scalable topic, ensured by leader election
> through the metadata store. The leader is the only instance that
> modifies the segment layout or assigns consumers.
> 
> **State:**
> 
> | Field | Type | Description |
> |-------|------|-------------|
> | `topicName` | `TopicName` | The `topic://` name of the scalable topic |
> | `currentLayout` | `SegmentLayout` | The current immutable snapshot of the segment DAG |
> | `subscriptions` | `Map<String, SubscriptionCoordinator>` | Per-subscription consumer coordinators |
> | `leaderState` | `LeaderElectionState` | Current leader election state |
> | `leaderElection` | `LeaderElection<String>` | Metadata store leader election handle; value is the broker URL |
> 
> **Key operations:**
> 
> - `initialize()` — loads the current metadata from the store, then
> attempts leader election. The elected leader stores its broker service
> URL so that clients can discover and connect to it.
> - `splitSegment(segmentId)` — splits an active segment at its midpoint
> (see [Split Protocol](#split-protocol)).
> - `mergeSegments(segmentId1, segmentId2)` — merges two adjacent active
> segments (see [Merge Protocol](#merge-protocol)).
> - `registerConsumer(subscription, consumer)` — registers a consumer
> and returns its initial segment assignment.
> - `unregisterConsumer(subscription, consumer)` — removes a consumer
> and triggers rebalancing.
> - `getLeaderBrokerUrl()` — returns the current leader's broker URL,
> used by clients to connect to the controller.
> 
> #### SegmentLayout
> 
> An immutable, versioned snapshot of the segment DAG. All mutation
> methods (`splitSegment`, `mergeSegments`, `pruneSegment`) return a
> **new** `SegmentLayout` instance — the original is never modified.
> This ensures safe concurrent reads without locking.
> 
> **Fields:**
> 
> | Field | Type | Description |
> |-------|------|-------------|
> | `epoch` | `long` | Monotonically increasing version; incremented on every layout change |
> | `nextSegmentId` | `long` | Counter for assigning IDs to new segments |
> | `allSegments` | `Map<Long, SegmentInfo>` | Complete DAG: all segments (active + sealed) |
> | `activeSegments` | `Map<Long, SegmentInfo>` | Filtered view: only `ACTIVE` segments |
> 
> **Key operations** (a sketch follows the list):
> 
> - `findActiveSegment(hash)` — returns the active segment whose hash
> range contains the given hash value. Used by producers for message
> routing.
> - `splitSegment(segmentId)` — validates the segment is active,
> computes the midpoint of its hash range, creates two child
> `SegmentInfo` records covering `[start, mid]` and `[mid+1, end]`,
> seals the parent, and returns a new layout with incremented epoch.
> - `mergeSegments(id1, id2)` — validates both segments are active and
> adjacent (the end of one equals `start - 1` of the other), creates a
> merged `SegmentInfo` covering the combined range, seals both parents,
> and returns a new layout.
> - `getLineage(segmentId)` — returns the full ancestor + descendant
> chain for a segment, used for DAG traversal during catch-up reads.
> - `toMetadata()` / `fromMetadata()` — converts to/from the persisted
> `ScalableTopicMetadata` format.
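> 
> To make the copy-on-write contract concrete, below is a minimal sketch
> of `findActiveSegment` and `splitSegment`. The types are simplified
> and illustrative (no validation, persistence, or `activeSegments`
> view); only the immutability pattern mirrors the design above.
> 
> ```java
> import java.util.*;
> 
> // Illustrative sketch; the real classes carry more state and checks.
> enum SegmentState { ACTIVE, SEALED }
> 
> record HashRange(int start, int end) {             // inclusive [start, end]
>     boolean contains(int h) { return h >= start && h <= end; }
> }
> 
> record SegmentInfo(long segmentId, HashRange hashRange, SegmentState state,
>                    List<Long> parentIds, List<Long> childIds,
>                    long createdAtEpoch, long sealedAtEpoch) {
>     SegmentInfo sealed(List<Long> children, long epoch) {
>         return new SegmentInfo(segmentId, hashRange, SegmentState.SEALED,
>                 parentIds, children, createdAtEpoch, epoch);
>     }
> }
> 
> final class SegmentLayout {
>     final long epoch;
>     final long nextSegmentId;
>     final Map<Long, SegmentInfo> allSegments;      // immutable after construction
> 
>     SegmentLayout(long epoch, long nextSegmentId, Map<Long, SegmentInfo> all) {
>         this.epoch = epoch;
>         this.nextSegmentId = nextSegmentId;
>         this.allSegments = Map.copyOf(all);
>     }
> 
>     // Producer routing: the ACTIVE segment whose range contains the hash.
>     Optional<SegmentInfo> findActiveSegment(int hash) {
>         return allSegments.values().stream()
>                 .filter(s -> s.state() == SegmentState.ACTIVE)
>                 .filter(s -> s.hashRange().contains(hash))
>                 .findFirst();
>     }
> 
>     // Copy-on-write split: returns a NEW layout, the receiver is untouched.
>     SegmentLayout splitSegment(long parentId) {
>         SegmentInfo parent = allSegments.get(parentId);
>         if (parent == null || parent.state() != SegmentState.ACTIVE) {
>             throw new IllegalArgumentException("segment not active: " + parentId);
>         }
>         long newEpoch = epoch + 1;
>         int start = parent.hashRange().start(), end = parent.hashRange().end();
>         int mid = start + (end - start) / 2;
>         long leftId = nextSegmentId, rightId = nextSegmentId + 1;
>         SegmentInfo left = new SegmentInfo(leftId, new HashRange(start, mid),
>                 SegmentState.ACTIVE, List.of(parentId), List.of(), newEpoch, 0L);
>         SegmentInfo right = new SegmentInfo(rightId, new HashRange(mid + 1, end),
>                 SegmentState.ACTIVE, List.of(parentId), List.of(), newEpoch, 0L);
>         Map<Long, SegmentInfo> next = new HashMap<>(allSegments);
>         next.put(parentId, parent.sealed(List.of(leftId, rightId), newEpoch));
>         next.put(leftId, left);
>         next.put(rightId, right);
>         return new SegmentLayout(newEpoch, nextSegmentId + 2, next);
>     }
> }
> ```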
> 
> #### SegmentInfo
> 
> A record representing a single segment in the DAG:
> 
> | Field | Type | Description |
> |-------|------|-------------|
> | `segmentId` | `long` | Unique, monotonically increasing identifier |
> | `hashRange` | `HashRange` | The `[start, end]` inclusive hash range this segment covers |
> | `state` | `SegmentState` | `ACTIVE` or `SEALED` |
> | `parentIds` | `List<Long>` | Parent segments (empty for root segments) |
> | `childIds` | `List<Long>` | Child segments (empty for leaf/active segments) |
> | `createdAtEpoch` | `long` | Layout epoch in which this segment was created (`0` for the initial segments) |
> | `sealedAtEpoch` | `long` | Layout epoch in which this segment was sealed; only meaningful when `state == SEALED`. `0` when the segment is `ACTIVE`; this sentinel is unambiguous because sealing always happens as part of a split or merge, which increments the epoch — so a segment can never be sealed at epoch `0`. |
> 
> Factory methods: `SegmentInfo.active(id, range, epoch)` and
> `SegmentInfo.sealed(...)`.
> 
> #### SubscriptionCoordinator
> 
> Manages segment-to-consumer assignments for a single subscription
> within a scalable topic.
> 
> **Assignment strategy:** round-robin. Active segments are sorted by
> hash range start; consumers are sorted by name. Segments are
> distributed evenly across consumers. When a consumer is added or
> removed, or when the layout changes (split/merge), the coordinator
> recomputes assignments and pushes updates to affected consumers.
> 
> **Key operations:**
> 
> - `addConsumer(consumer)` — adds a consumer, rebalances, and returns
> the full assignment map.
> - `removeConsumer(consumer)` — removes a consumer and redistributes
> its segments.
> - `onLayoutChange(newLayout)` — called after a split or merge;
> recomputes all assignments against the new set of active segments.
> 
> #### ConsumerRegistration and ConsumerAssignment
> 
> `ConsumerRegistration` is a record identifying a connected consumer:
> 
> | Field | Type | Description |
> |-------|------|-------------|
> | `consumerId` | `long` | Protocol-level consumer ID |
> | `consumerName` | `String` | Stable name chosen by the client |
> | `cnx` | `TransportCnx` | The connection to push assignment updates |
> 
> `ConsumerAssignment` is the assignment sent to a consumer:
> 
> | Field | Type | Description |
> |-------|------|-------------|
> | `layoutEpoch` | `long` | The layout epoch this assignment is based on |
> | `assignedSegments` | `List<AssignedSegment>` | The segments assigned to this consumer |
> 
> Each `AssignedSegment` contains: `segmentId`, `hashRange`, and
> `underlyingTopicName` (the `segment://` topic name the consumer should
> connect to).
> 
> ### Metadata Store Schema
> 
> Scalable topic metadata is stored in the metadata store under a
> well-known path structure:
> 
> ```
> /topics/{tenant}/{namespace}/{encodedTopicName}
> ```
> 
> The value at this path is a JSON-serialized `ScalableTopicMetadata`:
> 
> ```json
> {
>   "epoch": 2,
>   "nextSegmentId": 5,
>   "segments": {
>     "0": { "segmentId": 0, "hashRange": {"start": 0, "end": 65535},
>            "state": "SEALED", "parentIds": [], "childIds": [1, 2],
>            "createdAtEpoch": 0, "sealedAtEpoch": 1 },
>     "1": { "segmentId": 1, "hashRange": {"start": 0, "end": 32767},
>            "state": "SEALED", "parentIds": [0], "childIds": [3, 4],
>            "createdAtEpoch": 1, "sealedAtEpoch": 2 },
>     "2": { "segmentId": 2, "hashRange": {"start": 32768, "end": 65535},
>            "state": "ACTIVE", "parentIds": [0], "childIds": [],
>            "createdAtEpoch": 1, "sealedAtEpoch": 0 },
>     "3": { "segmentId": 3, "hashRange": {"start": 0, "end": 16383},
>            "state": "ACTIVE", "parentIds": [1], "childIds": [],
>            "createdAtEpoch": 2, "sealedAtEpoch": 0 },
>     "4": { "segmentId": 4, "hashRange": {"start": 16384, "end": 32767},
>            "state": "ACTIVE", "parentIds": [1], "childIds": [],
>            "createdAtEpoch": 2, "sealedAtEpoch": 0 }
>   },
>   "properties": {}
> }
> ```
> 
> The controller leader lock is stored at:
> 
> ```
> /topics/{tenant}/{namespace}/{encodedTopicName}/controller
> ```
> 
> The value is the broker service URL of the elected leader.
> 
> ### DAG Watch Sessions
> 
> As described in [Broker Roles](#broker-roles-and-request-routing), any
> broker can serve a DAG watch session because the segment metadata
> lives in the shared metadata store — no controller leader involvement
> is required. The session is established via the binary protocol:
> 
> 1. **Client sends `CommandScalableTopicLookup`** with a
> client-assigned `sessionId` and the `topic://` topic name.
> 2. **Broker creates a `DagWatchSession`** that:
>    - Loads the current `ScalableTopicMetadata` from the store.
>    - Resolves which broker owns each active segment's `segment://`
> topic (via topic lookup).
>    - Reads the controller broker URL from the leader lock path.
>    - Registers a metadata store notification listener for changes to
> the topic's metadata path.
> 3. **Broker sends `CommandScalableTopicUpdate`** with the initial
> `ScalableTopicDAG` containing the full segment DAG, broker addresses,
> and controller URL.
> 4. **On metadata change**, the notification listener fires. The broker
> reloads metadata, re-resolves broker addresses, and pushes an updated
> `CommandScalableTopicUpdate` to the client (see the sketch after this
> list).
> 5. **Client sends `CommandScalableTopicClose`** to tear down the session.
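> 
> A broker-side sketch of steps 2 and 4, built on the metadata store's
> notification listener. The `DagWatchSession` shape and the
> `pushUpdate` helper are illustrative placeholders, not the actual
> classes:
> 
> ```java
> import java.util.function.Consumer;
> import org.apache.pulsar.metadata.api.MetadataStore;
> import org.apache.pulsar.metadata.api.Notification;
> 
> // Hypothetical sketch of a DAG watch session; names are illustrative.
> final class DagWatchSession implements Consumer<Notification> {
>     private final MetadataStore store;
>     private final String metadataPath;   // /topics/{tenant}/{ns}/{topic}
> 
>     DagWatchSession(MetadataStore store, String metadataPath) {
>         this.store = store;
>         this.metadataPath = metadataPath;
>         // Step 2: register for changes to the topic's metadata path.
>         store.registerListener(this);
>         pushUpdate();                    // step 3: initial full DAG push
>     }
> 
>     @Override
>     public void accept(Notification n) {
>         // Step 4: a split/merge on any broker touched the metadata path.
>         if (metadataPath.equals(n.getPath())) {
>             pushUpdate();
>         }
>     }
> 
>     private void pushUpdate() {
>         // Reload metadata, re-resolve segment owners and the controller
>         // URL, then send CommandScalableTopicUpdate to the client (omitted).
>     }
> }
> ```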
> 
> The `ScalableTopicDAG` protocol message contains:
> 
> | Field | Type | Description |
> |-------|------|-------------|
> | `epoch` | `uint64` | Layout epoch |
> | `segments` | `repeated SegmentInfoProto` | Full segment DAG |
> | `segment_brokers` | `repeated SegmentBrokerAddress` | Broker URL for each active segment |
> | `controller_broker_url` | `string` | Controller leader's broker URL |
> | `controller_broker_url_tls` | `string` | TLS variant |
> 
> ---
> 
> ## Split Protocol
> 
> Splitting a segment is the core scaling-out operation. The protocol is
> carefully ordered to guarantee that **no messages are lost** and **all
> subscription cursors exist before producers are redirected**.
> 
> ### Step-by-step
> 
> ```
>            Controller                   Metadata Store          Broker(s)
>                │                             │                     │
>   1. Discover  │──── getSubscriptions ───────>│                     │
>   subscriptions│<─── [sub-A, sub-B] ─────────│                     │
>                │                             │                     │
>   2. Create    │──── createSegment(child1, ──>│ ─── route ────────> │
>   children     │     [sub-A, sub-B])          │                     │ create topic
>                │──── createSegment(child2, ──>│ ─── route ────────> │ + cursors
>                │     [sub-A, sub-B])          │                     │
>                │                             │                     │
>   3. Terminate │──── terminateSegment ───────>│ ─── route ────────> │
>   parent       │     (parent)                 │                     │ seal ML
>                │                             │                     │
>   4. Update    │──── CAS update ─────────────>│                     │
>   metadata     │     (atomic)                 │                     │
>                │                             │                     │
>   5. Notify    │──── push to consumers ──────────────────────────> │
>   consumers    │     (rebalance)              │                     │
> ```
> 
> **Step 1: Discover subscriptions.** The controller queries the parent
> segment topic for its list of subscriptions. It first checks locally
> (if the segment is on this broker), falling back to the admin API for
> remote segments.
> 
> **Step 2: Create child segment topics.** Two new `segment://` topics
> are created via the Segments admin API. Each creation request includes
> the list of subscriptions discovered in step 1. The Segments API
> handler creates the persistent topic and initializes subscription
> cursors at the `Earliest` position. The admin API routes to whichever
> broker owns the namespace bundle for each segment, so child segments
> may be created on different brokers.
> 
> **Step 3: Terminate parent segment.** The parent segment topic is
> terminated via the Segments admin API, which writes a termination
> marker to the managed ledger. After termination, producers writing to
> the parent receive `TopicTerminated` and must re-route to child
> segments.
> 
> **Step 4: Atomic metadata update.** The controller issues a
> compare-and-swap (CAS) update to the metadata store. The update
> transitions the parent to `SEALED` state with child pointers, and adds
> the two new `ACTIVE` child segments. The CAS guarantees atomicity — if
> another operation modified the metadata concurrently, the update fails
> and must be retried.
> 
> **Step 5: Notify consumers.** All `SubscriptionCoordinator` instances
> are notified of the layout change. They recompute segment-to-consumer
> assignments and push updates to connected consumers.
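> 
> A condensed, hypothetical sketch of this sequencing on the controller.
> The abstract helpers stand in for the Segments admin API calls and
> serialization, and `SegmentLayout` is the illustrative class from the
> earlier sketch; only the ordering of the five steps and the
> expected-version CAS are the point:
> 
> ```java
> import java.util.List;
> import java.util.Optional;
> import java.util.concurrent.CompletableFuture;
> import org.apache.pulsar.metadata.api.MetadataStore;
> 
> // Hypothetical orchestration sketch; helper names are illustrative.
> abstract class SplitOrchestrationSketch {
>     protected MetadataStore metadataStore;
>     protected String metadataPath;        // /topics/{tenant}/{ns}/{topic}
>     protected volatile SegmentLayout currentLayout;
>     protected volatile long currentMetadataVersion;
> 
>     abstract void checkLeader();
>     abstract CompletableFuture<List<String>> discoverSubscriptions(long segmentId);
>     abstract CompletableFuture<Void> createSegmentTopic(String descriptor,
>                                                         List<String> subscriptions);
>     abstract CompletableFuture<Void> terminateSegmentTopic(long segmentId);
>     abstract String childDescriptor(SegmentLayout layout, long parentId, int child);
>     abstract byte[] serialize(SegmentLayout layout);
>     abstract void notifySubscriptions();
> 
>     CompletableFuture<Void> splitSegment(long parentId) {
>         checkLeader();
>         SegmentLayout layout = currentLayout;
>         return discoverSubscriptions(parentId)                      // step 1
>             .thenCompose(subs -> CompletableFuture.allOf(           // step 2
>                 createSegmentTopic(childDescriptor(layout, parentId, 0), subs),
>                 createSegmentTopic(childDescriptor(layout, parentId, 1), subs)))
>             .thenCompose(v -> terminateSegmentTopic(parentId))      // step 3
>             // Step 4: CAS on the expected version; a BadVersionException
>             // means a concurrent change, so the caller reloads and retries.
>             .thenCompose(v -> metadataStore.put(metadataPath,
>                 serialize(layout.splitSegment(parentId)),
>                 Optional.of(currentMetadataVersion)))
>             .thenAccept(stat -> notifySubscriptions());             // step 5
>     }
> }
> ```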
> 
> ### Why this ordering matters
> 
> - Steps 1-2 **before** step 4: if we updated metadata first, clients
> would discover the new segments immediately. Producers would start
> writing to child segments that have no subscription cursors yet,
> causing those messages to be missed by consumers.
> - Step 3 **before** step 4: terminating the parent before the metadata
> update ensures that by the time clients see the new layout, the parent
> is already sealed. There is no window where both parent and children
> are writable.
> - Step 2 creates cursors at `Earliest`: this is safe because the child
> topics are brand new (empty). The cursor starts at the beginning, and
> the first message written by a redirected producer will be the first
> message consumed.
> 
> ---
> 
> ## Merge Protocol
> 
> Merging two adjacent segments is the core scaling-in operation. The
> protocol follows the same ordering principle: **create first, update
> metadata last**.
> 
> ### Step-by-step
> 
> **Step 1: Discover subscriptions.** The controller queries both parent
> segments for their subscriptions and takes the union, so no
> subscription is lost.
> 
> **Step 2: Create merged segment topic.** A single new `segment://`
> topic is created with all discovered subscriptions.
> 
> **Step 3: Terminate both parent segments.** Both parents are
> terminated via the admin API. This is done sequentially to avoid a
> race where producers of one parent are still writing while the other
> is already sealed.
> 
> **Step 4: Atomic metadata update.** The CAS update seals both parents
> with child pointers to the merged segment, and adds the new `ACTIVE`
> merged segment.
> 
> **Step 5: Notify consumers.** Same as split — rebalance and push.
> 
> ### Cross-broker coordination
> 
> Unlike splits, merges inherently involve multiple brokers because the
> two parent segments may be owned by different brokers (assigned by the
> load manager). The controller leader handles this transparently: all
> segment operations (create, terminate, delete) go through the Segments
> admin API, which routes each request to the broker that currently owns
> the target segment's namespace bundle. The controller does not need to
> know or care which broker owns which segment — the admin API routing
> handles it.
> 
> ---
> 
> ## Segment Topic Management via Admin API
> 
> Segment topics are managed exclusively through a dedicated REST API at
> `/admin/v2/segments`. This is a deliberate design choice: segment
> topics use the `segment://` domain and must not be confused with
> regular `persistent://` topics.
> 
> ### Endpoints
> 
> | Method | Path | Description |
> |--------|------|-------------|
> | `PUT` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create a segment topic. Optional request body with subscription names to pre-create. |
> | `POST` | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | Terminate (seal) a segment topic. |
> | `DELETE` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete a segment topic. |
> 
> The `{descriptor}` is the segment's hash range and ID in the format
> `{hexStart}-{hexEnd}-{segmentId}` (e.g., `0000-7fff-1`).
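> 
> For illustration, the descriptor can be derived from a segment's range
> and ID with a simple format (hypothetical helper, not the actual
> code):
> 
> ```java
> // e.g. start=0x0000, end=0x7fff, segmentId=1  ->  "0000-7fff-1"
> static String descriptor(int hashStart, int hashEnd, long segmentId) {
>     return String.format("%04x-%04x-%d", hashStart, hashEnd, segmentId);
> }
> ```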
> 
> ### Create Segment
> 
> When creating a segment topic, the handler:
> 
> 1. Constructs the full `segment://` topic name from the path components.
> 2. Validates namespace bundle ownership (the request is routed to the
> owning broker).
> 3. Creates the persistent topic via `BrokerService.getOrCreateTopic()`.
> 4. For each subscription in the request body, creates a cursor at the
> `Earliest` position.
> 
> ### Terminate Segment
> 
> Terminates the managed ledger backing the segment topic. After
> termination, the topic accepts no further writes, and producers
> receive `TopicTerminated`.
> 
> ### Delete Segment
> 
> Deletes the segment topic and its managed ledger. Used during scalable
> topic deletion to clean up all underlying storage.
> 
> ---
> 
> ## Consumer Assignment
> 
> Consumer assignment behavior depends on the consumer type:
> 
> - **StreamConsumer** (ordered, cumulative ack) — requires coordinated
> segment-to-consumer assignment from the controller leader. Each active
> segment is owned by exactly one consumer at any time to preserve
> per-segment ordering. This is the subject of the rest of this section.
> - **CheckpointConsumer** (unmanaged, for connectors) — same as
> StreamConsumer from a controller perspective: it registers with the
> controller leader and receives an explicit segment assignment.
> - **QueueConsumer** (unordered, individual ack) — **does not require
> any controller-side assignment**. Because there is no ordering
> requirement, every queue consumer attaches directly to **all**
> segments of the scalable topic — both `ACTIVE` and `SEALED` — and each
> segment-owning broker independently performs round-robin delivery
> across the queue consumers attached to that segment. New segments
> produced by a split are picked up transparently via the DAG watch
> session push; sealed segments keep serving messages until their
> backlog drains and are then dropped from the set. The controller
> leader is not in the data path for queue consumers and no
> `SubscriptionCoordinator` is involved for them.
> 
> ### Registration Flow (StreamConsumer / CheckpointConsumer)
> 
> 1. A `StreamConsumer` connects to the controller broker (discovered
> via the DAG watch session's `controller_broker_url`).
> 2. The consumer sends a registration request with its `subscription`
> name and `consumerName`.
> 3. The controller's `registerConsumer()` method routes to the
> appropriate `SubscriptionCoordinator` (created on first use for that
> subscription).
> 4. The coordinator adds the consumer, recomputes assignments, and
> returns the consumer's `ConsumerAssignment` — a list of `(segmentId,
> hashRange, segmentTopicName)` tuples.
> 5. The consumer connects to each assigned `segment://` topic and
> begins consuming.
> 
> ### Rebalancing
> 
> Rebalancing occurs when:
> 
> - A consumer is added or removed.
> - A split or merge changes the set of active segments.
> 
> The rebalancing algorithm is round-robin, as sketched after these steps:
> 
> 1. Collect all active segments, sorted by hash range start.
> 2. Collect all consumers, sorted by name (for deterministic ordering).
> 3. Assign segments to consumers in round-robin order.
> 4. For each consumer whose assignment changed, push the new
> `ConsumerAssignment`.
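> 
> A minimal sketch of this computation, reusing the illustrative
> `SegmentInfo` from the `SegmentLayout` sketch and reducing consumers
> to their names:
> 
> ```java
> import java.util.*;
> 
> // Illustrative only: maps segment IDs to consumer names, round-robin.
> static Map<String, List<Long>> assignRoundRobin(Collection<SegmentInfo> activeSegments,
>                                                 Collection<String> consumerNames) {
>     // 1. Active segments sorted by hash range start.
>     List<SegmentInfo> segments = new ArrayList<>(activeSegments);
>     segments.sort(Comparator.comparingInt(s -> s.hashRange().start()));
>     // 2. Consumers sorted by name for deterministic ordering.
>     List<String> consumers = new ArrayList<>(consumerNames);
>     Collections.sort(consumers);
> 
>     Map<String, List<Long>> assignment = new LinkedHashMap<>();
>     consumers.forEach(c -> assignment.put(c, new ArrayList<>()));
>     if (consumers.isEmpty()) {
>         return assignment;              // no consumers registered yet
>     }
>     // 3. Deal segments to consumers in round-robin order.
>     for (int i = 0; i < segments.size(); i++) {
>         assignment.get(consumers.get(i % consumers.size()))
>                   .add(segments.get(i).segmentId());
>     }
>     return assignment;                  // 4. caller diffs and pushes changes
> }
> ```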
> 
> ### Consumer Session Lifecycle
> 
> A consumer's registration and its segment assignment are treated as a
> **persistent session**, not as a transient association tied to a TCP
> connection. This is a deliberate departure from the existing Pulsar
> consumer model, where consumer liveness is asserted purely by the TCP
> connection to the broker.
> 
> **Rationale.** Scalable topic consumer assignments have non-trivial
> cost: when a consumer disappears, its segments are redistributed among
> the remaining consumers, each of which may need to reconnect to
> different segment brokers and (for ordered consumers) drain in-flight
> messages before the new assignment takes effect. Treating a brief
> disconnection as a full consumer loss would cause unnecessary
> rebalancing in common scenarios such as a consumer process restart or
> a broker restart.
> 
> **What is persisted vs. in-memory.**
> 
> The **session itself is persisted** — the keep-alive state is not.
> 
> - **Persisted in the metadata store (the session):** the
> `ConsumerRegistration` — consumer name and its current segment
> assignment — keyed by `consumerName` under the subscription path. Once
> a consumer successfully registers, this entry is durable and outlives
> TCP disconnects, client restarts, and controller leader failovers. The
> assignment survives failover without forcing consumers to re-register.
> - **In-memory on the controller leader (the keep-alive):** whether
> each consumer is currently connected, and the grace-period timer that
> runs while it is disconnected. Keep-alive signals are **not** written
> to the metadata store; the leader observes the consumer's connection
> state directly and tracks the timer in RAM. This avoids a metadata
> store write on every liveness tick.
> 
> **Session semantics (steady state).** A sketch of this logic follows
> the list.
> 
> - When a consumer registers, the `SubscriptionCoordinator` writes its
> `ConsumerRegistration` and marks it connected in-memory.
> - If the consumer's connection drops, the leader does **not**
> immediately remove the consumer. It transitions the in-memory state to
> "disconnected" and starts a configurable **session grace period**
> timer for that consumer.
> - If the same `consumerName` reconnects within the grace period, the
> timer is cancelled and the consumer resumes with the same persisted
> assignment — no rebalance, no other consumer disturbed.
> - If the grace period expires, the leader deletes the
> `ConsumerRegistration` from the metadata store and triggers a
> rebalance for the remaining consumers.
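> 
> A sketch of this keep-alive handling on the leader. The scheduler
> wiring and the persisted-registration hooks are placeholders; the
> point is that only the registration deletion touches the metadata
> store:
> 
> ```java
> import java.util.Map;
> import java.util.concurrent.*;
> 
> // Hypothetical sketch of the in-memory liveness tracking on the leader.
> abstract class SessionTrackerSketch {
>     private final ScheduledExecutorService scheduler =
>             Executors.newSingleThreadScheduledExecutor();
>     private final Map<String, ScheduledFuture<?>> graceTimers = new ConcurrentHashMap<>();
>     private final long gracePeriodMillis;   // broker-configurable
> 
>     SessionTrackerSketch(long gracePeriodMillis) {
>         this.gracePeriodMillis = gracePeriodMillis;
>     }
> 
>     // Persisted-session operations; timers are never written to the store.
>     abstract void deletePersistedRegistration(String consumerName);
>     abstract void rebalance();
> 
>     void onConsumerConnected(String consumerName) {
>         // Reconnect within the grace period: cancel the timer, keep the
>         // persisted assignment, disturb no other consumer.
>         ScheduledFuture<?> timer = graceTimers.remove(consumerName);
>         if (timer != null) {
>             timer.cancel(false);
>         }
>     }
> 
>     void onConsumerDisconnected(String consumerName) {
>         // Do NOT evict immediately; start the grace-period timer instead.
>         graceTimers.put(consumerName, scheduler.schedule(() -> {
>             graceTimers.remove(consumerName);
>             deletePersistedRegistration(consumerName);  // durable session ends
>             rebalance();                                // redistribute segments
>         }, gracePeriodMillis, TimeUnit.MILLISECONDS));
>     }
> }
> ```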
> 
> **Behavior on controller leader failover.**
> 
> Because liveness timers are in-memory only, they are lost when the
> leader broker crashes. The new leader:
> 
> 1. Loads all `ConsumerRegistration` entries for each subscription from
> the metadata store, restoring the prior segment assignment.
> 2. Treats every consumer as "just disconnected" and starts a fresh
> grace-period timer for each. No timestamps carry across from the
> previous leader.
> 3. Consumers reconnecting to the new leader within the fresh grace
> period resume with the same assignment — seamless from their
> perspective.
> 4. Consumers that fail to reconnect before the fresh timer expires are
> evicted and their segments are redistributed.
> 
> This means a leader failover always gives clients a full grace period
> to reconnect, regardless of how long they had already been
> disconnected under the old leader. The trade-off is explicit: we keep
> the metadata store writes proportional to real membership changes
> (register / unregister / rebalance) rather than to liveness ticks.
> 
> **Covered scenarios.**
> 
> - **Consumer process restart.** A client process restarts and
> reconnects with the same `consumerName` well within the grace period —
> the same segments are reassigned, no redistribution.
> - **Controller leader restart.** The leader is re-elected on a
> different broker. Consumer entries and assignments are restored from
> the metadata store; the new leader starts a fresh grace-period timer
> for every consumer so reconnects are transparent.
> - **Segment-owning broker restart.** Segment topics are reassigned by
> the standard Pulsar load manager; the consumer's segment-to-consumer
> assignment is unaffected and reconnects to the new owning broker are
> transparent at the consumer-assignment level.
> 
> The grace period and related tunables are configurable per broker and
> will be specified in a follow-up sub-PIP.
> 
> ### Layout Change Propagation
> 
> When a split or merge completes and the metadata is updated:
> 
> 1. The controller reloads the metadata and creates a new `SegmentLayout`.
> 2. `notifySubscriptions()` is called, which iterates all
> `SubscriptionCoordinator` instances.
> 3. Each coordinator calls `onLayoutChange(newLayout)`, which
> recomputes assignments against the new active segments and pushes
> updates to affected consumers.
> 
> ---
> 
> ## Leader Election and Failover
> 
> ### Election
> 
> Each `ScalableTopicController` participates in leader election via the
> metadata store's `LeaderElection` API. The election path is:
> 
> ```
> /topics/{tenant}/{namespace}/{encodedTopicName}/controller
> ```
> 
> The elected leader writes its broker service URL as the election
> value. Clients discover the controller by reading this value
> (delivered as part of the DAG watch session response).
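> 
> For illustration, the election wiring might look like the following,
> assuming the metadata module's `CoordinationService` API (signatures
> approximate):
> 
> ```java
> import org.apache.pulsar.metadata.api.coordination.CoordinationService;
> import org.apache.pulsar.metadata.api.coordination.LeaderElection;
> import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
> 
> // Sketch of controller election; path and value as described above.
> class ElectionSketch {
>     private final LeaderElection<String> leaderElection;
> 
>     ElectionSketch(CoordinationService coordination, String electionPath,
>                    String brokerServiceUrl) {
>         // electionPath = /topics/{tenant}/{ns}/{encodedTopicName}/controller
>         this.leaderElection = coordination.getLeaderElection(
>                 String.class, electionPath, this::onStateChange);
>         // Propose our broker URL as the election value; if we win, clients
>         // discover us through the DAG watch session response.
>         leaderElection.elect(brokerServiceUrl);
>     }
> 
>     private void onStateChange(LeaderElectionState state) {
>         // NoLeader -> re-attempt election (see Failover below).
>     }
> }
> ```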
> 
> ### Failover
> 
> When the leader broker fails:
> 
> 1. The metadata store detects the session loss and clears the leader lock.
> 2. Other brokers with active controllers for the same topic receive a
> `NoLeader` state change notification.
> 3. The `ScalableTopicService.onLeaderStateChange()` handler re-invokes
> `controller.initialize()`, which reloads metadata and re-attempts
> election.
> 4. The new leader restores in-memory state from the persisted
> metadata, including all subscriptions and their consumer registrations
> with current assignments. Consumers reconnecting within the session
> grace period find their sessions intact and resume with the same
> segment assignment without a rebalance (see [Consumer Session
> Lifecycle](#consumer-session-lifecycle)).
> 
> ### Non-leader brokers
> 
> A broker that loses leadership or never wins it retains the controller
> instance but all mutating operations (`splitSegment`, `mergeSegments`,
> `registerConsumer`) throw `IllegalStateException` after a
> `checkLeader()` guard. The controller remains available for read
> operations like `getLayout()`.
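> 
> The guard itself can be as simple as this illustrative fragment:
> 
> ```java
> // Illustrative guard; leaderState and topicName are controller fields.
> private void checkLeader() {
>     if (leaderState != LeaderElectionState.Leading) {
>         throw new IllegalStateException(
>                 "Broker is not the controller leader for " + topicName);
>     }
> }
> ```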
> 
> ---
> 
> ## Scalable Topic Lifecycle
> 
> ### Creation
> 
> ```
> Admin API (PUT /admin/v2/scalable/{tenant}/{namespace}/{topic})
>   └─> ScalableTopicService.createScalableTopic(topic, numInitialSegments)
>         ├─> ScalableTopicController.createInitialMetadata(numInitialSegments)
>         │     └─> Divides [0x0000, 0xFFFF] into N equal ranges
>         │         Creates N SegmentInfo records (ACTIVE, no parents)
>         │         Returns ScalableTopicMetadata with epoch=0
>         ├─> ScalableTopicResources.createScalableTopicAsync(topic, metadata)
>         │     └─> Writes metadata to store at /topics/{t}/{ns}/{topic}
>         └─> For each segment: createSegmentAsync via Segments admin API
>               └─> Creates segment:// topic on owning broker
> ```
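> 
> The range division is simple arithmetic; a sketch, reusing the
> illustrative `HashRange` record from the `SegmentLayout` sketch:
> 
> ```java
> import java.util.ArrayList;
> import java.util.List;
> 
> // Divide [0x0000, 0xFFFF] into numInitialSegments near-equal ACTIVE ranges.
> static List<HashRange> initialRanges(int numInitialSegments) {
>     final int keySpace = 0x10000;                // 65536 hash values
>     List<HashRange> ranges = new ArrayList<>();
>     int start = 0;
>     for (int i = 0; i < numInitialSegments; i++) {
>         // Spread the remainder so ranges differ by at most one value.
>         int size = keySpace / numInitialSegments
>                 + (i < keySpace % numInitialSegments ? 1 : 0);
>         ranges.add(new HashRange(start, start + size - 1));
>         start += size;
>     }
>     return ranges;                               // last range ends at 0xFFFF
> }
> ```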
> 
> ### Deletion
> 
> ```
> Admin API (DELETE /admin/v2/scalable/{tenant}/{namespace}/{topic})
>   └─> ScalableTopicService.deleteScalableTopic(topic)
>         ├─> releaseController(topic)
>         │     └─> Closes leader election, removes from controllers map
>         ├─> Load metadata from store
>         ├─> For each segment: deleteSegmentAsync via Segments admin API
>         │     └─> Deletes segment:// topic on owning broker (best-effort)
>         └─> ScalableTopicResources.deleteScalableTopicAsync(topic)
>               └─> Removes metadata from store
> ```
> 
> ---
> 
> ## Public-Facing Changes
> 
> ### Binary Protocol
> 
> Three new protocol commands (added to `PulsarApi.proto`):
> 
> | Command | Direction | Field ID | Description |
> |---------|-----------|----------|-------------|
> | `CommandScalableTopicLookup` | Client → Broker | 70 | Initiates a DAG watch session |
> | `CommandScalableTopicUpdate` | Broker → Client | 71 | Initial response and subsequent pushed updates |
> | `CommandScalableTopicClose` | Client → Broker | 72 | Closes the DAG watch session |
> 
> New protocol messages:
> 
> - `ScalableTopicDAG` — the full segment DAG with broker addresses and
> controller URL.
> - `SegmentInfoProto` — per-segment record in the DAG.
> - `SegmentBrokerAddress` — maps a segment ID to its owning broker.
> - `SegmentState` enum — `ACTIVE` or `SEALED`.
> 
> ### Admin API
> 
> **Scalable Topics** (`/admin/v2/scalable`):
> 
> | Method | Path | Description |
> |--------|------|-------------|
> | `GET` | `/{tenant}/{namespace}` | List scalable topics |
> | `PUT` | `/{tenant}/{namespace}/{topic}` | Create scalable topic |
> | `GET` | `/{tenant}/{namespace}/{topic}` | Get topic metadata |
> | `DELETE` | `/{tenant}/{namespace}/{topic}` | Delete scalable topic |
> | `GET` | `/{tenant}/{namespace}/{topic}/stats` | Get aggregated stats for the scalable topic (segment counts, per-segment rates, per-subscription state, consumer counts) |
> | `PUT` | `/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | Create a subscription on the scalable topic. The controller propagates it to all active segments by issuing `createSubscription` on each `segment://` topic via the Segments admin API. |
> | `DELETE` | `/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | Delete a subscription. The controller unregisters all consumers and deletes the subscription from every segment. |
> | `POST` | `/{tenant}/{namespace}/{topic}/split/{segmentId}` | Split a segment |
> | `POST` | `/{tenant}/{namespace}/{topic}/merge/{segmentId1}/{segmentId2}` | Merge two segments |
> 
> **Segments** (`/admin/v2/segments`):
> 
> | Method | Path | Description |
> |--------|------|-------------|
> | `PUT` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create segment topic |
> | `POST` | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | Terminate segment |
> | `DELETE` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete segment topic |
> 
> ### Admin Client API
> 
> New `ScalableTopics` interface on `PulsarAdmin` (usage sketch below):
> 
> - `listScalableTopics(namespace)` / `listScalableTopicsAsync(namespace)`
> - `createScalableTopic(topic, numSegments)` / `createScalableTopicAsync(...)`
> - `getMetadata(topic)` / `getMetadataAsync(topic)`
> - `getStats(topic)` / `getStatsAsync(topic)`
> - `deleteScalableTopic(topic)` / `deleteScalableTopicAsync(topic)`
> - `createSubscription(topic, subscription)` / `createSubscriptionAsync(...)`
> - `deleteSubscription(topic, subscription)` / `deleteSubscriptionAsync(...)`
> - `splitSegment(topic, segmentId)` / `splitSegmentAsync(...)`
> - `mergeSegments(topic, segmentId1, segmentId2)` / `mergeSegmentsAsync(...)`
> - `createSegment(segmentTopic, subscriptions)` / `createSegmentAsync(...)`
> - `terminateSegment(segmentTopic)` / `terminateSegmentAsync(...)`
> - `deleteSegment(segmentTopic, force)` / `deleteSegmentAsync(...)`
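> 
> For illustration, a typical admin sequence might look like this. The
> `scalableTopics()` accessor and the segment IDs in the comments are
> assumptions for the example, not confirmed API:
> 
> ```java
> import org.apache.pulsar.client.admin.PulsarAdmin;
> 
> public class ScalableTopicsAdminExample {
>     public static void main(String[] args) throws Exception {
>         try (PulsarAdmin admin = PulsarAdmin.builder()
>                 .serviceHttpUrl("http://localhost:8080")
>                 .build()) {
>             String topic = "topic://public/default/orders";
> 
>             // Create with 4 initial segments and a subscription that is
>             // propagated to every active segment.
>             admin.scalableTopics().createScalableTopic(topic, 4);
>             admin.scalableTopics().createSubscription(topic, "billing");
> 
>             // Scale out segment 0 (children get IDs 4 and 5), then later
>             // scale the children back in.
>             admin.scalableTopics().splitSegment(topic, 0);
>             admin.scalableTopics().mergeSegments(topic, 4, 5);
>         }
>     }
> }
> ```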
> 
> ### Configuration
> 
> | Property | Default | Description |
> |----------|---------|-------------|
> | `scalableTopicEnabled` | `true` | Enable scalable topic support on the broker |
> 
> Additional configuration for auto-split thresholds and the consumer
> session grace period will be specified in follow-up sub-PIPs.
> 
> ### Metadata Store Paths
> 
> | Path | Content |
> |------|---------|
> | `/topics/{tenant}/{namespace}/{topic}` | `ScalableTopicMetadata` JSON — segment DAG and global topic state |
> | `/topics/{tenant}/{namespace}/{topic}/controller` | Leader broker URL (ephemeral) |
> | `/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | `SubscriptionMetadata` — subscription-level config and the persisted set of consumer registrations |
> | `/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/consumers/{consumerName}` | `ConsumerRegistration` — the durable session: consumer name and its current segment assignment. Keep-alive state (connected/disconnected, grace-period timer) is in-memory on the controller leader only and is **not** persisted here. |
> 
> ---
> 
> ## Backward & Forward Compatibility
> 
> ### Upgrade
> 
> The controller is a new component with no interaction with existing
> partitioned or non-partitioned topics. Enabling `scalableTopicEnabled`
> activates the new code paths. Existing topics are unaffected.
> 
> ### Downgrade / Rollback
> 
> Scalable topic metadata and segment data remain in the metadata store
> and BookKeeper. Rolling back to a version without scalable topic
> support leaves this data intact but inaccessible. Upgrading again
> restores access.
> 
> ### Client Compatibility
> 
> The existing Pulsar client SDK rejects `topic://` and `segment://`
> domains with a clear error message directing users to the V5 client
> SDK. This check is enforced in `PulsarClientImpl` for producer,
> consumer, and reader creation paths.
> 
> ---
> 
> ## Security Considerations
> 
> The controller follows the same tenant/namespace/topic authorization
> model as existing Pulsar topics:
> 
> - Admin API operations require topic-level admin permissions.
> - DAG watch sessions require topic-level lookup permissions.
> - Consumer registration requires topic-level consume permissions.
> - Segment operations are internal and authenticated via the broker's
> internal admin client.
> 
> The controller leader lock in the metadata store is accessible only to
> authenticated brokers.
> 
> ---
> 
> ## Links
> 
> - Parent PIP: [PIP-460: Scalable Topics](pip-460.md)
> - V5 Client API: [PIP-466: New Java Client API (V5)](pip-466.md)
> 
> --
> Matteo Merli
> <[email protected]>
> 