Closing the vote with

3 binding +1s:
 * Matteo
 * Lari
 * Penghui

1 non-binding +1:
 * Tao

Thanks,
Matteo

--
Matteo Merli
<[email protected]>

On Tue, Apr 21, 2026 at 9:40 AM PengHui Li <[email protected]> wrote:
>
> +1 (binding)
>
> Regards,
> Penghui
>
> On Mon, Apr 20, 2026 at 11:39 PM Tao Jiuming <[email protected]> wrote:
>
> > +1 nonbinding
> >
> > On Tue, Apr 21, 2026 at 00:44 Lari Hotari <[email protected]> wrote:
> >
> > > +1 (binding)
> > >
> > > -Lari
> > >
> > > On 2026/04/17 04:03:36 Matteo Merli wrote:
> > > > Matteo Merli <[email protected]>
> > > >
> > > > Mon, Apr 13, 11:11 PM (3 days ago)
> > > > to Dev
> > > > https://github.com/apache/pulsar/pull/25516
> > > >
> > > >
> > > > -----
> > > >
> > > > # PIP-468: Scalable Topic Controller
> > > >
> > > > *Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*
> > > >
> > > > ## Motivation
> > > >
> > > > [PIP-460](pip-460.md) introduces scalable topics as a new topic type
> > > > in Pulsar, built on a DAG of range segments that can be split and
> > > > merged to scale independently of the initial configuration. PIP-460
> > > > defines the overall vision but defers the detailed design of each
> > > > component to dedicated sub-PIPs.
> > > >
> > > > This PIP specifies the **Scalable Topic Controller** — the broker-side
> > > > component responsible for:
> > > >
> > > > 1. **Segment lifecycle management** — creating, terminating, and
> > > > deleting the underlying segment topics that back a scalable topic.
> > > > 2. **Segment layout coordination** — executing split and merge
> > > > operations with correct ordering guarantees so that no messages are
> > > > lost and all subscription cursors exist before producers are
> > > > redirected.
> > > > 3. **Consumer assignment** — coordinating which consumers receive
> > > > messages from which segments for a given subscription.
> > > > 4. **Leader election** — ensuring exactly one broker acts as the
> > > > controller for each scalable topic, with automatic failover.
> > > > 5. **DAG watch sessions** — pushing topology updates to connected
> > > > clients (producers and consumers) when the segment layout changes.
> > > >
> > > > Without a well-defined controller, split and merge operations would be
> > > > unsafe: producers could write to segments that have no subscription
> > > > cursors, consumers could miss messages during topology changes, and
> > > > concurrent layout modifications could corrupt the segment DAG.
> > > >
> > > > ---
> > > >
> > > > ## Design
> > > >
> > > > ### Architecture Overview
> > > >
> > > > The controller subsystem consists of four layers:
> > > >
> > > > ```
> > > > ┌──────────────────────────────────────────────────────────┐
> > > > │                    Admin API Layer                        │
> > > > │         ScalableTopics REST  +  Segments REST             │
> > > > └──────────────┬───────────────────────┬───────────────────┘
> > > >                │                       │
> > > > ┌──────────────▼───────────────────────▼───────────────────┐
> > > > │                  ScalableTopicService                     │
> > > > │         Per-broker singleton; manages controllers         │
> > > > └──────────────────────────┬───────────────────────────────┘
> > > >                            │
> > > > ┌──────────────────────────▼───────────────────────────────┐
> > > > │              ScalableTopicController (per topic)          │
> > > > │   Leader-elected; owns layout, split/merge, consumers     │
> > > > │                                                           │
> > > > │   ┌─────────────────┐  ┌───────────────────────────────┐ │
> > > > │   │  SegmentLayout   │  │  SubscriptionCoordinator (×N) │ │
> > > > │   │  (immutable DAG) │  │  per-subscription assignments │ │
> > > > │   └─────────────────┘  └───────────────────────────────┘ │
> > > > └──────────────────────────┬───────────────────────────────┘
> > > >                            │
> > > > ┌──────────────────────────▼───────────────────────────────┐
> > > > │              ScalableTopicResources                       │
> > > > │         Metadata store access (read/write/watch)          │
> > > > └──────────────────────────────────────────────────────────┘
> > > > ```
> > > >
> > > > ### Broker Roles and Request Routing
> > > >
> > > > A scalable topic involves three distinct broker roles. Understanding
> > > > which broker handles which operation is critical to the design.
> > > >
> > > > #### Any broker — DAG watch sessions
> > > >
> > > > A **DAG watch session** (scalable topic lookup) can be served by **any
> > > > broker** in the cluster. The session reads segment metadata from the
> > > > shared metadata store and registers a watch for changes. No state is
> > > > held on the broker beyond the watch registration. This means producers
> > > > and consumers can connect to any broker for topic discovery — the same
> > > > way regular topic lookups work today.
> > > >
> > > > When the metadata changes (due to a split or merge performed by the
> > > > controller leader), the metadata store notification fires on whichever
> > > > broker is serving the watch session, and that broker pushes the
> > > > updated DAG to the client.
> > > >
> > > > #### Controller leader — layout mutations and consumer assignment
> > > >
> > > > Each scalable topic has exactly one **controller leader** across the
> > > > cluster, elected via the metadata store. The controller leader is the
> > > > only broker that can:
> > > >
> > > > - Execute **split** and **merge** operations (the multi-step protocols
> > > > described below).
> > > > > - Accept **consumer registrations** and compute segment-to-consumer
> > > > > assignments.
> > > > - **Notify consumers** of assignment changes after topology updates.
> > > >
> > > > Clients discover the controller leader's broker URL through the DAG
> > > > watch session response (`controller_broker_url` field). StreamConsumer
> > > > and CheckpointConsumer clients then connect directly to the controller
> > > > leader to register and receive assignments.
> > > >
> > > > Admin API requests for split and merge are routed to the
> > > > `ScalableTopicService` on any broker, which obtains (or creates) a
> > > > local `ScalableTopicController` that participates in leader election.
> > > > If the local controller is the leader, it executes the operation
> > > > directly. If not, the request must be redirected to the leader (or the
> > > > controller must first win the election). The split and merge admin
> > > > endpoints do not require the request to land on the leader broker —
> > > > the `ScalableTopicService` handles this transparently.
> > > >
> > > > #### Segment-owning brokers — produce and consume
> > > >
> > > > Individual **segment topics** (`segment://`) are regular persistent
> > > > topics from the broker's perspective. They are distributed across the
> > > > cluster by the standard Pulsar **load manager** and namespace bundle
> > > > assignment — the same mechanism used for `persistent://` topics. Each
> > > > segment topic is owned by exactly one broker at any time, determined
> > > > by its namespace bundle hash.
> > > >
> > > > The controller leader does not need to own any of the segment topics.
> > > > When the controller needs to operate on a segment (create, terminate,
> > > > delete, discover subscriptions), it uses the **Segments admin API**,
> > > > which routes the request to whichever broker currently owns that
> > > > segment's namespace bundle. This decoupling means the controller can
> > > > coordinate segments spread across many brokers without requiring
> > > > co-location.
> > > >
> > > > ```
> > > > ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
> > > > │  Broker A    │     │  Broker B    │     │  Broker C    │
> > > > │              │     │  (controller │     │              │
> > > > │  segment-0   │     │   leader)    │     │  segment-2   │
> > > > │  segment-1   │     │              │     │              │
> > > > │              │     │  DAG watch   │     │  DAG watch   │
> > > > │  DAG watch   │     │  sessions    │     │  sessions    │
> > > > │  sessions    │     │              │     │              │
> > > > └─────────────┘     └─────────────┘     └─────────────┘
> > > >                            │
> > > >                     Split/merge ops
> > > >                     use admin API to
> > > >                     reach segments on
> > > >                     Broker A and C
> > > > ```
> > > >
> > > > In this example, Broker B is the controller leader but owns no
> > > > segments. When it executes a split of segment-0, it calls the Segments
> > > > admin API which routes to Broker A (the owner of segment-0).
> > > >
> > > > ### Components
> > > >
> > > > #### ScalableTopicService
> > > >
> > > > A per-broker singleton, created and owned by `BrokerService`. It
> > > > manages the lifecycle of all `ScalableTopicController` instances on
> > > > the broker.
> > > >
> > > > **Responsibilities:**
> > > >
> > > > - **Controller management**: maintains a map of active controllers
> > > > keyed by topic name. Creates controllers on demand via
> > > > `getOrCreateController()` and releases them on topic unload or broker
> > > > shutdown.
> > > > - **Topic lifecycle**: handles `createScalableTopic()` and
> > > > `deleteScalableTopic()` admin operations, including creating/deleting
> > > > the underlying segment topics via the Segments admin API.
> > > > - **Delegation**: routes `splitSegment()` and `mergeSegments()`
> > > > requests to the appropriate controller.
> > > > - **Leader state monitoring**: listens for leader election state
> > > > changes and re-attempts election when the current leader is lost.
> > > >
> > > > #### ScalableTopicController
> > > >
> > > > A per-topic coordinator. Only one instance across the cluster is the
> > > > **leader** for a given scalable topic, ensured by leader election
> > > > through the metadata store. The leader is the only instance that
> > > > modifies the segment layout or assigns consumers.
> > > >
> > > > **State:**
> > > >
> > > > | Field | Type | Description |
> > > > |-------|------|-------------|
> > > > > | `topicName` | `TopicName` | The `topic://` name of the scalable
> > > > > topic |
> > > > | `currentLayout` | `SegmentLayout` | The current immutable snapshot
> > > > of the segment DAG |
> > > > | `subscriptions` | `Map<String, SubscriptionCoordinator>` |
> > > > Per-subscription consumer coordinators |
> > > > > | `leaderState` | `LeaderElectionState` | Current leader election
> > > > > state |
> > > > | `leaderElection` | `LeaderElection<String>` | Metadata store leader
> > > > election handle; value is the broker URL |
> > > >
> > > > **Key operations:**
> > > >
> > > > - `initialize()` — loads the current metadata from the store, then
> > > > attempts leader election. The elected leader stores its broker service
> > > > URL so that clients can discover and connect to it.
> > > > - `splitSegment(segmentId)` — splits an active segment at its midpoint
> > > > (see [Split Protocol](#split-protocol)).
> > > > - `mergeSegments(segmentId1, segmentId2)` — merges two adjacent active
> > > > segments (see [Merge Protocol](#merge-protocol)).
> > > > - `registerConsumer(subscription, consumer)` — registers a consumer
> > > > and returns its initial segment assignment.
> > > > - `unregisterConsumer(subscription, consumer)` — removes a consumer
> > > > and triggers rebalancing.
> > > > - `getLeaderBrokerUrl()` — returns the current leader's broker URL,
> > > > used by clients to connect to the controller.
> > > >
> > > > #### SegmentLayout
> > > >
> > > > An immutable, versioned snapshot of the segment DAG. All mutation
> > > > methods (`splitSegment`, `mergeSegments`, `pruneSegment`) return a
> > > > **new** `SegmentLayout` instance — the original is never modified.
> > > > This ensures safe concurrent reads without locking.
> > > >
> > > > **Fields:**
> > > >
> > > > | Field | Type | Description |
> > > > |-------|------|-------------|
> > > > | `epoch` | `long` | Monotonically increasing version; incremented on
> > > > every layout change |
> > > > > | `nextSegmentId` | `long` | Counter for assigning IDs to new segments |
> > > > | `allSegments` | `Map<Long, SegmentInfo>` | Complete DAG: all
> > > > segments (active + sealed) |
> > > > | `activeSegments` | `Map<Long, SegmentInfo>` | Filtered view: only
> > > > `ACTIVE` segments |
> > > >
> > > > **Key operations:**
> > > >
> > > > - `findActiveSegment(hash)` — returns the active segment whose hash
> > > > range contains the given hash value. Used by producers for message
> > > > routing.
> > > > - `splitSegment(segmentId)` — validates the segment is active,
> > > > computes the midpoint of its hash range, creates two child
> > > > `SegmentInfo` records covering `[start, mid]` and `[mid+1, end]`,
> > > > seals the parent, and returns a new layout with incremented epoch.
> > > > - `mergeSegments(id1, id2)` — validates both segments are active and
> > > > adjacent (the end of one equals `start - 1` of the other), creates a
> > > > merged `SegmentInfo` covering the combined range, seals both parents,
> > > > and returns a new layout.
> > > > - `getLineage(segmentId)` — returns the full ancestor + descendant
> > > > chain for a segment, used for DAG traversal during catch-up reads.
> > > > - `toMetadata()` / `fromMetadata()` — converts to/from the persisted
> > > > `ScalableTopicMetadata` format.
> > > >
> > > > #### SegmentInfo
> > > >
> > > > A record representing a single segment in the DAG:
> > > >
> > > > | Field | Type | Description |
> > > > |-------|------|-------------|
> > > > | `segmentId` | `long` | Unique, monotonically increasing identifier |
> > > > | `hashRange` | `HashRange` | The `[start, end]` inclusive hash range
> > > > this segment covers |
> > > > | `state` | `SegmentState` | `ACTIVE` or `SEALED` |
> > > > > | `parentIds` | `List<Long>` | Parent segments (empty for root
> > > > > segments) |
> > > > > | `childIds` | `List<Long>` | Child segments (empty for leaf/active
> > > > > segments) |
> > > > | `createdAtEpoch` | `long` | Layout epoch in which this segment was
> > > > created (`0` for the initial segments) |
> > > > | `sealedAtEpoch` | `long` | Layout epoch in which this segment was
> > > > sealed; only meaningful when `state == SEALED`. `0` when the segment
> > > > is `ACTIVE`; this sentinel is unambiguous because sealing always
> > > > happens as part of a split or merge, which increments the epoch — so a
> > > > segment can never be sealed at epoch `0`. |
> > > >
> > > > Factory methods: `SegmentInfo.active(id, range, epoch)` and
> > > > `SegmentInfo.sealed(...)`.
> > > >
> > > > #### SubscriptionCoordinator
> > > >
> > > > Manages segment-to-consumer assignments for a single subscription
> > > > within a scalable topic.
> > > >
> > > > **Assignment strategy:** round-robin. Active segments are sorted by
> > > > hash range start; consumers are sorted by name. Segments are
> > > > distributed evenly across consumers. When a consumer is added or
> > > > removed, or when the layout changes (split/merge), the coordinator
> > > > recomputes assignments and pushes updates to affected consumers.
> > > >
> > > > **Key operations:**
> > > >
> > > > - `addConsumer(consumer)` — adds a consumer, rebalances, and returns
> > > > the full assignment map.
> > > > - `removeConsumer(consumer)` — removes a consumer and redistributes
> > > > its segments.
> > > > - `onLayoutChange(newLayout)` — called after a split or merge;
> > > > recomputes all assignments against the new set of active segments.
> > > >
> > > > #### ConsumerRegistration and ConsumerAssignment
> > > >
> > > > `ConsumerRegistration` is a record identifying a connected consumer:
> > > >
> > > > | Field | Type | Description |
> > > > |-------|------|-------------|
> > > > | `consumerId` | `long` | Protocol-level consumer ID |
> > > > | `consumerName` | `String` | Stable name chosen by the client |
> > > > | `cnx` | `TransportCnx` | The connection to push assignment updates |
> > > >
> > > > `ConsumerAssignment` is the assignment sent to a consumer:
> > > >
> > > > | Field | Type | Description |
> > > > |-------|------|-------------|
> > > > > | `layoutEpoch` | `long` | The layout epoch this assignment is based
> > > > > on |
> > > > | `assignedSegments` | `List<AssignedSegment>` | The segments assigned
> > > > to this consumer |
> > > >
> > > > Each `AssignedSegment` contains: `segmentId`, `hashRange`, and
> > > > `underlyingTopicName` (the `segment://` topic name the consumer should
> > > > connect to).
> > > >
> > > > ### Metadata Store Schema
> > > >
> > > > Scalable topic metadata is stored in the metadata store under a
> > > > well-known path structure:
> > > >
> > > > ```
> > > > /topics/{tenant}/{namespace}/{encodedTopicName}
> > > > ```
> > > >
> > > > The value at this path is a JSON-serialized `ScalableTopicMetadata`:
> > > >
> > > > ```json
> > > > {
> > > >   "epoch": 3,
> > > >   "nextSegmentId": 5,
> > > >   "segments": {
> > > >     "0": { "segmentId": 0, "hashRange": {"start": 0, "end": 65535},
> > > > "state": "SEALED", "parentIds": [], "childIds": [1, 2],
> > > > "createdAtEpoch": 0, "sealedAtEpoch": 1 },
> > > >     "1": { "segmentId": 1, "hashRange": {"start": 0, "end": 32767},
> > > > "state": "SEALED", "parentIds": [0], "childIds": [3, 4],
> > > > "createdAtEpoch": 1, "sealedAtEpoch": 3 },
> > > >     "2": { "segmentId": 2, "hashRange": {"start": 32768, "end":
> > > > 65535}, "state": "ACTIVE", "parentIds": [0], "childIds": [],
> > > > "createdAtEpoch": 1, "sealedAtEpoch": 0 },
> > > >     "3": { "segmentId": 3, "hashRange": {"start": 0, "end": 16383},
> > > > "state": "ACTIVE", "parentIds": [1], "childIds": [], "createdAtEpoch":
> > > > 3, "sealedAtEpoch": 0 },
> > > >     "4": { "segmentId": 4, "hashRange": {"start": 16384, "end":
> > > > 32767}, "state": "ACTIVE", "parentIds": [1], "childIds": [],
> > > > "createdAtEpoch": 3, "sealedAtEpoch": 0 }
> > > >   },
> > > >   "properties": {}
> > > > }
> > > > ```
> > > >
> > > > The controller leader lock is stored at:
> > > >
> > > > ```
> > > > /topics/{tenant}/{namespace}/{encodedTopicName}/controller
> > > > ```
> > > >
> > > > The value is the broker service URL of the elected leader.
> > > >
> > > > ### DAG Watch Sessions
> > > >
> > > > As described in [Broker Roles](#broker-roles-and-request-routing), any
> > > > broker can serve a DAG watch session because the segment metadata
> > > > lives in the shared metadata store — no controller leader involvement
> > > > is required. The session is established via the binary protocol:
> > > >
> > > > 1. **Client sends `CommandScalableTopicLookup`** with a
> > > > client-assigned `sessionId` and the `topic://` topic name.
> > > > 2. **Broker creates a `DagWatchSession`** that:
> > > >    - Loads the current `ScalableTopicMetadata` from the store.
> > > >    - Resolves which broker owns each active segment's `segment://`
> > > > topic (via topic lookup).
> > > >    - Reads the controller broker URL from the leader lock path.
> > > >    - Registers a metadata store notification listener for changes to
> > > > the topic's metadata path.
> > > > 3. **Broker sends `CommandScalableTopicUpdate`** with the initial
> > > > `ScalableTopicDAG` containing the full segment DAG, broker addresses,
> > > > and controller URL.
> > > > 4. **On metadata change**, the notification listener fires. The broker
> > > > reloads metadata, re-resolves broker addresses, and pushes an updated
> > > > `CommandScalableTopicUpdate` to the client.
> > > > > 5. **Client sends `CommandScalableTopicClose`** to tear down the
> > > > > session.
> > > >
> > > > The `ScalableTopicDAG` protocol message contains:
> > > >
> > > > | Field | Type | Description |
> > > > |-------|------|-------------|
> > > > | `epoch` | `uint64` | Layout epoch |
> > > > | `segments` | `repeated SegmentInfoProto` | Full segment DAG |
> > > > | `segment_brokers` | `repeated SegmentBrokerAddress` | Broker URL for
> > > > each active segment |
> > > > | `controller_broker_url` | `string` | Controller leader's broker URL |
> > > > | `controller_broker_url_tls` | `string` | TLS variant |
> > > >
> > > > ---
> > > >
> > > > ## Split Protocol
> > > >
> > > > Splitting a segment is the core scaling-out operation. The protocol is
> > > > carefully ordered to guarantee that **no messages are lost** and **all
> > > > subscription cursors exist before producers are redirected**.
> > > >
> > > > ### Step-by-step
> > > >
> > > > ```
> > > > >            Controller                   Metadata Store         Broker(s)
> > > > >                │                             │                     │
> > > > >   1. Discover  │──── getSubscriptions ───────>│                     │
> > > > >   subscriptions│<─── [sub-A, sub-B] ─────────│                     │
> > > > >                │                             │                     │
> > > > >   2. Create    │──── createSegment(child1, ──>│ ─── route ────────> │
> > > > >   children     │     [sub-A, sub-B])          │                     │ create topic
> > > > >                │──── createSegment(child2, ──>│ ─── route ────────> │ + cursors
> > > > >                │     [sub-A, sub-B])          │                     │
> > > > >                │                             │                     │
> > > > >   3. Terminate │──── terminateSegment ───────>│ ─── route ────────> │
> > > > >   parent       │     (parent)                 │                     │ seal ML
> > > >                │                             │                     │
> > > >   4. Update    │──── CAS update ─────────────>│                     │
> > > >   metadata     │     (atomic)                 │                     │
> > > >                │                             │                     │
> > > >   5. Notify    │──── push to consumers ──────────────────────────> │
> > > >   consumers    │     (rebalance)              │                     │
> > > > ```
> > > >
> > > > **Step 1: Discover subscriptions.** The controller queries the parent
> > > > segment topic for its list of subscriptions. It first checks locally
> > > > (if the segment is on this broker), falling back to the admin API for
> > > > remote segments.
> > > >
> > > > **Step 2: Create child segment topics.** Two new `segment://` topics
> > > > are created via the Segments admin API. Each creation request includes
> > > > the list of subscriptions discovered in step 1. The Segments API
> > > > handler creates the persistent topic and initializes subscription
> > > > cursors at the `Earliest` position. The admin API routes to whichever
> > > > broker owns the namespace bundle for each segment, so child segments
> > > > may be created on different brokers.
> > > >
> > > > **Step 3: Terminate parent segment.** The parent segment topic is
> > > > terminated via the Segments admin API, which writes a termination
> > > > marker to the managed ledger. After termination, producers writing to
> > > > the parent receive `TopicTerminated` and must re-route to child
> > > > segments.
> > > >
> > > > **Step 4: Atomic metadata update.** The controller issues a
> > > > compare-and-swap (CAS) update to the metadata store. The update
> > > > transitions the parent to `SEALED` state with child pointers, and adds
> > > > the two new `ACTIVE` child segments. The CAS guarantees atomicity — if
> > > > another operation modified the metadata concurrently, the update fails
> > > > and must be retried.
> > > >
> > > > **Step 5: Notify consumers.** All `SubscriptionCoordinator` instances
> > > > are notified of the layout change. They recompute segment-to-consumer
> > > > assignments and push updates to connected consumers.
> > > >
> > > > ### Why this ordering matters
> > > >
> > > > - Steps 1-2 **before** step 4: if we updated metadata first, clients
> > > > would discover the new segments immediately. Producers would start
> > > > writing to child segments that have no subscription cursors yet,
> > > > causing those messages to be missed by consumers.
> > > > - Step 3 **before** step 4: terminating the parent before the metadata
> > > > update ensures that by the time clients see the new layout, the parent
> > > > is already sealed. There is no window where both parent and children
> > > > are writable.
> > > > - Step 2 creates cursors at `Earliest`: this is safe because the child
> > > > topics are brand new (empty). The cursor starts at the beginning, and
> > > > the first message written by a redirected producer will be the first
> > > > message consumed.
> > > >
> > > > ---
> > > >
> > > > ## Merge Protocol
> > > >
> > > > Merging two adjacent segments is the core scaling-in operation. The
> > > > protocol follows the same ordering principle: **create first, update
> > > > metadata last**.
> > > >
> > > > ### Step-by-step
> > > >
> > > > **Step 1: Discover subscriptions.** The controller queries both parent
> > > > segments for their subscriptions and takes the union, so no
> > > > subscription is lost.
> > > >
> > > > **Step 2: Create merged segment topic.** A single new `segment://`
> > > > topic is created with all discovered subscriptions.
> > > >
> > > > **Step 3: Terminate both parent segments.** Both parents are
> > > > terminated via the admin API. This is done sequentially to avoid a
> > > > race where producers of one parent are still writing while the other
> > > > is already sealed.
> > > >
> > > > **Step 4: Atomic metadata update.** The CAS update seals both parents
> > > > with child pointers to the merged segment, and adds the new `ACTIVE`
> > > > merged segment.
> > > >
> > > > **Step 5: Notify consumers.** Same as split — rebalance and push.
> > > >
> > > > ### Cross-broker coordination
> > > >
> > > > Unlike splits, merges inherently involve multiple brokers because the
> > > > two parent segments may be owned by different brokers (assigned by the
> > > > load manager). The controller leader handles this transparently: all
> > > > segment operations (create, terminate, delete) go through the Segments
> > > > admin API, which routes each request to the broker that currently owns
> > > > the target segment's namespace bundle. The controller does not need to
> > > > know or care which broker owns which segment — the admin API routing
> > > > handles it.
> > > >
> > > > ---
> > > >
> > > > ## Segment Topic Management via Admin API
> > > >
> > > > Segment topics are managed exclusively through a dedicated REST API at
> > > > `/admin/v2/segments`. This is a deliberate design choice: segment
> > > > topics use the `segment://` domain and must not be confused with
> > > > regular `persistent://` topics.
> > > >
> > > > ### Endpoints
> > > >
> > > > | Method | Path | Description |
> > > > |--------|------|-------------|
> > > > | `PUT` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create a
> > > > segment topic. Optional request body with subscription names to
> > > > pre-create. |
> > > > | `POST` | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` |
> > > > Terminate (seal) a segment topic. |
> > > > | `DELETE` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete a
> > > > segment topic. |
> > > >
> > > > The `{descriptor}` is the segment's hash range and ID in the format
> > > > `{hexStart}-{hexEnd}-{segmentId}` (e.g., `0000-7fff-1`).
> > > >
> > > > ### Create Segment
> > > >
> > > > When creating a segment topic, the handler:
> > > >
> > > > > 1. Constructs the full `segment://` topic name from the path
> > > > > components.
> > > > 2. Validates namespace bundle ownership (the request is routed to the
> > > > owning broker).
> > > > 3. Creates the persistent topic via `BrokerService.getOrCreateTopic()`.
> > > > 4. For each subscription in the request body, creates a cursor at the
> > > > `Earliest` position.
> > > >
> > > > ### Terminate Segment
> > > >
> > > > Terminates the managed ledger backing the segment topic. After
> > > > termination, the topic accepts no further writes, and producers
> > > > receive `TopicTerminated`.
> > > >
> > > > ### Delete Segment
> > > >
> > > > Deletes the segment topic and its managed ledger. Used during scalable
> > > > topic deletion to clean up all underlying storage.
> > > >
> > > > ---
> > > >
> > > > ## Consumer Assignment
> > > >
> > > > Consumer assignment behavior depends on the consumer type:
> > > >
> > > > - **StreamConsumer** (ordered, cumulative ack) — requires coordinated
> > > > segment-to-consumer assignment from the controller leader. Each active
> > > > segment is owned by exactly one consumer at any time to preserve
> > > > per-segment ordering. This is the subject of the rest of this section.
> > > > - **CheckpointConsumer** (unmanaged, for connectors) — same as
> > > > StreamConsumer from a controller perspective: it registers with the
> > > > controller leader and receives an explicit segment assignment.
> > > > - **QueueConsumer** (unordered, individual ack) — **does not require
> > > > any controller-side assignment**. Because there is no ordering
> > > > requirement, every queue consumer attaches directly to **all**
> > > > segments of the scalable topic — both `ACTIVE` and `SEALED` — and each
> > > > segment-owning broker independently performs round-robin delivery
> > > > across the queue consumers attached to that segment. New segments
> > > > produced by a split are picked up transparently via the DAG watch
> > > > session push; sealed segments keep serving messages until their
> > > > backlog drains and are then dropped from the set. The controller
> > > > leader is not in the data path for queue consumers and no
> > > > `SubscriptionCoordinator` is involved for them.
> > > >
> > > > ### Registration Flow (StreamConsumer / CheckpointConsumer)
> > > >
> > > > 1. A `StreamConsumer` connects to the controller broker (discovered
> > > > via the DAG watch session's `controller_broker_url`).
> > > > 2. The consumer sends a registration request with its `subscription`
> > > > name and `consumerName`.
> > > > 3. The controller's `registerConsumer()` method routes to the
> > > > appropriate `SubscriptionCoordinator` (created on first use for that
> > > > subscription).
> > > > 4. The coordinator adds the consumer, recomputes assignments, and
> > > > returns the consumer's `ConsumerAssignment` — a list of `(segmentId,
> > > > hashRange, segmentTopicName)` tuples.
> > > > 5. The consumer connects to each assigned `segment://` topic and
> > > > begins consuming.
> > > >
> > > > ### Rebalancing
> > > >
> > > > Rebalancing occurs when:
> > > >
> > > > - A consumer is added or removed.
> > > > - A split or merge changes the set of active segments.
> > > >
> > > > The rebalancing algorithm is round-robin:
> > > >
> > > > 1. Collect all active segments, sorted by hash range start.
> > > > 2. Collect all consumers, sorted by name (for deterministic ordering).
> > > > 3. Assign segments to consumers in round-robin order.
> > > > 4. For each consumer whose assignment changed, push the new
> > > > `ConsumerAssignment`.
> > > >
> > > > ### Consumer Session Lifecycle
> > > >
> > > > A consumer's registration and its segment assignment are treated as a
> > > > **persistent session**, not as a transient association tied to a TCP
> > > > connection. This is a deliberate departure from the existing Pulsar
> > > > consumer model, where consumer liveness is asserted purely by the TCP
> > > > connection to the broker.
> > > >
> > > > **Rationale.** Scalable topic consumer assignments have non-trivial
> > > > cost: when a consumer disappears, its segments are redistributed among
> > > > the remaining consumers, each of which may need to reconnect to
> > > > different segment brokers and (for ordered consumers) drain in-flight
> > > > messages before the new assignment takes effect. Treating a brief
> > > > disconnection as a full consumer loss would cause unnecessary
> > > > rebalancing in common scenarios such as a consumer process restart or
> > > > a broker restart.
> > > >
> > > > **What is persisted vs. in-memory.**
> > > >
> > > > The **session itself is persisted** — the keep-alive state is not.
> > > >
> > > > - **Persisted in the metadata store (the session):** the
> > > > `ConsumerRegistration` — consumer name and its current segment
> > > > assignment — keyed by `consumerName` under the subscription path. Once
> > > > a consumer successfully registers, this entry is durable and outlives
> > > > TCP disconnects, client restarts, and controller leader failovers. The
> > > > assignment survives failover without forcing consumers to re-register.
> > > > - **In-memory on the controller leader (the keep-alive):** whether
> > > > each consumer is currently connected, and the grace-period timer that
> > > > runs while it is disconnected. Keep-alive signals are **not** written
> > > > to the metadata store; the leader observes the consumer's connection
> > > > state directly and tracks the timer in RAM. This avoids a metadata
> > > > store write on every liveness tick.
> > > >
> > > > **Session semantics (steady state).**
> > > >
> > > > - When a consumer registers, the `SubscriptionCoordinator` writes its
> > > > `ConsumerRegistration` and marks it connected in-memory.
> > > > - If the consumer's connection drops, the leader does **not**
> > > > immediately remove the consumer. It transitions the in-memory state to
> > > > "disconnected" and starts a configurable **session grace period**
> > > > timer for that consumer.
> > > > - If the same `consumerName` reconnects within the grace period, the
> > > > timer is cancelled and the consumer resumes with the same persisted
> > > > assignment — no rebalance, no other consumer disturbed.
> > > > - If the grace period expires, the leader deletes the
> > > > `ConsumerRegistration` from the metadata store and triggers a
> > > > rebalance for the remaining consumers.
> > > >
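
The steady-state session semantics quoted above can be sketched as a small state tracker. This is an illustration under stated assumptions: `ConsumerSessionTracker` is a hypothetical name, an in-memory set stands in for the persisted `ConsumerRegistration` entries, and time is passed in explicitly instead of using real timers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ConsumerSessionTracker {
    private final long gracePeriodMillis;
    // Stand-in for the durable ConsumerRegistration entries in the metadata store.
    private final Set<String> registrations = new HashSet<>();
    // In-memory only: consumerName -> grace-period deadline while disconnected.
    private final Map<String, Long> deadlines = new HashMap<>();

    ConsumerSessionTracker(long gracePeriodMillis) {
        this.gracePeriodMillis = gracePeriodMillis;
    }

    // Registration writes the session and marks the consumer connected.
    void register(String consumerName) {
        registrations.add(consumerName);
        deadlines.remove(consumerName);
    }

    // A dropped connection starts the grace-period timer; nothing is removed yet.
    void onDisconnect(String consumerName, long nowMillis) {
        if (registrations.contains(consumerName)) {
            deadlines.put(consumerName, nowMillis + gracePeriodMillis);
        }
    }

    // Returns true if the session is intact, in which case the timer is cancelled.
    boolean onReconnect(String consumerName, long nowMillis) {
        if (!registrations.contains(consumerName)) {
            return false;
        }
        Long deadline = deadlines.get(consumerName);
        if (deadline != null && nowMillis >= deadline) {
            return false; // grace period already over; expire() will evict it
        }
        deadlines.remove(consumerName);
        return true;
    }

    // Evicts consumers whose grace period has expired and returns their names (sorted).
    // A non-empty result is where the real controller would trigger a rebalance.
    List<String> expire(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : deadlines.entrySet()) {
            if (nowMillis >= entry.getValue()) {
                evicted.add(entry.getKey());
            }
        }
        for (String name : evicted) {
            deadlines.remove(name);
            registrations.remove(name);
        }
        Collections.sort(evicted);
        return evicted;
    }
}
```
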
> > > > **Behavior on controller leader failover.**
> > > >
> > > > Because liveness timers are in-memory only, they are lost when the
> > > > leader broker crashes. The new leader:
> > > >
> > > > 1. Loads all `ConsumerRegistration` entries for each subscription from
> > > > the metadata store, restoring the prior segment assignment.
> > > > 2. Treats every consumer as "just disconnected" and starts a fresh
> > > > grace-period timer for each. No timestamps carry across from the
> > > > previous leader.
> > > > 3. Consumers reconnecting to the new leader within the fresh grace
> > > > period resume with the same assignment — seamless from their
> > > > perspective.
> > > > 4. Consumers that fail to reconnect before the fresh timer expires are
> > > > evicted and their segments are redistributed.
> > > >
> > > > This means a leader failover always gives clients a full grace period
> > > > to reconnect, regardless of how long they had already been
> > > > disconnected under the old leader. The trade-off is explicit: we keep
> > > > the metadata store writes proportional to real membership changes
> > > > (register / unregister / rebalance) rather than to liveness ticks.
> > > >
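
To make the "fresh grace period" rule concrete, here is a tiny sketch of what the new leader computes after loading the persisted registrations. `FailoverRestore` and its method are hypothetical; the only behavior taken from the text is that every restored consumer gets a full grace period measured from the new leader's clock, with no old timestamps consulted.

```java
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

final class FailoverRestore {
    // Every persisted consumer is treated as "just disconnected" at nowMillis.
    static Map<String, Long> freshDeadlines(
            Collection<String> persistedConsumerNames, long nowMillis, long gracePeriodMillis) {
        Map<String, Long> deadlines = new TreeMap<>();
        for (String consumerName : persistedConsumerNames) {
            deadlines.put(consumerName, nowMillis + gracePeriodMillis);
        }
        return deadlines;
    }
}
```

A consumer that had already been disconnected for most of its grace period under the old leader therefore gets the same deadline as one that was connected until the moment of the crash.
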
> > > > **Covered scenarios.**
> > > >
> > > > - **Consumer process restart.** A client process restarts and
> > > > reconnects with the same `consumerName` well within the grace period;
> > > > it keeps the same segments and nothing is redistributed.
> > > > - **Controller leader restart.** The leader is re-elected on a
> > > > different broker. Consumer entries and assignments are restored from
> > > > the metadata store; the new leader starts a fresh grace-period timer
> > > > for every consumer so reconnects are transparent.
> > > > - **Segment-owning broker restart.** Segment topics are reassigned by
> > > > the standard Pulsar load manager; the segment-to-consumer assignment
> > > > is unaffected, so the consumer simply reconnects to the new owning
> > > > broker with no change at the consumer-assignment level.
> > > >
> > > > The grace period and related tunables are configurable per broker and
> > > > will be specified in a follow-up sub-PIP.
> > > >
> > > > ### Layout Change Propagation
> > > >
> > > > When a split or merge completes and the metadata is updated:
> > > >
> > > > 1. The controller reloads the metadata and creates a new
> > > > `SegmentLayout`.
> > > > 2. `notifySubscriptions()` is called, which iterates all
> > > > `SubscriptionCoordinator` instances.
> > > > 3. Each coordinator calls `onLayoutChange(newLayout)`, which
> > > > recomputes assignments against the new active segments and pushes
> > > > updates to affected consumers.
> > > >
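
The propagation steps above amount to a simple observer pattern. The sketch below assumes a layout can be reduced to a list of active segment IDs; `LayoutListener` and `LayoutNotifier` are hypothetical names, and only `notifySubscriptions()` and `onLayoutChange(...)` come from the text.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical view of a SubscriptionCoordinator: it only reacts to layout changes here.
interface LayoutListener {
    void onLayoutChange(List<Long> activeSegmentIds);
}

final class LayoutNotifier {
    // One coordinator per subscription name.
    private final Map<String, LayoutListener> coordinators = new ConcurrentHashMap<>();
    private volatile List<Long> currentLayout = List.of();

    void addCoordinator(String subscription, LayoutListener coordinator) {
        coordinators.put(subscription, coordinator);
    }

    // Called once a split or merge has completed and the metadata was updated.
    void onMetadataUpdated(List<Long> newActiveSegmentIds) {
        currentLayout = List.copyOf(newActiveSegmentIds); // step 1: new immutable layout
        notifySubscriptions();                            // step 2: fan out
    }

    private void notifySubscriptions() {
        for (LayoutListener coordinator : coordinators.values()) {
            // Step 3: each coordinator recomputes its assignments against the new layout.
            coordinator.onLayoutChange(currentLayout);
        }
    }
}
```
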
> > > > ---
> > > >
> > > > ## Leader Election and Failover
> > > >
> > > > ### Election
> > > >
> > > > Each `ScalableTopicController` participates in leader election via the
> > > > metadata store's `LeaderElection` API. The election path is:
> > > >
> > > > ```
> > > > /topics/{tenant}/{namespace}/{encodedTopicName}/controller
> > > > ```
> > > >
> > > > The elected leader writes its broker service URL as the election
> > > > value. Clients discover the controller by reading this value
> > > > (delivered as part of the DAG watch session response).
> > > >
> > > > ### Failover
> > > >
> > > > When the leader broker fails:
> > > >
> > > > 1. The metadata store detects the session loss and clears the leader
> > > > lock.
> > > > 2. Other brokers with active controllers for the same topic receive a
> > > > `NoLeader` state change notification.
> > > > 3. The `ScalableTopicService.onLeaderStateChange()` handler re-invokes
> > > > `controller.initialize()`, which reloads metadata and re-attempts
> > > > election.
> > > > 4. The new leader restores in-memory state from the persisted
> > > > metadata, including all subscriptions and their consumer registrations
> > > > with current assignments. Consumers reconnecting within the session
> > > > grace period find their sessions intact and resume with the same
> > > > segment assignment without a rebalance (see [Consumer Session
> > > > Lifecycle](#consumer-session-lifecycle)).
> > > >
> > > > ### Non-leader brokers
> > > >
> > > > A broker that loses leadership, or never wins it, retains the
> > > > controller instance, but every mutating operation (`splitSegment`,
> > > > `mergeSegments`, `registerConsumer`) first runs a `checkLeader()`
> > > > guard that throws `IllegalStateException`. The controller remains
> > > > available for read operations like `getLayout()`.
> > > >
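
A minimal sketch of the guard described above, assuming the layout is just a list of active segment IDs. `GuardedController`, `setLeader`, and the ID allocation scheme are hypothetical; `checkLeader()`, `splitSegment`, `getLayout()`, and the `IllegalStateException` come from the text.

```java
import java.util.ArrayList;
import java.util.List;

final class GuardedController {
    private volatile boolean leader;
    private final List<Long> activeSegmentIds = new ArrayList<>();
    private long nextSegmentId;

    GuardedController(int initialSegments) {
        for (int i = 0; i < initialSegments; i++) {
            activeSegmentIds.add(nextSegmentId++);
        }
    }

    // In the real controller this would be driven by leader election callbacks.
    void setLeader(boolean leader) {
        this.leader = leader;
    }

    private void checkLeader() {
        if (!leader) {
            throw new IllegalStateException("This broker is not the controller leader");
        }
    }

    // Mutating operation: guarded, so it fails fast on non-leaders.
    synchronized void splitSegment(long segmentId) {
        checkLeader();
        if (!activeSegmentIds.remove(Long.valueOf(segmentId))) {
            throw new IllegalArgumentException("Unknown or inactive segment: " + segmentId);
        }
        activeSegmentIds.add(nextSegmentId++);
        activeSegmentIds.add(nextSegmentId++);
    }

    // Read operation: available on leaders and non-leaders alike.
    synchronized List<Long> getLayout() {
        return List.copyOf(activeSegmentIds);
    }
}
```
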
> > > > ---
> > > >
> > > > ## Scalable Topic Lifecycle
> > > >
> > > > ### Creation
> > > >
> > > > ```
> > > > Admin API (PUT /admin/v2/scalable/{tenant}/{namespace}/{topic})
> > > >   └─> ScalableTopicService.createScalableTopic(topic, numInitialSegments)
> > > >         ├─> ScalableTopicController.createInitialMetadata(numInitialSegments)
> > > >         │     └─> Divides [0x0000, 0xFFFF] into N equal ranges
> > > >         │         Creates N SegmentInfo records (ACTIVE, no parents)
> > > >         │         Returns ScalableTopicMetadata with epoch=0
> > > >         ├─> ScalableTopicResources.createScalableTopicAsync(topic, metadata)
> > > >         │     └─> Writes metadata to store at /topics/{t}/{ns}/{topic}
> > > >         └─> For each segment: createSegmentAsync via Segments admin API
> > > >               └─> Creates segment:// topic on owning broker
> > > > ```
> > > >
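
The "divides [0x0000, 0xFFFF] into N equal ranges" step in the creation flow can be sketched as follows. `HashRanges` is a hypothetical helper, and the handling of an N that does not divide 65536 evenly (ranges differ by at most one slot) is an assumption; the text does not specify it.

```java
import java.util.ArrayList;
import java.util.List;

final class HashRanges {
    private static final int HASH_SPACE = 0x10000; // number of slots in [0x0000, 0xFFFF]

    // Returns n contiguous, inclusive {start, end} ranges covering the whole hash space.
    static List<int[]> split(int n) {
        if (n < 1 || n > HASH_SPACE) {
            throw new IllegalArgumentException("n must be in [1, 65536], got " + n);
        }
        List<int[]> ranges = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            // Long arithmetic avoids int overflow for large i * HASH_SPACE.
            int start = (int) ((long) i * HASH_SPACE / n);
            int end = (int) ((long) (i + 1) * HASH_SPACE / n) - 1;
            ranges.add(new int[] {start, end});
        }
        return ranges;
    }
}
```

For example, `HashRanges.split(4)` yields `[0x0000, 0x3FFF]`, `[0x4000, 0x7FFF]`, `[0x8000, 0xBFFF]`, and `[0xC000, 0xFFFF]`.
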
> > > > ### Deletion
> > > >
> > > > ```
> > > > Admin API (DELETE /admin/v2/scalable/{tenant}/{namespace}/{topic})
> > > >   └─> ScalableTopicService.deleteScalableTopic(topic)
> > > >         ├─> releaseController(topic)
> > > >         │     └─> Closes leader election, removes from controllers map
> > > >         ├─> Load metadata from store
> > > >         ├─> For each segment: deleteSegmentAsync via Segments admin API
> > > >         │     └─> Deletes segment:// topic on owning broker (best-effort)
> > > >         └─> ScalableTopicResources.deleteScalableTopicAsync(topic)
> > > >               └─> Removes metadata from store
> > > > ```
> > > >
> > > > ---
> > > >
> > > > ## Public-Facing Changes
> > > >
> > > > ### Binary Protocol
> > > >
> > > > Three new protocol commands (added to `PulsarApi.proto`):
> > > >
> > > > | Command | Direction | Field ID | Description |
> > > > |---------|-----------|----------|-------------|
> > > > | `CommandScalableTopicLookup` | Client → Broker | 70 | Initiates a DAG watch session |
> > > > | `CommandScalableTopicUpdate` | Broker → Client | 71 | Initial response and subsequent pushed updates |
> > > > | `CommandScalableTopicClose` | Client → Broker | 72 | Closes the DAG watch session |
> > > >
> > > > New protocol messages:
> > > >
> > > > - `ScalableTopicDAG` — the full segment DAG with broker addresses and
> > > > controller URL.
> > > > - `SegmentInfoProto` — per-segment record in the DAG.
> > > > - `SegmentBrokerAddress` — maps a segment ID to its owning broker.
> > > > - `SegmentState` enum — `ACTIVE` or `SEALED`.
> > > >
> > > > ### Admin API
> > > >
> > > > **Scalable Topics** (`/admin/v2/scalable`):
> > > >
> > > > | Method | Path | Description |
> > > > |--------|------|-------------|
> > > > | `GET` | `/{tenant}/{namespace}` | List scalable topics |
> > > > | `PUT` | `/{tenant}/{namespace}/{topic}` | Create scalable topic |
> > > > | `GET` | `/{tenant}/{namespace}/{topic}` | Get topic metadata |
> > > > | `DELETE` | `/{tenant}/{namespace}/{topic}` | Delete scalable topic |
> > > > | `GET` | `/{tenant}/{namespace}/{topic}/stats` | Get aggregated stats for the scalable topic (segment counts, per-segment rates, per-subscription state, consumer counts) |
> > > > | `PUT` | `/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | Create a subscription on the scalable topic. The controller propagates it to all active segments by issuing `createSubscription` on each `segment://` topic via the Segments admin API. |
> > > > | `DELETE` | `/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | Delete a subscription. The controller unregisters all consumers and deletes the subscription from every segment. |
> > > > | `POST` | `/{tenant}/{namespace}/{topic}/split/{segmentId}` | Split a segment |
> > > > | `POST` | `/{tenant}/{namespace}/{topic}/merge/{segmentId1}/{segmentId2}` | Merge two segments |
> > > >
> > > > **Segments** (`/admin/v2/segments`):
> > > >
> > > > | Method | Path | Description |
> > > > |--------|------|-------------|
> > > > | `PUT` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create segment topic |
> > > > | `POST` | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | Terminate segment |
> > > > | `DELETE` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete segment topic |
> > > >
> > > > ### Admin Client API
> > > >
> > > > New `ScalableTopics` interface on `PulsarAdmin`:
> > > >
> > > > - `listScalableTopics(namespace)` / `listScalableTopicsAsync(namespace)`
> > > > - `createScalableTopic(topic, numSegments)` / `createScalableTopicAsync(...)`
> > > > - `getMetadata(topic)` / `getMetadataAsync(topic)`
> > > > - `getStats(topic)` / `getStatsAsync(topic)`
> > > > - `deleteScalableTopic(topic)` / `deleteScalableTopicAsync(topic)`
> > > > - `createSubscription(topic, subscription)` / `createSubscriptionAsync(...)`
> > > > - `deleteSubscription(topic, subscription)` / `deleteSubscriptionAsync(...)`
> > > > - `splitSegment(topic, segmentId)` / `splitSegmentAsync(...)`
> > > > - `mergeSegments(topic, segmentId1, segmentId2)` / `mergeSegmentsAsync(...)`
> > > > - `createSegment(segmentTopic, subscriptions)` / `createSegmentAsync(...)`
> > > > - `terminateSegment(segmentTopic)` / `terminateSegmentAsync(...)`
> > > > - `deleteSegment(segmentTopic, force)` / `deleteSegmentAsync(...)`
> > > >
> > > > ### Configuration
> > > >
> > > > | Property | Default | Description |
> > > > |----------|---------|-------------|
> > > > | `scalableTopicEnabled` | `true` | Enable scalable topic support on the broker |
> > > >
> > > > Additional configuration for auto-split thresholds and the consumer
> > > > session grace period will be specified in follow-up sub-PIPs.
> > > >
> > > > ### Metadata Store Paths
> > > >
> > > > | Path | Content |
> > > > |------|---------|
> > > > | `/topics/{tenant}/{namespace}/{topic}` | `ScalableTopicMetadata` JSON: segment DAG and global topic state |
> > > > | `/topics/{tenant}/{namespace}/{topic}/controller` | Leader broker URL (ephemeral) |
> > > > | `/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | `SubscriptionMetadata`: subscription-level config and the persisted set of consumer registrations |
> > > > | `/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/consumers/{consumerName}` | `ConsumerRegistration`: the durable session (consumer name and its current segment assignment). Keep-alive state (connected/disconnected, grace-period timer) is in-memory on the controller leader only and is **not** persisted here. |
> > > >
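
For readers who want the path layout in code form, here is a small sketch that builds the four metadata store paths from the table above. `ScalableTopicPaths` is a hypothetical helper; it concatenates the components as given and leaves out any topic-name encoding (the election section writes `{encodedTopicName}`), since the encoding scheme is not described here.

```java
final class ScalableTopicPaths {
    private ScalableTopicPaths() {}

    // ScalableTopicMetadata: segment DAG and global topic state.
    static String topic(String tenant, String namespace, String topic) {
        return "/topics/" + tenant + "/" + namespace + "/" + topic;
    }

    // Ephemeral leader election node holding the leader broker URL.
    static String controller(String tenant, String namespace, String topic) {
        return topic(tenant, namespace, topic) + "/controller";
    }

    // SubscriptionMetadata for one subscription.
    static String subscription(String tenant, String namespace, String topic, String subscription) {
        return topic(tenant, namespace, topic) + "/subscriptions/" + subscription;
    }

    // ConsumerRegistration: the durable session for one consumer.
    static String consumer(String tenant, String namespace, String topic,
                           String subscription, String consumerName) {
        return subscription(tenant, namespace, topic, subscription) + "/consumers/" + consumerName;
    }
}
```
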
> > > > ---
> > > >
> > > > ## Backward & Forward Compatibility
> > > >
> > > > ### Upgrade
> > > >
> > > > The controller is a new component with no interaction with existing
> > > > partitioned or non-partitioned topics. Enabling `scalableTopicEnabled`
> > > > activates the new code paths. Existing topics are unaffected.
> > > >
> > > > ### Downgrade / Rollback
> > > >
> > > > Scalable topic metadata and segment data remain in the metadata store
> > > > and BookKeeper. Rolling back to a version without scalable topic
> > > > support leaves this data intact but inaccessible. Upgrading again
> > > > restores access.
> > > >
> > > > ### Client Compatibility
> > > >
> > > > The existing Pulsar client SDK rejects `topic://` and `segment://`
> > > > domains with a clear error message directing users to the V5 client
> > > > SDK. This check is enforced in `PulsarClientImpl` for producer,
> > > > consumer, and reader creation paths.
> > > >
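
The client-side rejection described above could look roughly like the check below. This is a standalone sketch, not the code in `PulsarClientImpl`: the class name, the exception type, and the error message wording are all assumptions.

```java
final class ScalableDomainCheck {
    private ScalableDomainCheck() {}

    // Rejects topic names in the scalable-topic domains; other domains pass through.
    static void rejectScalableDomains(String topicName) {
        if (topicName.startsWith("topic://") || topicName.startsWith("segment://")) {
            throw new IllegalArgumentException(
                    "Topic '" + topicName + "' uses a scalable topic domain; use the V5 client SDK");
        }
    }
}
```
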
> > > > ---
> > > >
> > > > ## Security Considerations
> > > >
> > > > The controller follows the same tenant/namespace/topic authorization
> > > > model as existing Pulsar topics:
> > > >
> > > > - Admin API operations require topic-level admin permissions.
> > > > - DAG watch sessions require topic-level lookup permissions.
> > > > - Consumer registration requires topic-level consume permissions.
> > > > - Segment operations are internal and authenticated via the broker's
> > > > internal admin client.
> > > >
> > > > The controller leader lock in the metadata store is accessible only to
> > > > authenticated brokers.
> > > >
> > > > ---
> > > >
> > > > ## Links
> > > >
> > > > - Parent PIP: [PIP-460: Scalable Topics](pip-460.md)
> > > > - V5 Client API: [PIP-466: New Java Client API (V5)](pip-466.md)
> > > >
> > > > --
> > > > Matteo Merli
> > > > <[email protected]>
> > > >
> > > >
> > >
> >
