https://github.com/apache/pulsar/pull/25774
----

# PIP-477: Dynamic Bookie Cluster Switching for Brokers

> **Revision 2 (Plugin-First Architecture):** This revision reframes PIP-477
> per the design feedback to build on top of the storage-layer pluggability
> introduced by [PIP-384](./pip-384.md), the per-ledger metadata extension slot
> introduced by [PIP-404](./pip-404.md), the `CustomCommandFactory` SPI
> introduced by [PIP-201](./pip-201.md), and the existing `AdditionalServlet`
> SPI, instead of adding a self-contained subsystem to Pulsar core.

## Motivation

Apache Pulsar's broker layer is bound to a fixed BookKeeper (Bookie) cluster
at startup through the `bookkeeperMetadataServiceUri` configuration. This
static binding becomes a bottleneck in several real-world operational
scenarios:

1. **Slow failure recovery** — When a Bookie cluster experiences capacity
exhaustion, sustained latency degradation, or non-recoverable failures, the
only mitigation today is to wait for the Bookie cluster to recover. In
elastic-storage deployments where one Bookie cluster backs 100+ broker
clusters, the blast radius is enormous.
2. **No capacity escape hatch** — When a Bookie cluster approaches its
capacity ceiling, brokers cannot "spill over" to a fresh Bookie cluster.
3. **No runtime re-targeting** — There is no supported runtime mechanism to
migrate brokers from one Bookie cluster to another without restart and full
data migration.
4. **Data plane coupling** — Switching deals with the data plane: terabytes
of
ledger bytes, ongoing publishes, ongoing acknowledgments, and live cursors.
A copy-everything-then-cutover approach is infeasible.

This PIP proposes a **safe, ledger-attribution-driven framework** that
allows
operators to switch a broker cluster's underlying Bookie cluster at runtime,
with **zero data-plane downtime** and **per-ledger routing** that preserves
the
readability of historical data on the old cluster.

The framework enables:

- **Zero-downtime switching** — publish and consume continue without
interruption
- **Per-ledger routing** — each ledger carries its own cluster attribution
- **No bulk data migration** — Topic Data Ledgers age out naturally via
TTL/Retention
- **Targeted migration of small, long-lived metadata ledgers** — only
Schema and
Cursor Ledgers are copied, preserving the original `ledgerId`
- **Automatic rollback** within a configurable window
- **Operator-driven, not auto-failover** — avoids split-brain

## Design Principle: Build on PIP-384, Not Around It

The feature is delivered as **a `ManagedLedgerStorage` implementation that
holds multiple `BookkeeperManagedLedgerStorageClass` instances**, packaged
as
an out-of-tree NAR plugin. The framing PIP-384 establishes —
*"a topic resolves to one storage class"* — is generalized here to:

> *"A topic resolves to one **active** storage class at a time, with
> **attribution-driven routing for previously-written ledgers**."*

Concretely, this PIP:

- Lives in a separate Maven module (`pulsar-bookie-cluster-switching`) and
ships as a NAR; the broker loads it via the existing
`managedLedgerStorageClassName`
configuration hook (PIP-384).
- Re-uses **`LedgerInfo.properties`** (PIP-404, tag 6) and
**`ManagedCursorInfo.cursorProperties`** (existing, tag 8) for per-ledger
cluster attribution. **Zero new proto fields** in `MLDataFormats.proto`.
- Ships its REST surface via **`AdditionalServlet`**; ships its CLI via
**`CustomCommandFactory` NAR** (PIP-201). **Zero changes** to
`pulsar-broker`
REST classes or `pulsar-admin` CLI core.
- Requires only **three narrow SPI hooks** in core (Section "Required Core
SPI
Additions"), each useful to *any* custom `ManagedLedgerStorage` author and
much smaller than inlining the full feature.
- When the plugin NAR is not installed, broker behavior is **byte-for-byte
identical** to unmodified Pulsar; the ~99% of users that never switch BK
clusters pay zero cost in core surface area.

## Goal

Provide a runtime mechanism by which a broker cluster can be re-pointed from
one Bookie cluster (`oldCluster`) to a new Bookie cluster (`newCluster`)
without service interruption, while preserving the ability to read
historical
data that physically resides on `oldCluster`.

### In Scope

- A plugin-provided `ManagedLedgerStorage` implementation
(`MultiClusterManagedLedgerStorage`) that holds a
`Map<clusterName, BookkeeperManagedLedgerStorageClass>` and exposes the
currently active one.
- Per-ledger cluster attribution stored in the **existing**
`LedgerInfo.properties` and `ManagedCursorInfo.cursorProperties` slots under
the reserved key `_pulsar.bookieClusterName`.
- Per-broker-cluster registry of multiple Bookie clusters, stored in
Broker-ZK under a plugin-owned path.
- A plugin-owned `BookieClusterReadRouter` that selects a BK client at
ledger-handle open time based on the attribution property.
- A plugin-owned `BookieClusterSwitchOrchestrator` that drives
`BUILD → PROMOTE → CLEANUP` for Schema and Cursor Ledgers (migration
preserves the original `ledgerId` via `BookKeeper.asyncCreateLedgerAdv`).
- Plugin-provided CLI (`pulsar-admin bookie-clusters …`) via PIP-201
`CustomCommandFactory`.
- Plugin-provided REST API under `/admin/v2/bookie-clusters/*` via
`AdditionalServlet`.
- Three narrow core SPI hooks (see Section "Required Core SPI Additions").

### Out of Scope

- **Topic Data Ledger migration** — not migrated; ages out via
TTL/Retention.
- **Automatic failure detection and switch triggering** — operator-initiated
to avoid split-brain.
- **Cross-cluster (geo-replicated) coordination** — each broker cluster
switches independently.
- **BookKeeper-cluster-side internal migration tools** — delegated to
BookKeeper.
- **Changing `metadataStoreUrl` (Broker-ZK)** — only the Bookie cluster is
switched; Broker-ZK remains the same throughout.
- **New proto fields in `MLDataFormats.proto`** — reuses existing extension
slots from PIP-404 and `cursorProperties`.

## High-Level Architecture

```
┌────────────────────────────────────────────────────────────────────────┐
│ Pulsar core (unchanged + 3 narrow SPI hooks) │
│ │
│ ManagedLedgerStorage SPI ─── PIP-384 │
│ ManagedLedgerStorageClass SPI ─── PIP-384 │
│ LedgerInfo.properties (KV) ─── PIP-404 │
│ ManagedCursorInfo.cursorProperties ─── pre-existing │
│ AdditionalServlet SPI ─── pre-existing │
│ CustomCommandFactory SPI ─── PIP-201 │
│ │
│ + ManagedLedgerConfig.activeBookKeeperSupplier ←── NEW (S1) │
│ + ManagedLedgerConfig.bookKeeperResolver ←── NEW (S2) │
│ + SchemaStorageBookKeeperProvider SPI ←── NEW (S3) │
└────────────────────────────┬───────────────────────────────────────────┘
│ SPI / NAR loading
┌────────────────────────────▼───────────────────────────────────────────┐
│ pulsar-bookie-cluster-switching (NAR, opt-in) │
│ │
│ MultiClusterManagedLedgerStorage impl ManagedLedgerStorage │
│ ├─ Map<name, BookkeeperManagedLedgerStorageClass> │
│ ├─ activeClusterName (volatile) │
│ └─ getDefaultStorageClass() → active │
│ │
│ RoutingManagedLedgerFactory wraps ManagedLedgerFactoryImpl │
│ └─ Per-ledger BK resolution from LedgerInfo.properties │
│ │
│ RoutingSchemaBookKeeperProvider impl SchemaStorageBookKeeperProvider│
│ └─ Per-position BK resolution from SchemaLocator extension │
│ │
│ BookieClusterConfigManager / ConfigWatcher │
│ └─ Owns ZK path /admin/bookie-clusters/* (plugin-owned) │
│ │
│ BookieClusterSwitchOrchestrator BUILD → PROMOTE → CLEANUP │
│ │
│ REST /admin/v2/bookie-clusters/* via AdditionalServlet │
│ CLI pulsar-admin bookie-clusters via CustomCommandFactory NAR │
└────────────────────────────────────────────────────────────────────────┘
```

### Key Principles

| Principle | Description |
|-----------|-------------|
| **Attribution as Single Source of Truth** | Every ledger at creation is "stamped" with the active cluster name inside its existing metadata properties. Reads are routed by that stamp. No history table, no boundary heuristics, no side znode. |
| **Reuse Extension Slots, Don't Add Proto Fields** | `LedgerInfo.properties` (PIP-404) and `ManagedCursorInfo.cursorProperties` carry the attribution. Only `SchemaStorageFormat.proto` needs one tiny extension (Section "Required Core SPI Additions" S3). |
| **Topic Data: Don't Migrate** | Topic Data Ledgers have TTL/Retention; let them age out. Switching does not move terabytes of data. |
| **Schema / Cursor: Migrate, but Preserve `ledgerId`** | We use `asyncCreateLedgerAdv(ledgerId, …)` to recreate the **same `ledgerId`** in the new cluster, copy entries with **identical `entryId`**, then CAS only the attribution property. Business keys (`cursorsLedgerId`, `position.ledgerId`, `position.entryId`) are byte-for-byte unchanged in Broker-ZK. |
| **Plugin-First, Opt-In** | When the NAR is absent or `managedLedgerStorageClassName` ≠ the multi-cluster class, the broker is byte-for-byte unmodified Pulsar. |
| **Operator-Driven, Single Active Cluster** | At any moment exactly one cluster is `status=ACTIVE` in Broker-ZK. Auto-failover is rejected. |
| **Broker-ZK is the Switch Source-of-Truth** | All cluster registrations and the switch directive live in Broker-ZK under a plugin-owned path. No Global-ZK, no external coordinator. |

## Required Core SPI Additions

These are the **only** changes to Pulsar core required by this PIP. Each is
a
narrow, generally-useful hook that any custom `ManagedLedgerStorage`
implementation could leverage — not specific to BK switching.

### S1. `ManagedLedgerConfig.activeBookKeeperSupplier`

**Problem.** Today `ManagedLedgerImpl` captures the `BookKeeper` client once
in its constructor (via `ManagedLedgerFactoryImpl.bookkeeperFactory`). When
a
custom storage class needs to hot-swap the underlying BK client (e.g.
cluster
switch), there is no clean injection point on the write path.

**Hook.** Add an optional supplier:

```java
// managed-ledger/.../ManagedLedgerConfig.java
private Supplier<BookKeeper> activeBookKeeperSupplier; // nullable

public Supplier<BookKeeper> getActiveBookKeeperSupplier() { … }
public ManagedLedgerConfig setActiveBookKeeperSupplier(Supplier<BookKeeper> s) { … }
```

`ManagedLedgerImpl.createLedgerAfterClosed`, `rollCurrentLedgerIfFull`, and
`ManagedCursorImpl.doCreateNewMetadataLedger` resolve the BK client as:

```java
BookKeeper bk = config.getActiveBookKeeperSupplier() != null
        ? config.getActiveBookKeeperSupplier().get()
        : this.bookKeeper; // backward-compatible default
```

**Scope.** Pure additive. When the supplier is unset (default), behavior is
identical. ~30 lines of change in `managed-ledger/`.
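The fallback rule above can be illustrated in isolation. The following is a minimal sketch, not the proposed patch: `ActiveClientResolution`, `hotSwappable`, and the generic `T` (standing in for the `BookKeeper` client) are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical stand-in for the S1 lookup; T plays the role of the BookKeeper client.
final class ActiveClientResolution {
    private ActiveClientResolution() {}

    // Prefer the supplier when one is configured; otherwise keep the
    // constructor-captured client, which matches today's behavior.
    static <T> T resolve(Supplier<T> activeSupplier, T captured) {
        return activeSupplier != null ? activeSupplier.get() : captured;
    }

    // A supplier backed by an AtomicReference lets a storage implementation
    // swap the client without touching the objects that hold the supplier.
    static <T> Supplier<T> hotSwappable(AtomicReference<T> current) {
        return current::get;
    }
}
```

Because the supplier is consulted on every ledger creation, a later `AtomicReference.set(...)` is picked up by the next rollover without reconstructing the managed ledger.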

### S2. `ManagedLedgerConfig.bookKeeperResolver`

**Problem.** Reads (and deletes) need a *per-ledger* resolution path. Today
`ManagedLedgerImpl` uses the constructor-captured `bookKeeper` for every
`asyncOpenLedger`/`asyncDeleteLedger`. After a switch, ledgers that
physically
live on the old cluster must be opened with the old client.

**Hook.** Add an optional resolver function keyed on `LedgerInfo`:

```java
// managed-ledger/.../ManagedLedgerConfig.java
private Function<LedgerInfo, BookKeeper> bookKeeperResolver; // nullable

// And the analogous function for cursor metadata ledgers:
private Function<ManagedCursorInfo, BookKeeper> cursorBookKeeperResolver;
```

All call sites that today reference `this.bookKeeper` to open or delete a
specific ledger (the comprehensive list is in
`ManagedLedgerImpl.getLedgerHandle`, `internalAsyncOpenCursor`,
`asyncDeleteLedger`, `asyncDeleteFromBookKeeper`, …) become:

```java
BookKeeper bk = resolveBookKeeperForLedger(ledgerInfo);
```

Where:

```java
private BookKeeper resolveBookKeeperForLedger(LedgerInfo li) {
    var resolver = config.getBookKeeperResolver();
    return resolver != null ? resolver.apply(li) : this.bookKeeper;
}
```

**Scope.** Pure additive; when resolvers are unset, behavior is identical.
~80 lines of change in `managed-ledger/`, mostly mechanical substitutions.

### S3. `SchemaStorageBookKeeperProvider` SPI

**Problem.** `BookkeeperSchemaStorage` captures `this.bookKeeper` at
`start()` time (line ~101). It directly opens schema ledgers via that single
client. To support per-ledger routing we need to inject a resolver and to
attribute each schema ledger to a cluster.

**Hook (two parts).**

1. **Add a tiny extension slot to `SchemaStorageFormat.proto`** — this is
the
*only* proto change in this PIP:

```protobuf
message PositionInfo {
required int64 ledgerId = 1;
required int64 entryId = 2;
repeated KeyValue properties = 3; // ★ NEW — generic extension slot,
// mirrors PIP-404 LedgerInfo.properties
}

message KeyValue {
required string key = 1;
required string value = 2;
}
```

This is the minimal possible change: a generic `properties` slot symmetric
to PIP-404. It does **not** mention bookie clusters; any future plugin can
use it.

2. **Add a provider SPI for the schema storage BK client:**

```java
// pulsar-broker/.../service/schema/SchemaStorageBookKeeperProvider.java
public interface SchemaStorageBookKeeperProvider {
/** Returns the BK client to use for the given schema position. */
BookKeeper resolve(PositionInfo position);
/** Returns the BK client to use for *creating* a new schema ledger. */
BookKeeper active();
}
```

`BookkeeperSchemaStorage` resolves its provider as:

```java
var provider = pulsar.getSchemaStorageBookKeeperProvider(); // nullable
BookKeeper bk = provider != null ? provider.resolve(position) : this.bookKeeper;
```

The default (provider == null) preserves today's behavior exactly.

**Scope.** One proto field (additive, `repeated` is forward/backward
compatible in proto2) + one SPI interface + ~50 lines in
`BookkeeperSchemaStorage`. No new REST endpoint, no CLI change.

### Summary: Core Diff Footprint

| Component | LOC | Risk |
|-----------|-----|------|
| `ManagedLedgerConfig` (S1, S2 setters/getters) | ~40 | Trivial additive |
| `ManagedLedgerImpl` / `ManagedCursorImpl` resolver wiring | ~80 | Mechanical, well-scoped |
| `SchemaStorageFormat.proto` `PositionInfo.properties` (S3) | 1 field | Additive, proto2-safe |
| `SchemaStorageBookKeeperProvider` SPI + `BookkeeperSchemaStorage` wiring | ~50 | Additive |
| **Total core diff** | **~170 LOC, 1 new optional proto field** | **Zero behavioral change when SPI unused** |

Everything else lives in the plugin NAR.

## Plugin Module: `pulsar-bookie-cluster-switching`

A new top-level Maven module that builds a NAR
(`pulsar-bookie-cluster-switching-<version>.nar`).

### Module Layout

```
pulsar-bookie-cluster-switching/
├─ src/main/java/.../
│  ├─ storage/
│  │  ├─ MultiClusterManagedLedgerStorage.java   (implements ManagedLedgerStorage)
│  │  ├─ MultiClusterStorageClass.java           (implements BookkeeperManagedLedgerStorageClass)
│  │  └─ RoutingManagedLedgerFactory.java
│  ├─ schema/
│  │  └─ RoutingSchemaBookKeeperProvider.java    (implements SchemaStorageBookKeeperProvider)
│  ├─ registry/
│  │  ├─ BookieClusterConfigManager.java
│  │  └─ BookieClusterConfigWatcher.java
│  ├─ orchestrator/
│  │  ├─ BookieClusterSwitchOrchestrator.java
│  │  ├─ LedgerCopyUtil.java
│  │  └─ IdgenPrecheckService.java
│  ├─ rest/
│  │  └─ BookieClusterAdminServlet.java          (implements AdditionalServlet)
│  └─ cli/
│     └─ BookieClusterCommandFactory.java        (implements CustomCommandFactory)
└─ src/main/resources/META-INF/services/
   ├─ org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlet
   └─ org.apache.pulsar.admin.cli.extensions.CustomCommandFactory
```

### Wiring on Broker Start

The plugin is activated by setting in `broker.conf`:

```properties
managedLedgerStorageClassName=org.apache.pulsar.ext.bookieswitch.MultiClusterManagedLedgerStorage
additionalServlets=bookie-clusters-admin
additionalServletDirectory=./plugins
```

Then `ManagedLedgerStorage.create()` (PIP-384 entry point) reflectively
loads
`MultiClusterManagedLedgerStorage`. Inside its `initialize()`:

1. Reads its own configuration namespace (prefix `bookieClusterSwitch.*`).
2. Reads `/admin/bookie-clusters/*` from Broker-ZK to discover registered
clusters and resolve the initial `activeClusterName` (tier-1 →
Broker-ZK `status=ACTIVE` cluster; tier-2 → optional
`bookieClusterSwitch.currentClusterName` hint; tier-3 → derive from
`bookkeeperMetadataServiceUri` and auto-register as `ACTIVE`).
3. Builds one `BookkeeperManagedLedgerStorageClass` per registered cluster
(each owns its own `BookKeeper` client and `StatsProvider`).
4. Constructs a `RoutingManagedLedgerFactory` that decorates a base
`ManagedLedgerFactoryImpl` and injects the per-ledger BK resolver via the
S1/S2 hooks above.
5. Starts `BookieClusterConfigWatcher` to receive switch directives.
6. (Origin-broker-only after a switch) starts the orchestrator.
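
The three-tier lookup in step 2 can be sketched as a pure function. This is only an illustration under stated assumptions: the class name, the `Map` view of the registry, and the `derivedName` parameter are hypothetical, the derivation of a name from `bookkeeperMetadataServiceUri` and the auto-registration side effect are not shown, and the sketch assumes the tier-2 hint is used as-is without checking that it is registered.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the initial activeClusterName resolution.
final class InitialActiveCluster {
    private InitialActiveCluster() {}

    // registryStatus: cluster name -> status string as stored in the registry.
    // hint: value of bookieClusterSwitch.currentClusterName, or null when unset.
    // derivedName: name derived from the static metadata service URI (not shown).
    static String resolve(Map<String, String> registryStatus, String hint, String derivedName) {
        // Tier 1: a cluster already marked ACTIVE in the registry wins.
        Optional<String> active = registryStatus.entrySet().stream()
                .filter(e -> "ACTIVE".equals(e.getValue()))
                .map(Map.Entry::getKey)
                .findFirst();
        if (active.isPresent()) {
            return active.get();
        }
        // Tier 2: fall back to the configured hint, if any.
        if (hint != null && !hint.isEmpty()) {
            return hint;
        }
        // Tier 3: fall back to the name derived from the static URI.
        return derivedName;
    }
}
```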

The plugin's `AdditionalServlet` instance is independently discovered by
`AdditionalServlets.load(…)` in `WebService`; the plugin's
`CustomCommandFactory` is independently discovered by `pulsar-admin`'s NAR
class loader.

## Detailed Design

### Per-Ledger Attribution Encoding

| Ledger type | proto container | Encoding | Key |
|-------------|-----------------|----------|-----|
| Topic Data | `ManagedLedgerInfo.LedgerInfo.properties` (PIP-404, tag 6) | `KeyValue` | `_pulsar.bookieClusterName` |
| Cursor | `ManagedCursorInfo.cursorProperties` (existing, tag 8) | `StringProperty` | `_pulsar.bookieClusterName` |
| Schema | `SchemaStorageFormat.PositionInfo.properties` (NEW, tag 3, S3) | `KeyValue` | `_pulsar.bookieClusterName` |

The reserved key prefix `_pulsar.` is **owned by Pulsar core** to avoid
collisions with plugin- or user-defined properties. Plugin code uses a
constant:

```java
public static final String ATTR_KEY = "_pulsar.bookieClusterName";
```

**Compatibility.**

- **proto2 forward-compat:** `repeated KeyValue properties` already exists
in
`LedgerInfo` (PIP-404). For `PositionInfo` it's added with a new tag (3),
matching the symmetric pattern.
- **Old broker reading new metadata:** ignores `properties` it doesn't
understand; preserves them via `UnknownFieldSet` on rewrite (already the
case for PIP-404).
- **New broker reading old metadata (no attribution):** the plugin's
resolver
falls back to the active cluster (which equals the original cluster for
pre-switch ledgers; correct).

### Write-Path Stamping

`RoutingManagedLedgerFactory` wraps the base factory. When `ManagedLedger`
asks for a new ledger:

```java
// Pseudocode of the stamping path
String activeCluster = multiClusterStorage.getActiveClusterName();
BookKeeper bk = multiClusterStorage.getBookKeeper(activeCluster); // via S1
LedgerHandle lh = bk.asyncCreateLedger(...);

// In the same metadata CAS that records the new LedgerInfo:
LedgerInfo info = baseInfo.toBuilder()
.addProperties(KeyValue.newBuilder()
.setKey(ATTR_KEY)
.setValue(activeCluster)
.build())
.build();
```

The same pattern applies to:

- `ManagedCursorImpl.doCreateNewMetadataLedger` → stamps
`cursorProperties[_pulsar.bookieClusterName] = activeCluster`.
- `BookkeeperSchemaStorage.createLedger` (via S3 provider) → stamps
`PositionInfo.properties[_pulsar.bookieClusterName] = activeCluster`.

The stamp is written in the **same Broker-ZK CAS** as the business field
(`ledgerId`, `cursorsLedgerId`, `position`), guaranteeing atomicity
(invariant **I1**).
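
To make invariant I1 concrete, the sketch below models the metadata znode as an `AtomicReference` and publishes the `ledgerId` together with its stamp in one compare-and-set. It is an in-memory illustration only: `StampedLedgerList` and `LedgerRecord` are hypothetical names, and the real path would go through the metadata store's versioned update rather than reference equality.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory stand-in for the managed-ledger metadata znode.
final class StampedLedgerList {
    // The id and its cluster stamp travel together in one immutable record.
    static final class LedgerRecord {
        final long ledgerId;
        final String clusterName;

        LedgerRecord(long ledgerId, String clusterName) {
            this.ledgerId = ledgerId;
            this.clusterName = clusterName;
        }
    }

    private final AtomicReference<List<LedgerRecord>> znode =
            new AtomicReference<>(Collections.<LedgerRecord>emptyList());

    List<LedgerRecord> read() {
        return znode.get();
    }

    // Appends the new ledger and its stamp in a single compare-and-set, so no
    // reader can observe the ledgerId without its attribution. Returns false
    // when 'expected' is stale (someone else updated the list first).
    boolean append(List<LedgerRecord> expected, long ledgerId, String activeCluster) {
        List<LedgerRecord> next = new ArrayList<>(expected);
        next.add(new LedgerRecord(ledgerId, activeCluster));
        return znode.compareAndSet(expected, Collections.unmodifiableList(next));
    }
}
```

A caller that loses the race re-reads with `read()` and retries, which mirrors the usual BadVersion handling.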

### Read-Path Routing

```java
// RoutingManagedLedgerFactory: injected as ManagedLedgerConfig.bookKeeperResolver (S2)
BookKeeper resolve(LedgerInfo li) {
    String cluster = li.getPropertiesList().stream()
            .filter(kv -> ATTR_KEY.equals(kv.getKey()))
            .map(KeyValue::getValue)
            .findFirst()
            .orElseGet(multiClusterStorage::getActiveClusterName); // legacy fallback
    return multiClusterStorage.getBookKeeper(cluster);
}
```

Identical pattern for cursors (consulting `cursorProperties`) and schema
positions (consulting `PositionInfo.properties`).

**Invariant I4** (entry-level consistency within a single read): the
resolver
is consulted exactly once at `LedgerHandle` open time; the resulting
`LedgerHandle` is bound to one BK client for the entire read of that ledger.
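
The routing rule can be exercised without BookKeeper on the classpath. The following is a minimal sketch under stated assumptions: `ReadRouter` is a hypothetical class, the generic `C` stands in for the `BookKeeper` client, and a plain `Map<String, String>` stands in for the ledger's properties list.

```java
import java.util.Map;

// Hypothetical router: picks a client per ledger from its attribution property.
final class ReadRouter<C> {
    static final String ATTR_KEY = "_pulsar.bookieClusterName";

    private final Map<String, C> clientsByCluster;
    private volatile String activeCluster;

    ReadRouter(Map<String, C> clientsByCluster, String activeCluster) {
        this.clientsByCluster = clientsByCluster;
        this.activeCluster = activeCluster;
    }

    void switchActive(String cluster) {
        this.activeCluster = cluster;
    }

    // Called once when a ledger handle is opened; the caller keeps the
    // returned client for the whole read of that ledger.
    C resolve(Map<String, String> ledgerProperties) {
        // Legacy fallback: unstamped ledgers resolve to the active cluster.
        String cluster = ledgerProperties.getOrDefault(ATTR_KEY, activeCluster);
        C client = clientsByCluster.get(cluster);
        if (client == null) {
            throw new IllegalStateException("no client registered for cluster " + cluster);
        }
        return client;
    }
}
```

Note that in this sketch a stamped ledger keeps resolving to its own cluster after `switchActive`, while an unstamped ledger resolves to whichever cluster is active at call time.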

### Switch Phases (Operator-Driven State Machine)

```
NOT_REGISTERED
↓
[Operator: pulsar-admin bookie-clusters register --name newCluster --metadata-service-uri … --status STANDBY]
↓
REGISTERED (STANDBY)
↓
[Operator: pulsar-admin bookie-clusters switch --target newCluster]
↓ (inline precheck: idgen-long advanced)
SWITCH_TRIGGERED ← Coordinator broker writes /admin/bookie-clusters/switch-target
↓
LIVE_DUAL_READ ← All brokers' watchers fire; new writes stamped newCluster;
↓ reads of pre-stamped ledgers routed by attribution
BUILD_PROMOTE_CLEANUP
├── BUILD ← Coordinator copies Schema/Cursor ledger bytes to newCluster
│ with the SAME ledgerId; entries copied with the SAME entryId
├── PROMOTE ← Schema: coordinator CAS-rewrites SchemaLocator
│ (changes only PositionInfo.properties[ATTR_KEY])
│ Cursor: coordinator forwards POST /internal/promote-cursor
│ to topic owner; owner CAS in ManagedCursorImpl lock
└── CLEANUP ← After rollback window, delete old-cluster copies; promote
Broker-ZK status: oldCluster→DEPRECATED, newCluster→ACTIVE
↓
DONE / DONE_WITH_FAILURES

(Operator-triggered, within window):
ROLLBACK ← Reverse-CAS attribution back to oldCluster; new-cluster
copies scheduled for delayed deletion.
```

All states above live **inside the plugin**. The orchestrator persists its
progress under `/admin/bookie-clusters/orchestrator/<switchId>/...`.
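
One way to keep the orchestrator honest about these transitions is an explicit transition table. The sketch below is an assumption-laden illustration: the `SwitchState` enum is hypothetical, and the rollback edges (from `LIVE_DUAL_READ` and `BUILD_PROMOTE_CLEANUP`) are this sketch's reading of "within window", not something the diagram states edge by edge.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical encoding of the switch states and their allowed successors.
enum SwitchState {
    NOT_REGISTERED,
    REGISTERED_STANDBY,
    SWITCH_TRIGGERED,
    LIVE_DUAL_READ,
    BUILD_PROMOTE_CLEANUP,
    DONE,
    DONE_WITH_FAILURES,
    ROLLBACK;

    private static final Map<SwitchState, Set<SwitchState>> NEXT =
            new EnumMap<>(SwitchState.class);

    static {
        NEXT.put(NOT_REGISTERED, EnumSet.of(REGISTERED_STANDBY));
        NEXT.put(REGISTERED_STANDBY, EnumSet.of(SWITCH_TRIGGERED));
        NEXT.put(SWITCH_TRIGGERED, EnumSet.of(LIVE_DUAL_READ));
        // Assumption: rollback is reachable from the two in-flight states.
        NEXT.put(LIVE_DUAL_READ, EnumSet.of(BUILD_PROMOTE_CLEANUP, ROLLBACK));
        NEXT.put(BUILD_PROMOTE_CLEANUP, EnumSet.of(DONE, DONE_WITH_FAILURES, ROLLBACK));
        // Terminal states have no successors in this sketch.
        NEXT.put(DONE, EnumSet.noneOf(SwitchState.class));
        NEXT.put(DONE_WITH_FAILURES, EnumSet.noneOf(SwitchState.class));
        NEXT.put(ROLLBACK, EnumSet.noneOf(SwitchState.class));
    }

    boolean canMoveTo(SwitchState target) {
        return NEXT.get(this).contains(target);
    }
}
```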

### Phase 1: Registration

```bash
pulsar-admin bookie-clusters register \
--name new-cluster \
--metadata-service-uri zk+null://new-zk:2181/ledgers-v2 \
--status STANDBY
```

`BookieClusterConfigManager` (plugin) validates:

- `metadata-service-uri` does not collide with any already-registered
cluster.
- For co-located deployments, the chroot must differ from existing clusters.
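
A rough sketch of both checks follows. It assumes, for illustration only, that two URIs collide when they share the same authority and the same chroot path regardless of scheme variant (`zk` versus `zk+null`); `RegistrationCheck` is a hypothetical class, and only single-host URIs are exercised here.

```java
import java.net.URI;
import java.util.Collection;
import java.util.Objects;

// Hypothetical registration-time validation of a metadata service URI.
final class RegistrationCheck {
    private RegistrationCheck() {}

    // Same host:port and same chroot means both registrations would point at
    // the same ledger metadata, whatever layout suffix the scheme carries.
    static boolean collides(String candidateUri, String existingUri) {
        URI a = URI.create(candidateUri);
        URI b = URI.create(existingUri);
        return Objects.equals(a.getAuthority(), b.getAuthority())
                && Objects.equals(chroot(a), chroot(b));
    }

    // Normalizes a trailing slash so "/ledgers/" and "/ledgers" compare equal.
    private static String chroot(URI uri) {
        String path = uri.getPath() == null ? "" : uri.getPath();
        if (path.length() > 1 && path.endsWith("/")) {
            return path.substring(0, path.length() - 1);
        }
        return path;
    }

    static void validate(String candidateUri, Collection<String> registeredUris) {
        for (String existing : registeredUris) {
            if (collides(candidateUri, existing)) {
                throw new IllegalArgumentException(
                        "metadata-service-uri collides with registered cluster URI " + existing);
            }
        }
    }
}
```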

### Phase 2: Precheck (Invariant I11)

```bash
pulsar-admin bookie-clusters precheck --name new-cluster
```

`IdgenPrecheckService` (plugin) verifies the new cluster's
`/ledgers/idgen-long`
is advanced beyond `max(ledgerId)` of the source cluster. Otherwise
`asyncCreateLedgerAdv(ledgerId, …)` during BUILD would collide with
newly-allocated ledgerIds. `POST /switch` invokes precheck inline; failure
returns HTTP 409 (bypassable via plugin config in lab/staging only).
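
The core of the precheck is a single comparison. The sketch below assumes the caller has already read the next id the new cluster would allocate (from its `idgen-long` state) and the set of source `ledgerId`s to be copied; `IdgenPrecheck` and its parameters are hypothetical names, and reading those values from ZooKeeper is not shown.

```java
import java.util.Collection;

// Hypothetical core of the idgen precheck (invariant I11).
final class IdgenPrecheck {
    private IdgenPrecheck() {}

    // Passes only when the next id the new cluster would hand out is above the
    // largest source ledgerId, so ids recreated during BUILD cannot clash with
    // ids allocated later for ordinary new ledgers.
    static boolean passes(long newClusterNextLedgerId, Collection<Long> sourceLedgerIds) {
        long maxSource = sourceLedgerIds.stream()
                .mapToLong(Long::longValue)
                .max()
                .orElse(-1L); // nothing to copy: any generator position is fine
        return newClusterNextLedgerId > maxSource;
    }
}
```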

### Phase 3: Switch Trigger

```bash
pulsar-admin bookie-clusters switch --target new-cluster
```

The receiving broker (origin) writes:

```
/admin/bookie-clusters/switch-target
body: { "target": "new-cluster", "origin": "https://broker-1:8080",
"epoch": 7, "ts": 1716256800000 }
```

All brokers' `BookieClusterConfigWatcher` fire and execute, in order:

1. `MultiClusterManagedLedgerStorage.switchActiveCluster(newCluster)` —
hot-swap `activeClusterName`; new BK client lazily created if not present.
2. First switch: enable dual-read in the read router; subsequent switches:
refresh epoch.
3. Emit metrics.
4. **Origin broker only:** start the orchestrator (`BUILD → PROMOTE →
CLEANUP`).
Non-origin brokers do not start an orchestrator. If the directive body
lacks `origin`, brokers fall back to leader election among themselves
(using the existing `LeaderElectionService`) to pick exactly one
orchestrator.

### Phase 4: BUILD (Schema + Cursor Ledger Bytes)

The orchestrator runs **two parallel stages** with bounded concurrency.

#### Schema Ledger BUILD (centralized)

```
for each schemaId in /schemas/* (parallel, bounded):
    locator = brokerZk.get("/schemas/<schemaId>")
    for each indexEntry in locator.indexList:
        ledgerId = indexEntry.position.ledgerId
        cluster  = readClusterAttr(indexEntry.position) ?? activeClusterAtFirstWrite
        if cluster == oldCluster:
            LedgerCopyUtil.copyLedgerPreservingIds(
                ledgerId, oldBk, newBk,
                customMetadata,   // identical to source ledger
                ensembleSize, qw, qa)
            // Internally:
            //   newBk.asyncCreateLedgerAdv(ledgerId, …)   ← SAME ledgerId
            //   for entryId in [0, lastAddConfirmed]:
            //       newLh.addEntry(entryId, srcEntry)     ← SAME entryId
            //   newLh.close()
```

#### Cursor Ledger BUILD (centralized)

Identical pattern over `/managed-ledgers/<topic>/<cursorName>` znodes,
copying
each `cursorsLedgerId`.

**Why BUILD is safe to centralize:**

1. Reads of the old ledger are read-only — no contention with owner brokers'
write paths.
2. Writes to the new cluster are to a **dormant copy** — no online reader
sees it yet (attribution still says `oldCluster`).
3. The orchestrator uses dedicated BK clients with isolated thread pools.

#### Idempotency on `LedgerExistException`

If `asyncCreateLedgerAdv(ledgerId, …)` returns `LedgerExistException` (BUILD
retry / partial prior run), `LedgerCopyUtil.verifyDstMatchesSrc` opens both
ledgers and compares `getLastAddConfirmed()` and `getLength()`. If both
match, the existing ledger is treated as a successful idempotent copy
(`migration_ledger_id_conflict_total` is incremented for visibility).
Mismatch → record as `failedLedgers`; attribution remains on the old
cluster.
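
The copy-then-verify behavior can be modelled in memory. In this sketch, which is an illustration and not `LedgerCopyUtil` itself, each cluster is a `Map` from `ledgerId` to a list of entries whose list index plays the role of `entryId`; `LedgerCopySketch`, `Result`, and the helper names are hypothetical, and an already-present destination ledger stands in for the `LedgerExistException` branch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of an id-preserving, idempotent ledger copy.
final class LedgerCopySketch {
    enum Result { COPIED, ALREADY_COPIED, MISMATCH }

    private LedgerCopySketch() {}

    static Result copyPreservingIds(long ledgerId,
                                    Map<Long, List<byte[]>> src,
                                    Map<Long, List<byte[]>> dst) {
        List<byte[]> source = src.get(ledgerId);
        if (source == null) {
            throw new IllegalArgumentException("unknown source ledger " + ledgerId);
        }
        List<byte[]> existing = dst.get(ledgerId);
        if (existing != null) {
            // Destination already has this id: accept it only if the last
            // confirmed entry and the total byte length both match the source.
            boolean same = lastAddConfirmed(existing) == lastAddConfirmed(source)
                    && length(existing) == length(source);
            return same ? Result.ALREADY_COPIED : Result.MISMATCH;
        }
        // Same ledgerId, and entries keep their position (entryId) in the list.
        List<byte[]> copy = new ArrayList<>(source.size());
        for (byte[] entry : source) {
            copy.add(entry.clone());
        }
        dst.put(ledgerId, copy);
        return Result.COPIED;
    }

    static long lastAddConfirmed(List<byte[]> ledger) {
        return ledger.size() - 1L; // -1 for an empty ledger
    }

    static long length(List<byte[]> ledger) {
        long total = 0;
        for (byte[] entry : ledger) {
            total += entry.length;
        }
        return total;
    }
}
```

In this model a `MISMATCH` result corresponds to recording the ledger in `failedLedgers` and leaving the attribution on the old cluster.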

### Phase 5: PROMOTE (CAS Attribution Property)

#### Schema PROMOTE (centralized)

```java
// Pseudocode: tuple assignment and loop headers are schematic, not literal Java.
for (schemaId : builtSchemas) {
    for (retry < MAX_PROMOTE_RETRIES = 5) {
        (locator, version) = brokerZk.getWithVersion("/schemas/<schemaId>");
        SchemaLocator.Builder updated = locator.toBuilder().clearIndex();
        for (IndexEntry ie : locator.indexList) {
            if (readClusterAttr(ie.position).equals(oldCluster)) {
                updated.addIndex(ie.toBuilder()
                        .setPosition(setClusterAttr(ie.position, newCluster)) // ★ ONLY changed
                        .build());
            } else {
                updated.addIndex(ie); // already newCluster
            }
        }
        try {
            brokerZk.put("/schemas/<schemaId>", updated.build().toByteArray(), version);
            break;
        } catch (BadVersionException) {
            continue;
        }
    }
}
```

**Crucially**, `position.ledgerId`, `position.entryId`, `info.version`, and
`info.hash` are **byte-for-byte unchanged** (invariant **I10**). Only the
attribution property is rewritten.
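
The bounded read-modify-write loop behind PROMOTE can be sketched against an in-memory versioned store. Everything here is a stand-in chosen for illustration: `CasRetry`, `Store`, and `Versioned` are hypothetical, and a `false` return from `put` plays the role of a BadVersion error from the metadata store.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical bounded CAS retry, mirroring the PROMOTE loop.
final class CasRetry {
    static final int MAX_PROMOTE_RETRIES = 5;

    private CasRetry() {}

    // A value plus the version it was read at, standing in for a znode.
    static final class Versioned<T> {
        final T value;
        final long version;

        Versioned(T value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    static final class Store<T> {
        private final AtomicReference<Versioned<T>> ref;

        Store(T initial) {
            this.ref = new AtomicReference<>(new Versioned<>(initial, 0L));
        }

        Versioned<T> get() {
            return ref.get();
        }

        // Succeeds only if the stored version still equals expectedVersion.
        synchronized boolean put(T value, long expectedVersion) {
            Versioned<T> current = ref.get();
            if (current.version != expectedVersion) {
                return false;
            }
            ref.set(new Versioned<>(value, current.version + 1));
            return true;
        }
    }

    // Re-reads and re-applies the rewrite on every conflict, up to the limit.
    static <T> boolean update(Store<T> store, UnaryOperator<T> rewrite) {
        for (int attempt = 0; attempt < MAX_PROMOTE_RETRIES; attempt++) {
            Versioned<T> seen = store.get();
            if (store.put(rewrite.apply(seen.value), seen.version)) {
                return true;
            }
        }
        return false; // retries exhausted; the caller decides how to report it
    }
}
```

Because the rewrite is re-applied to the freshly read value on each attempt, a concurrent writer's changes are carried forward rather than overwritten.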

#### Cursor PROMOTE (forwarded to topic owner)

Cursor PROMOTE must be executed by the topic owner inside the
`ManagedCursorImpl` lock to avoid racing with markDelete flush CAS.

```
[orchestrator broker]
for each (topic, cursorName, cursorsLedgerId) built successfully:
ownerUrl = namespaceService.findOwnerHttpUrl(topic)
POST {ownerUrl}/admin/v2/bookie-clusters/internal/promote-cursor
body: {topic, cursorName, targetClusterName=newCluster,
expectedCursorsLedgerId=cursorsLedgerId}

[owner broker] ← handled by plugin's AdditionalServlet
cursor = brokerService.getTopic(topic).getCursor(cursorName)
synchronized (cursor) {
if (cursor.cursorsLedgerId != expectedCursorsLedgerId) {
return 409; // cursor rolled over since BUILD; recorded as
// promotedViaRolloverFallback (rolled-over ledger was
// already stamped newCluster at rollover time)
}
ManagedCursorInfo updated =
ManagedCursorInfo.newBuilder(cursor.managedCursorInfo)
.clearCursorProperties()
.addAllCursorProperties(rewriteAttrProperty(cursor.cursorProperties,
newCluster))
.build();
metaStore.asyncUpdateCursorInfo(cursor.path, updated, expectedZkVersion);
// BadVersion → re-read; attribution is orthogonal to
// markDeleteLedgerId / markDeleteEntryId, so retry converges.
}
```

The owner-side handler lives in the plugin's `AdditionalServlet`; **no
change to `pulsar-broker` core REST classes is needed**.

**Invariant I9**: PROMOTE must change only the attribution property entry in
`cursorProperties`; `cursorsLedgerId`, `markDeleteLedgerId`,
`markDeleteEntryId`, deleted ranges, batched deletion indices, and **all
other** cursor properties are preserved.
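
A possible shape for the `rewriteAttrProperty` helper referenced above is sketched here. The original does not define that helper, so this is an assumption: the class name is hypothetical, properties are modelled as an ordered list of `Map.Entry` pairs rather than `StringProperty` messages, and a missing attribution entry is appended rather than treated as an error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical rewrite that touches only the attribution entry (invariant I9).
final class CursorPropertyRewrite {
    static final String ATTR_KEY = "_pulsar.bookieClusterName";

    private CursorPropertyRewrite() {}

    static List<Map.Entry<String, String>> rewriteAttr(
            List<Map.Entry<String, String>> properties, String newCluster) {
        List<Map.Entry<String, String>> out = new ArrayList<>(properties.size() + 1);
        boolean found = false;
        for (Map.Entry<String, String> p : properties) {
            if (ATTR_KEY.equals(p.getKey())) {
                // Replace the first attribution entry in place; drop duplicates.
                if (!found) {
                    out.add(Map.entry(ATTR_KEY, newCluster));
                }
                found = true;
            } else {
                out.add(p); // every other property is carried over unchanged
            }
        }
        if (!found) {
            out.add(Map.entry(ATTR_KEY, newCluster));
        }
        return out;
    }
}
```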

#### Concurrent putSchema and PROMOTE

For Schema, concurrent `putSchema` writes are naturally serialized via
the `/schemas/<schemaId>` znode CAS. The argument is identical to the
original PIP-470: either order converges to "all entries stamped
newCluster".

### Phase 6: CLEANUP

After the rollback window elapses, or upon explicit
`pulsar-admin bookie-clusters cleanup`:

1. Delete old-cluster ledgers via `oldBk.asyncDeleteLedger(ledgerId)`.
2. CAS-flip Broker-ZK status: `oldCluster.status = DEPRECATED`,
`newCluster.status = ACTIVE` (two sequential CAS).
3. Optionally close the old BK client via
`pulsar-admin bookie-clusters close-client --name oldCluster`.

### Phase 7: ROLLBACK (within window)

The reverse-CAS flips the attribution property back to `oldCluster` for
affected Schema/Cursor; new-cluster ledgers are scheduled for delayed
deletion. Because `ledgerId` and `entryId` were never touched in
Broker-ZK, rollback is a constant-time metadata flip — no data movement.

### Failure Handling

| Stage | Failure | Behavior |
|-------|---------|----------|
| precheck | idgen-long not advanced | HTTP 409; switch refused |
| Switch trigger | New BK client init fails | `activeClusterName` stays on old; switch rejected |
| BUILD | Source ledger unreadable | Record in `failedLedgers`; attribution stays on old cluster (still readable) |
| BUILD | `LedgerExistException` + LAC mismatch | Record in `failedLedgers`; do not PROMOTE; operator can retry switch |
| PROMOTE | CAS BadVersion (5 retries exhausted) | Log warn; not counted in `failedLedgers`; next `switch --target <same>` retries (orchestrator is idempotent) |
| PROMOTE | Cursor owner returns 409 (rollover) | Recorded as `promotedViaRolloverFallback`; old `cursorsLedgerId` ages out naturally |
| CLEANUP | `promoteActiveCluster` CAS fails | Orchestrator finishes `DONE_WITH_FAILURES`; in-memory `activeClusterName` is correct; operator runs `register --status ACTIVE` to reconcile |

## Data Structures

### Cluster Registry (Broker-ZK, plugin-owned path)

```
/admin/bookie-clusters/ ← plugin-owned (configurable)
├── old-cluster ← JSON body
│ { "name": "old-cluster",
│ "metadataServiceUri": "zk://old-zk:2181/ledgers",
│ "status": "ACTIVE" }
└── new-cluster
{ "name": "new-cluster",
"metadataServiceUri": "zk+null://new-zk:2181/ledgers-v2",
"status": "STANDBY" }

/admin/bookie-clusters/switch-target ← written on switch
body: { "target": "new-cluster",
"origin": "https://broker-1:8080",
"epoch": 7,
"ts": 1716256800000 }

/admin/bookie-clusters/orchestrator/<switchId>/... ← orchestrator progress
```

Path prefix is configurable via plugin config; the plugin uses
`MetadataStore` (PIP-45) so the implementation is ZK-agnostic.

### proto Changes (this PIP)

**`pulsar-broker/src/main/proto/SchemaStorageFormat.proto`** — *the only*
proto change required:

```protobuf
message PositionInfo {
required int64 ledgerId = 1;
required int64 entryId = 2;
repeated KeyValue properties = 3; // ★ NEW (mirror of PIP-404)
}

message KeyValue {
required string key = 1;
required string value = 2;
}
```

**`managed-ledger/src/main/proto/MLDataFormats.proto`** — **NO CHANGE**.

- `LedgerInfo.properties` (tag 6) was added by PIP-404; this PIP reuses it.
- `ManagedCursorInfo.cursorProperties` (tag 8) already exists; this PIP
reuses it.

This is the single largest delta versus the previous draft of PIP-477: we
delete three planned proto fields (`LedgerInfo.bookieClusterName`, the second
`LedgerInfo.properties` slot that duplicated PIP-404, and
`ManagedCursorInfo.bookieClusterName`) and keep only one
(`PositionInfo.properties`).

### Reserved Property Key

```java
package org.apache.pulsar.common.protocol;

public final class ReservedLedgerPropertyKeys {
/** Reserved key for per-ledger Bookie-cluster attribution (PIP-477). */
public static final String BOOKIE_CLUSTER_NAME = "_pulsar.bookieClusterName";
/** All keys under this prefix are reserved for Pulsar core/internal use. */
public static final String RESERVED_PREFIX = "_pulsar.";
}
```

Defined in core so that any other component (and any other plugin) sees the
reservation. **This is the only "core type" addition outside the three SPI
hooks above** — and it is a constant, not behavior.
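
One way a component could honor the reservation is to reject user-supplied property keys under the reserved prefix. Whether core enforces this, and where, is not specified in this PIP, so the guard below is purely a hypothetical sketch; `ReservedKeyGuard` and its methods are illustrative names.

```java
// Hypothetical guard for user-supplied property keys.
final class ReservedKeyGuard {
    static final String RESERVED_PREFIX = "_pulsar.";

    private ReservedKeyGuard() {}

    static boolean isReserved(String key) {
        return key != null && key.startsWith(RESERVED_PREFIX);
    }

    // Throws when a caller outside Pulsar internals tries to use a reserved key.
    static void checkUserKey(String key) {
        if (isReserved(key)) {
            throw new IllegalArgumentException(
                    "property key '" + key + "' uses the reserved prefix " + RESERVED_PREFIX);
        }
    }
}
```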

## CLI Commands (Plugin-Provided via PIP-201)

The plugin ships a `CustomCommandFactory` NAR. After dropping the NAR into
`pulsar-admin`'s extension directory, the following commands appear:

```bash
# Cluster lifecycle
pulsar-admin bookie-clusters list
pulsar-admin bookie-clusters get --name <cluster>
pulsar-admin bookie-clusters register --name <cluster> \
  --metadata-service-uri <uri> [--status STANDBY]
pulsar-admin bookie-clusters unregister --name <cluster>

# Switch
pulsar-admin bookie-clusters precheck --name <newCluster>
pulsar-admin bookie-clusters switch --target <newCluster>
pulsar-admin bookie-clusters status

# Read mode override (post-switch)
pulsar-admin bookie-clusters set-read-mode \
    --mode <DUAL_READ|OLD_CLUSTER_ONLY|NEW_CLUSTER_ONLY>

# Old-cluster decommission
pulsar-admin bookie-clusters list-cleanup-targets
pulsar-admin bookie-clusters cleanup [--dry-run] [--src-cluster <old>] \
    [--max-deletions N] [--force]
pulsar-admin bookie-clusters close-client --name <oldCluster>
```

No change to `pulsar-admin` core. No change to `pulsar-client-tools`.

## REST API (Plugin-Provided via `AdditionalServlet`)

The plugin registers one `AdditionalServlet` with base path
`/admin/v2/bookie-clusters/`. All endpoints require superuser; when the
plugin is not installed, the path returns 404 (and the CLI returns a clear
"feature not installed" error).

| Method | Path | Purpose |
|--------|------|---------|
| `GET` | `/` | List registered clusters |
| `GET` | `/{clusterName}` | Get one cluster |
| `POST` | `/{clusterName}` | Register; rejects URI duplication (400) |
| `DELETE` | `/{clusterName}` | Unregister; rejects ACTIVE (409) |
| `POST` | `/switch` | Trigger switch (inline precheck) |
| `GET` | `/status` | Active cluster, read mode, orchestrator progress |
| `POST` | `/read-mode` | Manually override read mode |
| `GET` | `/precheck/{clusterName}` | Validate idgen-long advancement |
| `GET` | `/cleanup-targets` | List old-cluster ledgers eligible for deletion |
| `POST` | `/cleanup` | Delete old-cluster ledgers |
| `POST` | `/internal/promote-cursor` | Hidden RPC: coordinator → owner |
| `POST` | `/close-client/{clusterName}` | Safely close old BK client |

No changes to `pulsar-broker`'s `org.apache.pulsar.broker.admin.v2` package.
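
As a rough illustration of how a client could address two of the read-only endpoints in the table, the sketch below builds requests with the JDK's `java.net.http` API. The class `BookieClustersRestSketch` is a hypothetical name, authentication is omitted, and cluster names are assumed to be URL-safe; none of this is prescribed by the PIP.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

/** Hypothetical request builder for the plugin's REST endpoints; auth is omitted. */
final class BookieClustersRestSketch {
    // Base path taken from the section above.
    static final String BASE_PATH = "/admin/v2/bookie-clusters/";

    private final String brokerWebUrl;

    BookieClustersRestSketch(String brokerWebUrl) {
        // Drop a trailing slash so the base path is not doubled up.
        this.brokerWebUrl = brokerWebUrl.endsWith("/")
                ? brokerWebUrl.substring(0, brokerWebUrl.length() - 1)
                : brokerWebUrl;
    }

    /** GET /status: active cluster, read mode, orchestrator progress. */
    HttpRequest statusRequest() {
        return get("status");
    }

    /** GET /precheck/{clusterName}; assumes the cluster name needs no URL escaping. */
    HttpRequest precheckRequest(String clusterName) {
        return get("precheck/" + clusterName);
    }

    private HttpRequest get(String relativePath) {
        return HttpRequest.newBuilder(URI.create(brokerWebUrl + BASE_PATH + relativePath))
                .timeout(Duration.ofSeconds(10))
                .GET()
                .build();
    }
}
```

A caller would send these with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` and, per the behavior described above, treat a 404 as "plugin not installed".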

## Safety Guarantees

### Why This Approach is Safe

1. **Single owner per ledger.** Each ledger carries exactly one attribution
value at any moment. The same `ledgerId` is never dual-written to two
clusters simultaneously: BUILD writes only to a dormant copy that is
invisible to readers.
2. **No bulk data movement.** Topic Data Ledgers — the largest and most
active dataset — are not migrated. Blast radius is bounded to Schema and
Cursor Ledgers (KB to MB each).
3. **Atomic per-ledger cutover.** PROMOTE is a single ZK CAS per Schema
znode (covering all versions) or per Cursor znode. No "half-promoted" state.
4. **Reversible.** Within the rollback window, rollback is a constant-time
CAS flip — no data movement.
5. **`ledgerId` preservation.** Schema/Cursor migration preserves `ledgerId`
end-to-end (invariant **I8**). Broker-ZK business keys
(`cursorsLedgerId`, `position.ledgerId`) are byte-for-byte unchanged.

### Data Plane Continuity

| Operation | During Switch | During BUILD/PROMOTE | After CLEANUP |
|-----------|---------------|---------------------|---------------|
| Publish to existing topic | ✅ Continues | ✅ Continues | ✅ Continues (writes go to new cluster) |
| Consume from existing topic | ✅ Continues | ✅ Continues (read routed by attribution) | ✅ Continues |
| markDelete (cursor advance) | ✅ Continues | ✅ Continues (orthogonal to PROMOTE CAS) | ✅ Continues |
| putSchema | ✅ Continues | ✅ Continues (concurrent with PROMOTE; CAS-serialized) | ✅ Continues |
| Create new topic | ✅ Continues | ✅ Continues (new ledgers stamped newCluster) | ✅ Continues |
| BookKeeper ledger writes | ✅ Continues | ✅ Continues | ✅ Continues |

**No publish/consume pause.** New writes route to the new cluster
immediately upon switch; reads of historical data continue against the old
cluster until that ledger is migrated or its retention expires.
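
The routing rule in this paragraph can be sketched as a pair of pure functions. This is a minimal illustration, not the plugin's implementation: `RoutingSketch`, `clusterForWrite`, and `clusterForRead` are hypothetical names, and the choice of fallback cluster for ledgers that carry no attribution is left to the caller because this sketch does not model the plugin's fallback tiers.

```java
import java.util.Map;

/** Hypothetical routing helper: writes go to the active cluster, reads follow attribution. */
final class RoutingSketch {
    // Reserved key from the "Reserved Property Key" section.
    static final String BOOKIE_CLUSTER_NAME = "_pulsar.bookieClusterName";

    private RoutingSketch() {
    }

    /** New ledgers are always created on the currently active cluster. */
    static String clusterForWrite(String activeCluster) {
        return activeCluster;
    }

    /**
     * Reads follow the ledger's stamped attribution. Ledgers with no stamp
     * use the caller-supplied fallback (an assumption of this sketch).
     */
    static String clusterForRead(Map<String, String> ledgerProperties, String fallbackCluster) {
        String stamped = ledgerProperties.get(BOOKIE_CLUSTER_NAME);
        return (stamped == null || stamped.isEmpty()) ? fallbackCluster : stamped;
    }
}
```

For example, after a switch from a cluster named `bk-old` to one named `bk-new` (both names invented here), a ledger stamped `bk-old` still resolves to `bk-old` for reads while new ledgers are created on `bk-new`.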

### Critical Invariants

| # | Invariant | Violation Consequence |
|---|-----------|-----------------------|
| **I1** | New ledger's attribution property is written in the same Broker-ZK CAS as `ledgerId` / `cursorsLedgerId` / `position` | Misattribution → reads routed to wrong cluster |
| **I2** | Old ledgers remain readable from the old cluster (if alive) at every phase | Production incident |
| **I3** | Every Broker-ZK metadata update uses CAS | Lost update |
| **I4** | Within a single read, BK client choice is consistent at the entry level (resolver consulted once at handle open) | `NoSuchLedger` / partial read |
| **I5** | Old BK client closure must be preceded by confirmation of no active reads | Cascading read failures |
| **I6** | Broker restart accurately recovers switch state from Broker-ZK | Startup anomaly |
| **I7** | New proto field `PositionInfo.properties` obeys proto2 `optional`/`repeated` compatibility | Compatibility break |
| **I8** | Migration preserves `ledgerId` end-to-end; entries copied with same `entryId`; `customMetadata` preserved | Routing chaos / consumer divergence |
| **I9** | Cursor PROMOTE CAS changes ONLY the attribution entry in `cursorProperties`; never `cursorsLedgerId`, `markDeleteLedgerId`, `markDeleteEntryId`, or any other property | Consumption position lost or rewound |
| **I10** | Schema PROMOTE CAS changes ONLY the attribution entry in `PositionInfo.properties`; never `position.ledgerId/entryId`, `info.version`, `info.hash` | Rollback impossible / historical reads broken |
| **I11** | New cluster's `idgen-long` advanced past `max(active ledgerId on old cluster)` before switch | New ledger creation collides with migrated `ledgerId` |
| **I12** | At any moment, Broker-ZK has exactly ONE cluster with `status=ACTIVE` | Split-brain on `activeClusterName` resolution |
| **I13** | Cursor PROMOTE CAS must be executed by topic owner inside `ManagedCursorImpl` lock; coordinator forwards via RPC with `expectedCursorsLedgerId` | Owner's in-memory `ManagedCursorInfo` desynchronizes from ZK |
| **I14** | Schema BUILD/PROMOTE/CLEANUP all run on the coordinator broker centrally (no "owner" concept for schema) | Unnecessary cross-broker scheduling complexity |
| **I15** | Delete path AND first-open of last ledger route by per-ledger attribution (`ManagedLedgerConfig.bookKeeperResolver` / `cursorBookKeeperResolver`), not the constructor-captured `bookKeeper` | `NoSuchLedger` → orphan bytes / topic load failure |
| **I16** | The reserved prefix `_pulsar.` in `LedgerInfo.properties` / `cursorProperties` / `PositionInfo.properties` is owned by Pulsar core; plugins must not write keys under this prefix unless documented | Property collision between plugins |
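
To make invariants I3, I9, and I13 concrete, the following sketch models a cursor PROMOTE as a compare-and-set against a versioned record. It is an in-memory stand-in under stated assumptions, not Pulsar or ZooKeeper code: `PromoteCasSketch` and `CursorSnapshot` are hypothetical names, and an `AtomicReference` plays the role of the versioned znode.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical in-memory stand-in for a versioned cursor znode. */
final class PromoteCasSketch {
    static final String BOOKIE_CLUSTER_NAME = "_pulsar.bookieClusterName";

    /** Immutable snapshot: znode version plus the fields relevant to I9. */
    static final class CursorSnapshot {
        final int version;
        final long cursorsLedgerId;
        final Map<String, String> cursorProperties;

        CursorSnapshot(int version, long cursorsLedgerId, Map<String, String> cursorProperties) {
            this.version = version;
            this.cursorsLedgerId = cursorsLedgerId;
            this.cursorProperties = Collections.unmodifiableMap(new HashMap<>(cursorProperties));
        }
    }

    private final AtomicReference<CursorSnapshot> znode;

    PromoteCasSketch(CursorSnapshot initial) {
        this.znode = new AtomicReference<>(initial);
    }

    CursorSnapshot read() {
        return znode.get();
    }

    /** Attempts PROMOTE; returns false when the caller's view is stale. */
    boolean promote(int expectedVersion, long expectedCursorsLedgerId, String newCluster) {
        CursorSnapshot current = znode.get();
        if (current.version != expectedVersion
                || current.cursorsLedgerId != expectedCursorsLedgerId) {
            return false; // stale view: caller must re-read before retrying
        }
        Map<String, String> props = new HashMap<>(current.cursorProperties);
        props.put(BOOKIE_CLUSTER_NAME, newCluster); // the only entry that changes (I9)
        CursorSnapshot next =
                new CursorSnapshot(current.version + 1, current.cursorsLedgerId, props);
        return znode.compareAndSet(current, next); // atomic swap (I3)
    }
}
```

The `expectedCursorsLedgerId` check mirrors the guard that I13 says the coordinator forwards to the topic owner; a failed attempt leaves the record untouched, so there is no half-promoted state to repair.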

## Configuration

### `broker.conf` Additions (core; opt-in)

| Property | Type | Default | Description |
|----------|------|---------|-------------|
| `managedLedgerStorageClassName` | String | `ManagedLedgerClientFactory` (default impl) | **Set to `org.apache.pulsar.ext.bookieswitch.MultiClusterManagedLedgerStorage` to enable this feature.** Existing PIP-384 hook, no new config. |
| `additionalServlets` | comma list | `""` | Add `bookie-clusters-admin` to enable the REST API. Existing hook. |
| `additionalServletDirectory` | String | `./plugins` | Where the plugin NAR is located. Existing hook. |

**No new `broker.conf` keys are required by core.** All plugin-specific
configuration lives under a single namespace owned by the plugin
(see below).

### Plugin Configuration (read by the plugin from `broker.conf`)

| Property | Type | Default | Dynamic | Description |
|----------|------|---------|---------|-------------|
| `bookieClusterSwitch.configPath` | String | `/admin/bookie-clusters` | no | Broker-ZK path for cluster registry |
| `bookieClusterSwitch.currentClusterName` | String | `""` | no | Optional tier-2 fallback when Broker-ZK has no `ACTIVE` cluster |
| `bookieClusterSwitch.skipIdgenPrecheck` | boolean | `false` | yes | Bypass invariant I11 precheck. **Lab/staging only.** |
| `bookieClusterSwitch.dualReadTimeoutDays` | int | `15` | yes | DUAL_READ retention window after a switch |
| `bookieClusterSwitch.migrationConcurrency` | int | `16` | yes | Bound on parallel BUILD operations |
| `bookieClusterSwitch.rollbackWindowHours` | int | `24` | yes | Time window during which rollback is permitted |
| `bookieClusterSwitch.oldLedgerReadTimeoutMs` | int | `5000` | yes | Read timeout for ledgers attributed to an old cluster |

These are read by the plugin via `ServiceConfiguration.getProperties()`; no
new fields are added to `ServiceConfiguration`.
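
A plugin could parse these keys from the `java.util.Properties` object that `ServiceConfiguration.getProperties()` returns, roughly as sketched below. `BookieSwitchConfigSketch` is a hypothetical name; the keys and defaults are copied from the table above, and the sketch takes a plain `Properties` argument so that it runs without a broker.

```java
import java.util.Properties;

/** Hypothetical typed view over the plugin's keys; defaults mirror the table above. */
final class BookieSwitchConfigSketch {
    static final String PREFIX = "bookieClusterSwitch.";

    final String configPath;
    final String currentClusterName;
    final boolean skipIdgenPrecheck;
    final int dualReadTimeoutDays;
    final int migrationConcurrency;
    final int rollbackWindowHours;
    final int oldLedgerReadTimeoutMs;

    private BookieSwitchConfigSketch(Properties p) {
        configPath = p.getProperty(PREFIX + "configPath", "/admin/bookie-clusters");
        currentClusterName = p.getProperty(PREFIX + "currentClusterName", "");
        skipIdgenPrecheck =
                Boolean.parseBoolean(p.getProperty(PREFIX + "skipIdgenPrecheck", "false"));
        dualReadTimeoutDays = intProp(p, "dualReadTimeoutDays", 15);
        migrationConcurrency = intProp(p, "migrationConcurrency", 16);
        rollbackWindowHours = intProp(p, "rollbackWindowHours", 24);
        oldLedgerReadTimeoutMs = intProp(p, "oldLedgerReadTimeoutMs", 5000);
    }

    /** In a broker this would be called with serviceConfiguration.getProperties(). */
    static BookieSwitchConfigSketch from(Properties brokerProperties) {
        return new BookieSwitchConfigSketch(brokerProperties);
    }

    // Missing or blank values fall back to the documented default.
    private static int intProp(Properties p, String name, int defaultValue) {
        String raw = p.getProperty(PREFIX + name);
        return (raw == null || raw.trim().isEmpty())
                ? defaultValue
                : Integer.parseInt(raw.trim());
    }
}
```

This sketch reads values once at construction. The keys marked dynamic in the table would need to be re-read or subscribed to at runtime, which is not modeled here.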

### No Configuration Change Required to Start a Switch

Once the plugin NAR is installed and `managedLedgerStorageClassName` is
pointed at it, **no further `broker.conf` change is needed** to perform a
switch — registration, switch trigger, cleanup, and decommission are all
done via CLI. Brokers continue using their existing
`bookkeeperMetadataServiceUri`
as the implicit source cluster.

## Backward & Forward Compatibility

- **Brokers without the plugin** are byte-for-byte unmodified Pulsar (the
three SPI hooks default to no-op when unset).
- **Brokers with the plugin but `managedLedgerStorageClassName` left at
default**
are byte-for-byte unmodified Pulsar.
- **Brokers with the plugin enabled** read pre-existing ledgers correctly:
ledgers lacking the attribution property fall back to the active cluster
(which equals the original cluster, since no switch has occurred yet for
those ledgers).
- **Rolling downgrade from plugin-enabled back to plugin-absent:**
  - `LedgerInfo.properties[_pulsar.bookieClusterName]` is preserved as an
    unknown property and ignored by the legacy code path. The legacy code
    uses the singleton BK client, which is the active cluster; this is
    correct as long as no switch has ever happened.
  - Downgrading after a switch has occurred and CLEANUP has run is fine:
    all live ledgers are on `newCluster`, which is the only cluster the
    legacy code knows about.
  - Downgrading **during** the dual-read window is unsafe (ledgers on
    `oldCluster` would be opened on `newCluster`'s BK client and fail).
    The operator must first run `cleanup --force` or wait for the window
    to expire.
- **`SchemaStorageFormat.PositionInfo.properties`**: proto2 `repeated`
additive field. Old brokers preserve it via `UnknownFieldSet`.

## Rollout Plan

| Stage | Action | Verification |
|-------|--------|--------------|
| **P0** | Land the three core SPI hooks (S1, S2, S3) + the constants class, with `managedLedgerStorageClassName` default unchanged | Zero behavioral change in any existing test |
| **P1** | Release the plugin NAR as a separate Apache-licensed artifact | Plugin builds against released Pulsar API |
| **P2** | Test cluster: install NAR; register a STANDBY cluster; do not switch | Watcher healthy; `precheck` passes; no observable change to ongoing traffic |
| **P3** | Test cluster: trigger switch | Attribution stamped on all new ledgers; existing `position.ledgerId/entryId` byte-for-byte unchanged |
| **P4** | Simulate Scenario B (old cluster alive): full BUILD/PROMOTE/CLEANUP | Schema/Cursor migration completes with **identical `ledgerId` on both clusters**; old reads route correctly throughout |
| **P5** | Simulate Scenario A (old cluster down): forced cutover | Already-migrated Schema/Cursor still readable; not-yet-migrated reports clean errors with structured retry-after info |
| **P6** | Test rollback within window | Broker-ZK business znodes byte-for-byte unchanged; reads route back to old cluster |
| **P7** | Production canary: small-traffic, degradable services first | ≥ 24h observation between steps |
| **P8** | (Future, optional) If the plugin proves broadly valuable, promote it into the Pulsar tree as a shipped module, still opt-in via configuration | N/A |

## Alternatives Considered

### Alt-1: Inline the feature in core (the original PIP-477 draft)

- **Pros.** Single binary; no NAR loading; simpler operator story for the
feature itself.
- **Cons.** Adds ~5,000 LOC, four new proto fields, a new ZK subtree, a new
REST package, and new CLI subcommands to `pulsar-broker` /
`pulsar-client-tools` that the ~99% of users who never switch BK clusters
must ship and maintain. Couples BK cluster switching tightly to
Pulsar's release cycle. Duplicates the PIP-404 properties slot.
**Rejected** in favor of the plugin-first approach.

### Alt-2: Side znode `/admin/bookie-clusters/ledger-attribution/...`

- **Pros.** Avoids any new proto field.
- **Cons.** Introduces a dual source of truth (znode vs. ledger metadata)
and a cache-coherence problem (broker restart, znode loss). **Rejected.**

### Alt-3: Per-topic only (use PIP-384's persistence policy verbatim)

- **Pros.** Zero new code beyond PIP-384.
- **Cons.** Requires migrating every existing topic via topic-level policy
change before the switch — not feasible for clusters with millions of
topics; loses the "history stays on old cluster" property; cannot serve
the elastic-storage spillover use case. **Rejected.**

### Alt-4: Encode attribution as a custom `KeyValue` in BookKeeper's `customMetadata` instead of Pulsar's ledger metadata

- **Pros.** Decouples from any Pulsar proto.
- **Cons.** Requires *opening* the ledger to know its cluster, which is
circular (we need the cluster to know which BK to open it on). Stamps are
unobservable from Broker-ZK alone. **Rejected.**

## References

- [PIP-45: Pluggable Metadata Interface](https://github.com/apache/pulsar/wiki/PIP-45%3A-Pluggable-metadata-interface)
- [PIP-201: Extensions mechanism for Pulsar Admin CLI tools](./pip-201.md)
- [PIP-384: ManagedLedger interface decoupling](./pip-384.md)
- [PIP-404: Introduce per-ledger properties](./pip-404.md)
- `AdditionalServlet` SPI: `pulsar-broker-common/.../web/plugin/servlet/AdditionalServlet.java`
- [BookKeeper `asyncCreateLedgerAdv`](https://bookkeeper.apache.org/docs/api/javadoc/org/apache/bookkeeper/client/BookKeeper.html#createLedgerAdv-long-int-int-int-org.apache.bookkeeper.client.BookKeeper.DigestType-byte:A-): preserves `ledgerId` across clusters
- [BookKeeper `LedgerHandleAdv.addEntry(entryId, data)`](https://bookkeeper.apache.org/docs/api/javadoc/org/apache/bookkeeper/client/LedgerHandleAdv.html): preserves `entryId` ordering
- `MLDataFormats.proto`: managed-ledger metadata schema (reused, unchanged)
- `SchemaStorageFormat.proto`: schema storage metadata schema (one new field)
