+1 (non-binding)

On Tue, Apr 21, 2026 at 00:44, Lari Hotari <[email protected]> wrote:

> +1 (binding)
>
> -Lari
>
> On 2026/04/17 04:03:36 Matteo Merli wrote:
> > Matteo Merli <[email protected]>
> >
> > Mon, Apr 13, 11:11 PM (3 days ago)
> > to Dev
> > https://github.com/apache/pulsar/pull/25516
> >
> >
> > -----
> >
> > # PIP-468: Scalable Topic Controller
> >
> > *Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*
> >
> > ## Motivation
> >
> > [PIP-460](pip-460.md) introduces scalable topics as a new topic type
> > in Pulsar, built on a DAG of range segments that can be split and
> > merged to scale independently of the initial configuration. PIP-460
> > defines the overall vision but defers the detailed design of each
> > component to dedicated sub-PIPs.
> >
> > This PIP specifies the **Scalable Topic Controller** — the broker-side
> > component responsible for:
> >
> > 1. **Segment lifecycle management** — creating, terminating, and
> > deleting the underlying segment topics that back a scalable topic.
> > 2. **Segment layout coordination** — executing split and merge
> > operations with correct ordering guarantees so that no messages are
> > lost and all subscription cursors exist before producers are
> > redirected.
> > 3. **Consumer assignment** — coordinating which consumers receive
> > messages from which segments for a given subscription.
> > 4. **Leader election** — ensuring exactly one broker acts as the
> > controller for each scalable topic, with automatic failover.
> > 5. **DAG watch sessions** — pushing topology updates to connected
> > clients (producers and consumers) when the segment layout changes.
> >
> > Without a well-defined controller, split and merge operations would be
> > unsafe: producers could write to segments that have no subscription
> > cursors, consumers could miss messages during topology changes, and
> > concurrent layout modifications could corrupt the segment DAG.
> >
> > ---
> >
> > ## Design
> >
> > ### Architecture Overview
> >
> > The controller subsystem consists of four layers:
> >
> > ```
> > ┌──────────────────────────────────────────────────────────┐
> > │                    Admin API Layer                        │
> > │         ScalableTopics REST  +  Segments REST             │
> > └──────────────┬───────────────────────┬───────────────────┘
> >                │                       │
> > ┌──────────────▼───────────────────────▼───────────────────┐
> > │                  ScalableTopicService                     │
> > │         Per-broker singleton; manages controllers         │
> > └──────────────────────────┬───────────────────────────────┘
> >                            │
> > ┌──────────────────────────▼───────────────────────────────┐
> > │              ScalableTopicController (per topic)          │
> > │   Leader-elected; owns layout, split/merge, consumers     │
> > │                                                           │
> > │   ┌─────────────────┐  ┌───────────────────────────────┐ │
> > │   │  SegmentLayout   │  │  SubscriptionCoordinator (×N) │ │
> > │   │  (immutable DAG) │  │  per-subscription assignments │ │
> > │   └─────────────────┘  └───────────────────────────────┘ │
> > └──────────────────────────┬───────────────────────────────┘
> >                            │
> > ┌──────────────────────────▼───────────────────────────────┐
> > │              ScalableTopicResources                       │
> > │         Metadata store access (read/write/watch)          │
> > └──────────────────────────────────────────────────────────┘
> > ```
> >
> > ### Broker Roles and Request Routing
> >
> > A scalable topic involves three distinct broker roles. Understanding
> > which broker handles which operation is critical to the design.
> >
> > #### Any broker — DAG watch sessions
> >
> > A **DAG watch session** (scalable topic lookup) can be served by **any
> > broker** in the cluster. The session reads segment metadata from the
> > shared metadata store and registers a watch for changes. No state is
> > held on the broker beyond the watch registration. This means producers
> > and consumers can connect to any broker for topic discovery — the same
> > way regular topic lookups work today.
> >
> > When the metadata changes (due to a split or merge performed by the
> > controller leader), the metadata store notification fires on whichever
> > broker is serving the watch session, and that broker pushes the
> > updated DAG to the client.
> >
> > #### Controller leader — layout mutations and consumer assignment
> >
> > Each scalable topic has exactly one **controller leader** across the
> > cluster, elected via the metadata store. The controller leader is the
> > only broker that can:
> >
> > - Execute **split** and **merge** operations (the multi-step protocols
> > described below).
> > - Accept **consumer registrations** and compute segment-to-consumer
> > assignments.
> > - **Notify consumers** of assignment changes after topology updates.
> >
> > Clients discover the controller leader's broker URL through the DAG
> > watch session response (`controller_broker_url` field). StreamConsumer
> > and CheckpointConsumer clients then connect directly to the controller
> > leader to register and receive assignments.
> >
> > Admin API requests for split and merge are routed to the
> > `ScalableTopicService` on any broker, which obtains (or creates) a
> > local `ScalableTopicController` that participates in leader election.
> > If the local controller is the leader, it executes the operation
> > directly. If not, the request must be redirected to the leader (or the
> > controller must first win the election). The split and merge admin
> > endpoints do not require the request to land on the leader broker —
> > the `ScalableTopicService` handles this transparently.
> >
> > #### Segment-owning brokers — produce and consume
> >
> > Individual **segment topics** (`segment://`) are regular persistent
> > topics from the broker's perspective. They are distributed across the
> > cluster by the standard Pulsar **load manager** and namespace bundle
> > assignment — the same mechanism used for `persistent://` topics. Each
> > segment topic is owned by exactly one broker at any time, determined
> > by its namespace bundle hash.
> >
> > The controller leader does not need to own any of the segment topics.
> > When the controller needs to operate on a segment (create, terminate,
> > delete, discover subscriptions), it uses the **Segments admin API**,
> > which routes the request to whichever broker currently owns that
> > segment's namespace bundle. This decoupling means the controller can
> > coordinate segments spread across many brokers without requiring
> > co-location.
> >
> > ```
> > ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
> > │  Broker A    │     │  Broker B    │     │  Broker C    │
> > │              │     │  (controller │     │              │
> > │  segment-0   │     │   leader)    │     │  segment-2   │
> > │  segment-1   │     │              │     │              │
> > │              │     │  DAG watch   │     │  DAG watch   │
> > │  DAG watch   │     │  sessions    │     │  sessions    │
> > │  sessions    │     │              │     │              │
> > └─────────────┘     └─────────────┘     └─────────────┘
> >                            │
> >                     Split/merge ops
> >                     use admin API to
> >                     reach segments on
> >                     Broker A and C
> > ```
> >
> > In this example, Broker B is the controller leader but owns no
> > segments. When it executes a split of segment-0, it calls the Segments
> > admin API which routes to Broker A (the owner of segment-0).
> >
> > ### Components
> >
> > #### ScalableTopicService
> >
> > A per-broker singleton, created and owned by `BrokerService`. It
> > manages the lifecycle of all `ScalableTopicController` instances on
> > the broker.
> >
> > **Responsibilities:**
> >
> > - **Controller management**: maintains a map of active controllers
> > keyed by topic name. Creates controllers on demand via
> > `getOrCreateController()` and releases them on topic unload or broker
> > shutdown.
> > - **Topic lifecycle**: handles `createScalableTopic()` and
> > `deleteScalableTopic()` admin operations, including creating/deleting
> > the underlying segment topics via the Segments admin API.
> > - **Delegation**: routes `splitSegment()` and `mergeSegments()`
> > requests to the appropriate controller.
> > - **Leader state monitoring**: listens for leader election state
> > changes and re-attempts election when the current leader is lost.
> >
> > #### ScalableTopicController
> >
> > A per-topic coordinator. Only one instance across the cluster is the
> > **leader** for a given scalable topic, ensured by leader election
> > through the metadata store. The leader is the only instance that
> > modifies the segment layout or assigns consumers.
> >
> > **State:**
> >
> > | Field | Type | Description |
> > |-------|------|-------------|
> > | `topicName` | `TopicName` | The `topic://` name of the scalable topic |
> > | `currentLayout` | `SegmentLayout` | The current immutable snapshot of the segment DAG |
> > | `subscriptions` | `Map<String, SubscriptionCoordinator>` | Per-subscription consumer coordinators |
> > | `leaderState` | `LeaderElectionState` | Current leader election state |
> > | `leaderElection` | `LeaderElection<String>` | Metadata store leader election handle; value is the broker URL |
> >
> > **Key operations:**
> >
> > - `initialize()` — loads the current metadata from the store, then
> > attempts leader election. The elected leader stores its broker service
> > URL so that clients can discover and connect to it.
> > - `splitSegment(segmentId)` — splits an active segment at its midpoint
> > (see [Split Protocol](#split-protocol)).
> > - `mergeSegments(segmentId1, segmentId2)` — merges two adjacent active
> > segments (see [Merge Protocol](#merge-protocol)).
> > - `registerConsumer(subscription, consumer)` — registers a consumer
> > and returns its initial segment assignment.
> > - `unregisterConsumer(subscription, consumer)` — removes a consumer
> > and triggers rebalancing.
> > - `getLeaderBrokerUrl()` — returns the current leader's broker URL,
> > used by clients to connect to the controller.
> >
> > #### SegmentLayout
> >
> > An immutable, versioned snapshot of the segment DAG. All mutation
> > methods (`splitSegment`, `mergeSegments`, `pruneSegment`) return a
> > **new** `SegmentLayout` instance — the original is never modified.
> > This ensures safe concurrent reads without locking.
> >
> > **Fields:**
> >
> > | Field | Type | Description |
> > |-------|------|-------------|
> > | `epoch` | `long` | Monotonically increasing version; incremented on every layout change |
> > | `nextSegmentId` | `long` | Counter for assigning IDs to new segments |
> > | `allSegments` | `Map<Long, SegmentInfo>` | Complete DAG: all segments (active + sealed) |
> > | `activeSegments` | `Map<Long, SegmentInfo>` | Filtered view: only `ACTIVE` segments |
> >
> > **Key operations:**
> >
> > - `findActiveSegment(hash)` — returns the active segment whose hash
> > range contains the given hash value. Used by producers for message
> > routing.
> > - `splitSegment(segmentId)` — validates the segment is active,
> > computes the midpoint of its hash range, creates two child
> > `SegmentInfo` records covering `[start, mid]` and `[mid+1, end]`,
> > seals the parent, and returns a new layout with incremented epoch.
> > - `mergeSegments(id1, id2)` — validates both segments are active and
> > adjacent (the end of one equals `start - 1` of the other), creates a
> > merged `SegmentInfo` covering the combined range, seals both parents,
> > and returns a new layout.
> > - `getLineage(segmentId)` — returns the full ancestor + descendant
> > chain for a segment, used for DAG traversal during catch-up reads.
> > - `toMetadata()` / `fromMetadata()` — converts to/from the persisted
> > `ScalableTopicMetadata` format.
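
The range arithmetic behind `splitSegment` and `mergeSegments` can be sketched in a few lines. This is only an illustration under assumptions: the `HashRange` record below is a hypothetical stand-in with `int` bounds, and the method names `split`, `isAdjacentTo`, and `mergeWith` are invented for the example rather than taken from the PIP.

```java
import java.util.List;

// Hypothetical stand-in for the PIP's HashRange; both bounds are inclusive.
record HashRange(int start, int end) {
    HashRange {
        if (start > end) throw new IllegalArgumentException("start > end");
    }

    // Split at the midpoint into [start, mid] and [mid+1, end].
    List<HashRange> split() {
        if (start == end) throw new IllegalStateException("single-value range cannot be split");
        int mid = start + (end - start) / 2;
        return List.of(new HashRange(start, mid), new HashRange(mid + 1, end));
    }

    // Adjacent when one range ends exactly one below the other's start.
    boolean isAdjacentTo(HashRange other) {
        return end + 1 == other.start || other.end + 1 == start;
    }

    // Merge two adjacent ranges into the combined range.
    HashRange mergeWith(HashRange other) {
        if (!isAdjacentTo(other)) throw new IllegalArgumentException("ranges are not adjacent");
        return new HashRange(Math.min(start, other.start), Math.max(end, other.end));
    }
}
```

Splitting `[0, 65535]` this way yields `[0, 32767]` and `[32768, 65535]`, and merging those two halves gives back the original range.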
> >
> > #### SegmentInfo
> >
> > A record representing a single segment in the DAG:
> >
> > | Field | Type | Description |
> > |-------|------|-------------|
> > | `segmentId` | `long` | Unique, monotonically increasing identifier |
> > | `hashRange` | `HashRange` | The `[start, end]` inclusive hash range this segment covers |
> > | `state` | `SegmentState` | `ACTIVE` or `SEALED` |
> > | `parentIds` | `List<Long>` | Parent segments (empty for root segments) |
> > | `childIds` | `List<Long>` | Child segments (empty for leaf/active segments) |
> > | `createdAtEpoch` | `long` | Layout epoch in which this segment was created (`0` for the initial segments) |
> > | `sealedAtEpoch` | `long` | Layout epoch in which this segment was sealed; only meaningful when `state == SEALED`. `0` when the segment is `ACTIVE`; this sentinel is unambiguous because sealing always happens as part of a split or merge, which increments the epoch, so a segment can never be sealed at epoch `0`. |
> >
> > Factory methods: `SegmentInfo.active(id, range, epoch)` and
> > `SegmentInfo.sealed(...)`.
> >
> > #### SubscriptionCoordinator
> >
> > Manages segment-to-consumer assignments for a single subscription
> > within a scalable topic.
> >
> > **Assignment strategy:** round-robin. Active segments are sorted by
> > hash range start; consumers are sorted by name. Segments are
> > distributed evenly across consumers. When a consumer is added or
> > removed, or when the layout changes (split/merge), the coordinator
> > recomputes assignments and pushes updates to affected consumers.
> >
> > **Key operations:**
> >
> > - `addConsumer(consumer)` — adds a consumer, rebalances, and returns
> > the full assignment map.
> > - `removeConsumer(consumer)` — removes a consumer and redistributes
> > its segments.
> > - `onLayoutChange(newLayout)` — called after a split or merge;
> > recomputes all assignments against the new set of active segments.
> >
> > #### ConsumerRegistration and ConsumerAssignment
> >
> > `ConsumerRegistration` is a record identifying a connected consumer:
> >
> > | Field | Type | Description |
> > |-------|------|-------------|
> > | `consumerId` | `long` | Protocol-level consumer ID |
> > | `consumerName` | `String` | Stable name chosen by the client |
> > | `cnx` | `TransportCnx` | The connection to push assignment updates |
> >
> > `ConsumerAssignment` is the assignment sent to a consumer:
> >
> > | Field | Type | Description |
> > |-------|------|-------------|
> > | `layoutEpoch` | `long` | The layout epoch this assignment is based on |
> > | `assignedSegments` | `List<AssignedSegment>` | The segments assigned to this consumer |
> >
> > Each `AssignedSegment` contains: `segmentId`, `hashRange`, and
> > `underlyingTopicName` (the `segment://` topic name the consumer should
> > connect to).
> >
> > ### Metadata Store Schema
> >
> > Scalable topic metadata is stored in the metadata store under a
> > well-known path structure:
> >
> > ```
> > /topics/{tenant}/{namespace}/{encodedTopicName}
> > ```
> >
> > The value at this path is a JSON-serialized `ScalableTopicMetadata`:
> >
> > ```json
> > {
> >   "epoch": 3,
> >   "nextSegmentId": 5,
> >   "segments": {
> >     "0": { "segmentId": 0, "hashRange": {"start": 0, "end": 65535},
> > "state": "SEALED", "parentIds": [], "childIds": [1, 2],
> > "createdAtEpoch": 0, "sealedAtEpoch": 1 },
> >     "1": { "segmentId": 1, "hashRange": {"start": 0, "end": 32767},
> > "state": "SEALED", "parentIds": [0], "childIds": [3, 4],
> > "createdAtEpoch": 1, "sealedAtEpoch": 3 },
> >     "2": { "segmentId": 2, "hashRange": {"start": 32768, "end":
> > 65535}, "state": "ACTIVE", "parentIds": [0], "childIds": [],
> > "createdAtEpoch": 1, "sealedAtEpoch": 0 },
> >     "3": { "segmentId": 3, "hashRange": {"start": 0, "end": 16383},
> > "state": "ACTIVE", "parentIds": [1], "childIds": [], "createdAtEpoch":
> > 3, "sealedAtEpoch": 0 },
> >     "4": { "segmentId": 4, "hashRange": {"start": 16384, "end":
> > 32767}, "state": "ACTIVE", "parentIds": [1], "childIds": [],
> > "createdAtEpoch": 3, "sealedAtEpoch": 0 }
> >   },
> >   "properties": {}
> > }
> > ```
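
One property the example metadata appears to rely on is that the `ACTIVE` segments tile the hash space with no gaps or overlaps. The check below is a sketch of that invariant, not something the PIP specifies: the `Range` record, the `coversExactly` name, and the `[0, 65535]` bounds (read off the example JSON) are all assumptions.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class ActiveRangeCheck {
    // Hypothetical minimal view of an active segment: inclusive [start, end].
    record Range(int start, int end) {}

    // True when the ranges, sorted by start, cover [min, max] with no gaps or overlaps.
    static boolean coversExactly(List<Range> active, int min, int max) {
        List<Range> sorted = new ArrayList<>(active);
        sorted.sort(Comparator.comparingInt(Range::start));
        int expectedStart = min;
        for (Range r : sorted) {
            if (r.start() != expectedStart || r.end() < r.start()) return false;
            expectedStart = r.end() + 1;
        }
        return expectedStart == max + 1;
    }
}
```

Applied to the example, segments 2, 3, and 4 cover `[0, 65535]` exactly; dropping any one of them makes the check fail.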
> >
> > The controller leader lock is stored at:
> >
> > ```
> > /topics/{tenant}/{namespace}/{encodedTopicName}/controller
> > ```
> >
> > The value is the broker service URL of the elected leader.
> >
> > ### DAG Watch Sessions
> >
> > As described in [Broker Roles](#broker-roles-and-request-routing), any
> > broker can serve a DAG watch session because the segment metadata
> > lives in the shared metadata store — no controller leader involvement
> > is required. The session is established via the binary protocol:
> >
> > 1. **Client sends `CommandScalableTopicLookup`** with a
> > client-assigned `sessionId` and the `topic://` topic name.
> > 2. **Broker creates a `DagWatchSession`** that:
> >    - Loads the current `ScalableTopicMetadata` from the store.
> >    - Resolves which broker owns each active segment's `segment://`
> > topic (via topic lookup).
> >    - Reads the controller broker URL from the leader lock path.
> >    - Registers a metadata store notification listener for changes to
> > the topic's metadata path.
> > 3. **Broker sends `CommandScalableTopicUpdate`** with the initial
> > `ScalableTopicDAG` containing the full segment DAG, broker addresses,
> > and controller URL.
> > 4. **On metadata change**, the notification listener fires. The broker
> > reloads metadata, re-resolves broker addresses, and pushes an updated
> > `CommandScalableTopicUpdate` to the client.
> > 5. **Client sends `CommandScalableTopicClose`** to tear down the session.
> >
> > The `ScalableTopicDAG` protocol message contains:
> >
> > | Field | Type | Description |
> > |-------|------|-------------|
> > | `epoch` | `uint64` | Layout epoch |
> > | `segments` | `repeated SegmentInfoProto` | Full segment DAG |
> > | `segment_brokers` | `repeated SegmentBrokerAddress` | Broker URL for each active segment |
> > | `controller_broker_url` | `string` | Controller leader's broker URL |
> > | `controller_broker_url_tls` | `string` | TLS variant |
> >
> > ---
> >
> > ## Split Protocol
> >
> > Splitting a segment is the core scaling-out operation. The protocol is
> > carefully ordered to guarantee that **no messages are lost** and **all
> > subscription cursors exist before producers are redirected**.
> >
> > ### Step-by-step
> >
> > ```
> >            Controller                   Metadata Store          Broker(s)
> >                │                             │                     │
> >   1. Discover  │──── getSubscriptions ───────>│                     │
> >   subscriptions│<─── [sub-A, sub-B] ─────────│                     │
> >                │                             │                     │
> >   2. Create    │──── createSegment(child1, ──>│ ─── route ────────> │
> >   children     │     [sub-A, sub-B])          │                     │ create topic
> >                │──── createSegment(child2, ──>│ ─── route ────────> │ + cursors
> >                │     [sub-A, sub-B])          │                     │
> >                │                             │                     │
> >   3. Terminate │──── terminateSegment ───────>│ ─── route ────────> │
> >   parent       │     (parent)                 │                     │ seal ML
> >                │                             │                     │
> >   4. Update    │──── CAS update ─────────────>│                     │
> >   metadata     │     (atomic)                 │                     │
> >                │                             │                     │
> >   5. Notify    │──── push to consumers ──────────────────────────> │
> >   consumers    │     (rebalance)              │                     │
> > ```
> >
> > **Step 1: Discover subscriptions.** The controller queries the parent
> > segment topic for its list of subscriptions. It first checks locally
> > (if the segment is on this broker), falling back to the admin API for
> > remote segments.
> >
> > **Step 2: Create child segment topics.** Two new `segment://` topics
> > are created via the Segments admin API. Each creation request includes
> > the list of subscriptions discovered in step 1. The Segments API
> > handler creates the persistent topic and initializes subscription
> > cursors at the `Earliest` position. The admin API routes to whichever
> > broker owns the namespace bundle for each segment, so child segments
> > may be created on different brokers.
> >
> > **Step 3: Terminate parent segment.** The parent segment topic is
> > terminated via the Segments admin API, which writes a termination
> > marker to the managed ledger. After termination, producers writing to
> > the parent receive `TopicTerminated` and must re-route to child
> > segments.
> >
> > **Step 4: Atomic metadata update.** The controller issues a
> > compare-and-swap (CAS) update to the metadata store. The update
> > transitions the parent to `SEALED` state with child pointers, and adds
> > the two new `ACTIVE` child segments. The CAS guarantees atomicity — if
> > another operation modified the metadata concurrently, the update fails
> > and must be retried.
> >
> > **Step 5: Notify consumers.** All `SubscriptionCoordinator` instances
> > are notified of the layout change. They recompute segment-to-consumer
> > assignments and push updates to connected consumers.
> >
> > ### Why this ordering matters
> >
> > - Steps 1-2 **before** step 4: if we updated metadata first, clients
> > would discover the new segments immediately. Producers would start
> > writing to child segments that have no subscription cursors yet,
> > causing those messages to be missed by consumers.
> > - Step 3 **before** step 4: terminating the parent before the metadata
> > update ensures that by the time clients see the new layout, the parent
> > is already sealed. Clients never observe a layout in which both the
> > parent and its children accept writes.
> > - Step 2 creates cursors at `Earliest`: this is safe because the child
> > topics are brand new (empty). The cursor starts at the beginning, and
> > the first message written by a redirected producer will be the first
> > message consumed.
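
The ordering argument above can be made concrete with a small sketch. Everything here is hypothetical: the `Backend` interface stands in for the Segments admin API plus the metadata store, and none of its method signatures come from the PIP. The point is only the call order, with the CAS update placed after creation and termination.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitSketch {
    // Hypothetical facade over the Segments admin API and the metadata store.
    interface Backend {
        List<String> getSubscriptions(String segment);
        void createSegment(String segment, List<String> subscriptions);
        void terminateSegment(String segment);
        boolean casUpdateLayout(long expectedEpoch, String parent, List<String> children);
        void notifyConsumers();
    }

    // Runs the five steps in order; returns false if the CAS lost a race.
    static boolean split(Backend b, long epoch, String parent, String child1, String child2) {
        List<String> subs = b.getSubscriptions(parent);   // step 1: discover
        b.createSegment(child1, subs);                     // step 2: children + cursors
        b.createSegment(child2, subs);
        b.terminateSegment(parent);                        // step 3: seal parent
        if (!b.casUpdateLayout(epoch, parent, List.of(child1, child2))) { // step 4
            return false; // caller reloads metadata and decides whether to retry
        }
        b.notifyConsumers();                               // step 5: rebalance
        return true;
    }

    // In-memory backend that records the call order, for illustration only.
    static final class Recording implements Backend {
        final List<String> calls = new ArrayList<>();
        long epoch = 0;

        public List<String> getSubscriptions(String s) {
            calls.add("discover:" + s);
            return List.of("sub-A", "sub-B");
        }
        public void createSegment(String s, List<String> subs) { calls.add("create:" + s + subs); }
        public void terminateSegment(String s) { calls.add("terminate:" + s); }
        public boolean casUpdateLayout(long expected, String p, List<String> c) {
            if (expected != epoch) return false; // someone else changed the layout
            epoch++;
            calls.add("cas:" + p + "->" + c);
            return true;
        }
        public void notifyConsumers() { calls.add("notify"); }
    }
}
```

Running `split` against `Recording` shows discovery, both creations, termination, the CAS, and the notification in that order; a second call with a stale epoch returns `false` without notifying anyone.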
> >
> > ---
> >
> > ## Merge Protocol
> >
> > Merging two adjacent segments is the core scaling-in operation. The
> > protocol follows the same ordering principle: **create first, update
> > metadata last**.
> >
> > ### Step-by-step
> >
> > **Step 1: Discover subscriptions.** The controller queries both parent
> > segments for their subscriptions and takes the union, so no
> > subscription is lost.
> >
> > **Step 2: Create merged segment topic.** A single new `segment://`
> > topic is created with all discovered subscriptions.
> >
> > **Step 3: Terminate both parent segments.** Both parents are
> > terminated via the admin API. This is done sequentially to avoid a
> > race where producers of one parent are still writing while the other
> > is already sealed.
> >
> > **Step 4: Atomic metadata update.** The CAS update seals both parents
> > with child pointers to the merged segment, and adds the new `ACTIVE`
> > merged segment.
> >
> > **Step 5: Notify consumers.** Same as split — rebalance and push.
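
Step 1 of the merge takes the union of the two parents' subscriptions. A minimal sketch of that union follows, assuming subscriptions are identified by name only; the class and method names are invented for the example.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class MergeSubscriptions {
    // Union of both parents' subscriptions, keeping first-seen order and dropping duplicates.
    static List<String> union(List<String> parent1Subs, List<String> parent2Subs) {
        Set<String> all = new LinkedHashSet<>(parent1Subs);
        all.addAll(parent2Subs);
        return List.copyOf(all);
    }
}
```

With parents holding `[sub-A, sub-B]` and `[sub-B, sub-C]`, the merged segment would be created with `[sub-A, sub-B, sub-C]`.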
> >
> > ### Cross-broker coordination
> >
> > Like splits, merges can span multiple brokers: the two parent segments
> > may be owned by different brokers (assigned by the load manager).
> > The controller leader handles this transparently: all
> > segment operations (create, terminate, delete) go through the Segments
> > admin API, which routes each request to the broker that currently owns
> > the target segment's namespace bundle. The controller does not need to
> > know or care which broker owns which segment — the admin API routing
> > handles it.
> >
> > ---
> >
> > ## Segment Topic Management via Admin API
> >
> > Segment topics are managed exclusively through a dedicated REST API at
> > `/admin/v2/segments`. This is a deliberate design choice: segment
> > topics use the `segment://` domain and must not be confused with
> > regular `persistent://` topics.
> >
> > ### Endpoints
> >
> > | Method | Path | Description |
> > |--------|------|-------------|
> > | `PUT` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create a segment topic. Optional request body with subscription names to pre-create. |
> > | `POST` | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | Terminate (seal) a segment topic. |
> > | `DELETE` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete a segment topic. |
> >
> > The `{descriptor}` is the segment's hash range and ID in the format
> > `{hexStart}-{hexEnd}-{segmentId}` (e.g., `0000-7fff-1`).
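
A sketch of encoding and parsing the descriptor may help here. It assumes, based only on the `0000-7fff-1` example, that the range bounds are lower-case hex zero-padded to four digits and that the segment ID is decimal; the `SegmentDescriptor` record and its methods are hypothetical.

```java
import java.util.Locale;

// Hypothetical holder for the three descriptor components.
record SegmentDescriptor(int start, int end, long segmentId) {

    // Encode as {hexStart}-{hexEnd}-{segmentId}, e.g. 0000-7fff-1.
    String encode() {
        return String.format(Locale.ROOT, "%04x-%04x-%d", start, end, segmentId);
    }

    // Parse a descriptor back into its components; rejects malformed input.
    static SegmentDescriptor parse(String descriptor) {
        String[] parts = descriptor.split("-");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected hexStart-hexEnd-segmentId: " + descriptor);
        }
        return new SegmentDescriptor(
                Integer.parseInt(parts[0], 16),
                Integer.parseInt(parts[1], 16),
                Long.parseLong(parts[2]));
    }
}
```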
> >
> > ### Create Segment
> >
> > When creating a segment topic, the handler:
> >
> > 1. Constructs the full `segment://` topic name from the path components.
> > 2. Validates namespace bundle ownership (the request is routed to the
> > owning broker).
> > 3. Creates the persistent topic via `BrokerService.getOrCreateTopic()`.
> > 4. For each subscription in the request body, creates a cursor at the
> > `Earliest` position.
> >
> > ### Terminate Segment
> >
> > Terminates the managed ledger backing the segment topic. After
> > termination, the topic accepts no further writes, and producers
> > receive `TopicTerminated`.
> >
> > ### Delete Segment
> >
> > Deletes the segment topic and its managed ledger. Used during scalable
> > topic deletion to clean up all underlying storage.
> >
> > ---
> >
> > ## Consumer Assignment
> >
> > Consumer assignment behavior depends on the consumer type:
> >
> > - **StreamConsumer** (ordered, cumulative ack) — requires coordinated
> > segment-to-consumer assignment from the controller leader. Each active
> > segment is owned by exactly one consumer at any time to preserve
> > per-segment ordering. This is the subject of the rest of this section.
> > - **CheckpointConsumer** (unmanaged, for connectors) — same as
> > StreamConsumer from a controller perspective: it registers with the
> > controller leader and receives an explicit segment assignment.
> > - **QueueConsumer** (unordered, individual ack) — **does not require
> > any controller-side assignment**. Because there is no ordering
> > requirement, every queue consumer attaches directly to **all**
> > segments of the scalable topic — both `ACTIVE` and `SEALED` — and each
> > segment-owning broker independently performs round-robin delivery
> > across the queue consumers attached to that segment. New segments
> > produced by a split are picked up transparently via the DAG watch
> > session push; sealed segments keep serving messages until their
> > backlog drains and are then dropped from the set. The controller
> > leader is not in the data path for queue consumers and no
> > `SubscriptionCoordinator` is involved for them.
> >
> > ### Registration Flow (StreamConsumer / CheckpointConsumer)
> >
> > 1. A `StreamConsumer` connects to the controller broker (discovered
> > via the DAG watch session's `controller_broker_url`).
> > 2. The consumer sends a registration request with its `subscription`
> > name and `consumerName`.
> > 3. The controller's `registerConsumer()` method routes to the
> > appropriate `SubscriptionCoordinator` (created on first use for that
> > subscription).
> > 4. The coordinator adds the consumer, recomputes assignments, and
> > returns the consumer's `ConsumerAssignment` — a list of `(segmentId,
> > hashRange, segmentTopicName)` tuples.
> > 5. The consumer connects to each assigned `segment://` topic and
> > begins consuming.
> >
> > ### Rebalancing
> >
> > Rebalancing occurs when:
> >
> > - A consumer is added or removed.
> > - A split or merge changes the set of active segments.
> >
> > The rebalancing algorithm is round-robin:
> >
> > 1. Collect all active segments, sorted by hash range start.
> > 2. Collect all consumers, sorted by name (for deterministic ordering).
> > 3. Assign segments to consumers in round-robin order.
> > 4. For each consumer whose assignment changed, push the new
> > `ConsumerAssignment`.
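
Steps 1 to 3 of the algorithm above can be sketched as follows. The `Segment` record and the `assign` method are hypothetical simplifications: a segment is reduced to its ID and range start, and a consumer to its name. Step 4 (diffing against the previous assignment and pushing only the changes) is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RoundRobinAssignment {
    // Hypothetical minimal segment view: id plus the start of its hash range.
    record Segment(long id, int rangeStart) {}

    // Returns consumerName -> assigned segment IDs, deterministic for a given input set.
    static Map<String, List<Long>> assign(List<Segment> activeSegments, List<String> consumerNames) {
        List<Segment> segments = new ArrayList<>(activeSegments);
        segments.sort(Comparator.comparingInt(Segment::rangeStart)); // step 1
        List<String> consumers = new ArrayList<>(consumerNames);
        consumers.sort(Comparator.naturalOrder());                   // step 2

        Map<String, List<Long>> result = new LinkedHashMap<>();
        for (String c : consumers) result.put(c, new ArrayList<>());
        if (consumers.isEmpty()) return result;
        for (int i = 0; i < segments.size(); i++) {                  // step 3
            result.get(consumers.get(i % consumers.size())).add(segments.get(i).id());
        }
        return result;
    }
}
```

Because both inputs are sorted first, the result does not depend on the order in which segments or consumers were supplied.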
> >
> > ### Consumer Session Lifecycle
> >
> > A consumer's registration and its segment assignment are treated as a
> > **persistent session**, not as a transient association tied to a TCP
> > connection. This is a deliberate departure from the existing Pulsar
> > consumer model, where consumer liveness is asserted purely by the TCP
> > connection to the broker.
> >
> > **Rationale.** Scalable topic consumer assignments have non-trivial
> > cost: when a consumer disappears, its segments are redistributed among
> > the remaining consumers, each of which may need to reconnect to
> > different segment brokers and (for ordered consumers) drain in-flight
> > messages before the new assignment takes effect. Treating a brief
> > disconnection as a full consumer loss would cause unnecessary
> > rebalancing in common scenarios such as a consumer process restart or
> > a broker restart.
> >
> > **What is persisted vs. in-memory.**
> >
> > The **session itself is persisted** — the keep-alive state is not.
> >
> > - **Persisted in the metadata store (the session):** the
> > `ConsumerRegistration` — consumer name and its current segment
> > assignment — keyed by `consumerName` under the subscription path. Once
> > a consumer successfully registers, this entry is durable and outlives
> > TCP disconnects, client restarts, and controller leader failovers. The
> > assignment survives failover without forcing consumers to re-register.
> > - **In-memory on the controller leader (the keep-alive):** whether
> > each consumer is currently connected, and the grace-period timer that
> > runs while it is disconnected. Keep-alive signals are **not** written
> > to the metadata store; the leader observes the consumer's connection
> > state directly and tracks the timer in RAM. This avoids a metadata
> > store write on every liveness tick.
> >
> > **Session semantics (steady state).**
> >
> > - When a consumer registers, the `SubscriptionCoordinator` writes its
> > `ConsumerRegistration` and marks it connected in-memory.
> > - If the consumer's connection drops, the leader does **not**
> > immediately remove the consumer. It transitions the in-memory state to
> > "disconnected" and starts a configurable **session grace period**
> > timer for that consumer.
> > - If the same `consumerName` reconnects within the grace period, the
> > timer is cancelled and the consumer resumes with the same persisted
> > assignment — no rebalance, no other consumer disturbed.
> > - If the grace period expires, the leader deletes the
> > `ConsumerRegistration` from the metadata store and triggers a
> > rebalance for the remaining consumers.
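
The in-memory half of the session model can be sketched as a small tracker. This is an assumption-laden illustration: `SessionTracker` and its methods are invented names, time is passed in explicitly instead of using real timers, and the metadata store writes are left to the caller.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class SessionTracker {
    private final long gracePeriodMillis;
    // consumerName -> time of disconnect; absent means currently connected.
    private final Map<String, Long> disconnectedAt = new HashMap<>();

    SessionTracker(long gracePeriodMillis) {
        this.gracePeriodMillis = gracePeriodMillis;
    }

    // Start the grace period; a repeated disconnect does not restart it.
    void onDisconnect(String consumerName, long nowMillis) {
        disconnectedAt.putIfAbsent(consumerName, nowMillis);
    }

    // Returns true when the consumer was in its grace period and keeps its assignment.
    boolean onReconnect(String consumerName) {
        return disconnectedAt.remove(consumerName) != null;
    }

    // Removes and returns consumers whose grace period has elapsed; the caller would
    // delete their persisted registration and trigger a rebalance.
    List<String> expire(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = disconnectedAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMillis - e.getValue() >= gracePeriodMillis) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

Under this sketch, a newly elected leader would call `onDisconnect` with its own current time for every registration it loads, which gives each consumer a fresh grace period.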
> >
> > **Behavior on controller leader failover.**
> >
> > Because liveness timers are in-memory only, they are lost when the
> > leader broker crashes. The new leader:
> >
> > 1. Loads all `ConsumerRegistration` entries for each subscription from
> > the metadata store, restoring the prior segment assignment.
> > 2. Treats every consumer as "just disconnected" and starts a fresh
> > grace-period timer for each. No timestamps carry across from the
> > previous leader.
> > 3. Consumers reconnecting to the new leader within the fresh grace
> > period resume with the same assignment — seamless from their
> > perspective.
> > 4. Consumers that fail to reconnect before the fresh timer expires are
> > evicted and their segments are redistributed.
> >
> > This means a leader failover always gives clients a full grace period
> > to reconnect, regardless of how long they had already been
> > disconnected under the old leader. The trade-off is explicit: we keep
> > the metadata store writes proportional to real membership changes
> > (register / unregister / rebalance) rather than to liveness ticks.
> >
> > **Covered scenarios.**
> >
> > - **Consumer process restart.** A client process restarts and
> > reconnects with the same `consumerName` well within the grace period —
> > the same segments are reassigned, no redistribution.
> > - **Controller leader restart.** The leader is re-elected on a
> > different broker. Consumer entries and assignments are restored from
> > the metadata store; the new leader starts a fresh grace-period timer
> > for every consumer so reconnects are transparent.
> > - **Segment-owning broker restart.** Segment topics are reassigned by
> > the standard Pulsar load manager; the segment-to-consumer assignment
> > is unaffected, and reconnects to the new owning broker are
> > transparent at the consumer-assignment level.
> >
> > The grace period and related tunables are configurable per broker and
> > will be specified in a follow-up sub-PIP.
> >
> > ### Layout Change Propagation
> >
> > When a split or merge completes and the metadata is updated:
> >
> > 1. The controller reloads the metadata and creates a new `SegmentLayout`.
> > 2. `notifySubscriptions()` is called, which iterates all
> > `SubscriptionCoordinator` instances.
> > 3. Each coordinator's `onLayoutChange(newLayout)` runs, recomputing
> > assignments against the new active segments and pushing updates to
> > affected consumers.
> >
> > ---
> >
> > ## Leader Election and Failover
> >
> > ### Election
> >
> > Each `ScalableTopicController` participates in leader election via the
> > metadata store's `LeaderElection` API. The election path is:
> >
> > ```
> > /topics/{tenant}/{namespace}/{encodedTopicName}/controller
> > ```
> >
> > The elected leader writes its broker service URL as the election
> > value. Clients discover the controller by reading this value
> > (delivered as part of the DAG watch session response).
> >
> > ### Failover
> >
> > When the leader broker fails:
> >
> > 1. The metadata store detects the session loss and clears the leader
> > lock.
> > 2. Other brokers with active controllers for the same topic receive a
> > `NoLeader` state change notification.
> > 3. The `ScalableTopicService.onLeaderStateChange()` handler re-invokes
> > `controller.initialize()`, which reloads metadata and re-attempts
> > election.
> > 4. The new leader restores in-memory state from the persisted
> > metadata, including all subscriptions and their consumer registrations
> > with current assignments. Consumers reconnecting within the session
> > grace period find their sessions intact and resume with the same
> > segment assignment without a rebalance (see [Consumer Session
> > Lifecycle](#consumer-session-lifecycle)).
> >
> > ### Non-leader brokers
> >
> > A broker that loses leadership or never wins it retains the controller
> > instance, but every mutating operation (`splitSegment`,
> > `mergeSegments`, `registerConsumer`) first passes through a
> > `checkLeader()` guard that throws `IllegalStateException`. The
> > controller remains available for read operations such as
> > `getLayout()`.
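
The guard pattern described here might look roughly like the following. `GuardedController`, `setLeader`, and the epoch counter are hypothetical stand-ins introduced for illustration; only `checkLeader()`, `splitSegment`, and the `IllegalStateException` come from the text above.

```java
/** Hypothetical stand-in for the controller; only the guard pattern is shown. */
class GuardedController {
    private volatile boolean leader;
    private int layoutEpoch = 0;

    /** Assumed hook, called when the election state changes. */
    void setLeader(boolean leader) {
        this.leader = leader;
    }

    private void checkLeader() {
        if (!leader) {
            throw new IllegalStateException("Not the controller leader");
        }
    }

    /** Mutating operation: rejected unless this broker is the leader. */
    int splitSegment(long segmentId) {
        checkLeader();
        return ++layoutEpoch; // placeholder for the real split logic
    }

    /** Read operation: allowed on any broker that holds the instance. */
    int getLayoutEpoch() {
        return layoutEpoch;
    }
}
```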
> >
> > ---
> >
> > ## Scalable Topic Lifecycle
> >
> > ### Creation
> >
> > ```
> > Admin API (PUT /admin/v2/scalable/{tenant}/{namespace}/{topic})
> >   └─> ScalableTopicService.createScalableTopic(topic, numInitialSegments)
> >         ├─> ScalableTopicController.createInitialMetadata(numInitialSegments)
> >         │     └─> Divides [0x0000, 0xFFFF] into N equal ranges
> >         │         Creates N SegmentInfo records (ACTIVE, no parents)
> >         │         Returns ScalableTopicMetadata with epoch=0
> >         ├─> ScalableTopicResources.createScalableTopicAsync(topic, metadata)
> >         │     └─> Writes metadata to store at /topics/{t}/{ns}/{topic}
> >         └─> For each segment: createSegmentAsync via Segments admin API
> >               └─> Creates segment:// topic on owning broker
> > ```
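
The "divides [0x0000, 0xFFFF] into N equal ranges" step in the creation flow can be illustrated with a short sketch. The class `HashRanges` and the method `divide` are hypothetical names, and the rounding used when N does not divide 65536 evenly is an assumption; the PIP text does not specify it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; splits the 16-bit hash space into contiguous ranges. */
class HashRanges {
    private static final int HASH_SPACE = 0x10000; // 0x0000..0xFFFF inclusive

    /** Returns n inclusive ranges as {start, end} pairs covering the hash space. */
    static List<int[]> divide(int n) {
        if (n < 1 || n > HASH_SPACE) {
            throw new IllegalArgumentException("n must be in [1, 65536]");
        }
        List<int[]> ranges = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            // long arithmetic avoids int overflow for large n
            int start = (int) ((long) i * HASH_SPACE / n);
            int end = (int) ((long) (i + 1) * HASH_SPACE / n) - 1;
            ranges.add(new int[] {start, end});
        }
        return ranges;
    }
}
```

Computing both boundaries from the same formula keeps the ranges contiguous and non-overlapping even when the sizes differ by one.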
> >
> > ### Deletion
> >
> > ```
> > Admin API (DELETE /admin/v2/scalable/{tenant}/{namespace}/{topic})
> >   └─> ScalableTopicService.deleteScalableTopic(topic)
> >         ├─> releaseController(topic)
> >         │     └─> Closes leader election, removes from controllers map
> >         ├─> Load metadata from store
> >         ├─> For each segment: deleteSegmentAsync via Segments admin API
> >         │     └─> Deletes segment:// topic on owning broker (best-effort)
> >         └─> ScalableTopicResources.deleteScalableTopicAsync(topic)
> >               └─> Removes metadata from store
> > ```
> >
> > ---
> >
> > ## Public-Facing Changes
> >
> > ### Binary Protocol
> >
> > Three new protocol commands (added to `PulsarApi.proto`):
> >
> > | Command | Direction | Field ID | Description |
> > |---------|-----------|----------|-------------|
> > | `CommandScalableTopicLookup` | Client → Broker | 70 | Initiates a DAG watch session |
> > | `CommandScalableTopicUpdate` | Broker → Client | 71 | Initial response and subsequent pushed updates |
> > | `CommandScalableTopicClose` | Client → Broker | 72 | Closes the DAG watch session |
> >
> > New protocol messages:
> >
> > - `ScalableTopicDAG` — the full segment DAG with broker addresses and
> > controller URL.
> > - `SegmentInfoProto` — per-segment record in the DAG.
> > - `SegmentBrokerAddress` — maps a segment ID to its owning broker.
> > - `SegmentState` enum — `ACTIVE` or `SEALED`.
> >
> > ### Admin API
> >
> > **Scalable Topics** (`/admin/v2/scalable`):
> >
> > | Method | Path | Description |
> > |--------|------|-------------|
> > | `GET` | `/{tenant}/{namespace}` | List scalable topics |
> > | `PUT` | `/{tenant}/{namespace}/{topic}` | Create scalable topic |
> > | `GET` | `/{tenant}/{namespace}/{topic}` | Get topic metadata |
> > | `DELETE` | `/{tenant}/{namespace}/{topic}` | Delete scalable topic |
> > | `GET` | `/{tenant}/{namespace}/{topic}/stats` | Get aggregated stats for the scalable topic (segment counts, per-segment rates, per-subscription state, consumer counts) |
> > | `PUT` | `/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | Create a subscription on the scalable topic. The controller propagates it to all active segments by issuing `createSubscription` on each `segment://` topic via the Segments admin API. |
> > | `DELETE` | `/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | Delete a subscription. The controller unregisters all consumers and deletes the subscription from every segment. |
> > | `POST` | `/{tenant}/{namespace}/{topic}/split/{segmentId}` | Split a segment |
> > | `POST` | `/{tenant}/{namespace}/{topic}/merge/{segmentId1}/{segmentId2}` | Merge two segments |
> >
> > **Segments** (`/admin/v2/segments`):
> >
> > | Method | Path | Description |
> > |--------|------|-------------|
> > | `PUT` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create segment topic |
> > | `POST` | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | Terminate segment |
> > | `DELETE` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete segment topic |
> >
> > ### Admin Client API
> >
> > New `ScalableTopics` interface on `PulsarAdmin`:
> >
> > - `listScalableTopics(namespace)` / `listScalableTopicsAsync(namespace)`
> > - `createScalableTopic(topic, numSegments)` /
> > `createScalableTopicAsync(...)`
> > - `getMetadata(topic)` / `getMetadataAsync(topic)`
> > - `getStats(topic)` / `getStatsAsync(topic)`
> > - `deleteScalableTopic(topic)` / `deleteScalableTopicAsync(topic)`
> > - `createSubscription(topic, subscription)` /
> > `createSubscriptionAsync(...)`
> > - `deleteSubscription(topic, subscription)` /
> > `deleteSubscriptionAsync(...)`
> > - `splitSegment(topic, segmentId)` / `splitSegmentAsync(...)`
> > - `mergeSegments(topic, segmentId1, segmentId2)` /
> > `mergeSegmentsAsync(...)`
> > - `createSegment(segmentTopic, subscriptions)` /
> > `createSegmentAsync(...)`
> > - `terminateSegment(segmentTopic)` / `terminateSegmentAsync(...)`
> > - `deleteSegment(segmentTopic, force)` / `deleteSegmentAsync(...)`
> >
> > ### Configuration
> >
> > | Property | Default | Description |
> > |----------|---------|-------------|
> > | `scalableTopicEnabled` | `true` | Enable scalable topic support on the broker |
> >
> > Additional configuration for auto-split thresholds and the consumer
> > session grace period will be specified in follow-up sub-PIPs.
> >
> > ### Metadata Store Paths
> >
> > | Path | Content |
> > |------|---------|
> > | `/topics/{tenant}/{namespace}/{topic}` | `ScalableTopicMetadata` JSON: segment DAG and global topic state |
> > | `/topics/{tenant}/{namespace}/{topic}/controller` | Leader broker URL (ephemeral) |
> > | `/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | `SubscriptionMetadata`: subscription-level config and the persisted set of consumer registrations |
> > | `/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/consumers/{consumerName}` | `ConsumerRegistration`: the durable session, i.e. the consumer name and its current segment assignment. Keep-alive state (connected/disconnected, grace-period timer) is in-memory on the controller leader only and is **not** persisted here. |
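
The path layout in this table can be summarised as a few string builders. `ScalableTopicPaths` and its method names are hypothetical, and the sketch assumes the caller passes an already encoded topic name (the election path earlier in the proposal uses `{encodedTopicName}`).

```java
/** Hypothetical path helper mirroring the metadata store layout in the table. */
class ScalableTopicPaths {
    /** Root node holding the ScalableTopicMetadata JSON. */
    static String topic(String tenant, String namespace, String topic) {
        return "/topics/" + tenant + "/" + namespace + "/" + topic;
    }

    /** Ephemeral node holding the leader broker URL. */
    static String controller(String tenant, String namespace, String topic) {
        return topic(tenant, namespace, topic) + "/controller";
    }

    /** Node holding the SubscriptionMetadata. */
    static String subscription(String tenant, String namespace, String topic,
                               String subscription) {
        return topic(tenant, namespace, topic) + "/subscriptions/" + subscription;
    }

    /** Node holding one ConsumerRegistration (the durable session). */
    static String consumer(String tenant, String namespace, String topic,
                           String subscription, String consumerName) {
        return subscription(tenant, namespace, topic, subscription)
                + "/consumers/" + consumerName;
    }
}
```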
> >
> > ---
> >
> > ## Backward & Forward Compatibility
> >
> > ### Upgrade
> >
> > The controller is a new component with no interaction with existing
> > partitioned or non-partitioned topics. Enabling `scalableTopicEnabled`
> > activates the new code paths. Existing topics are unaffected.
> >
> > ### Downgrade / Rollback
> >
> > Scalable topic metadata and segment data remain in the metadata store
> > and BookKeeper. Rolling back to a version without scalable topic
> > support leaves this data intact but inaccessible. Upgrading again
> > restores access.
> >
> > ### Client Compatibility
> >
> > The existing Pulsar client SDK rejects `topic://` and `segment://`
> > domains with a clear error message directing users to the V5 client
> > SDK. This check is enforced in `PulsarClientImpl` for producer,
> > consumer, and reader creation paths.
> >
> > ---
> >
> > ## Security Considerations
> >
> > The controller follows the same tenant/namespace/topic authorization
> > model as existing Pulsar topics:
> >
> > - Admin API operations require topic-level admin permissions.
> > - DAG watch sessions require topic-level lookup permissions.
> > - Consumer registration requires topic-level consume permissions.
> > - Segment operations are internal and authenticated via the broker's
> > internal admin client.
> >
> > The controller leader lock in the metadata store is accessible only to
> > authenticated brokers.
> >
> > ---
> >
> > ## Links
> >
> > - Parent PIP: [PIP-460: Scalable Topics](pip-460.md)
> > - V5 Client API: [PIP-466: New Java Client API (V5)](pip-466.md)
> >
> > --
> > Matteo Merli
> > <[email protected]>
> >
> >
>
