dao-jun commented on code in PR #25774:
URL: https://github.com/apache/pulsar/pull/25774#discussion_r3241892479
##########
pip/pip-470.md:
##########
@@ -0,0 +1,587 @@
+# PIP-470: Dynamic Bookie Cluster Switching for Brokers
+
+## Motivation
+
+Apache Pulsar's broker layer is bound to a fixed BookKeeper (Bookie) cluster
at startup time through the `bookkeeperMetadataServiceUri` configuration. This
static binding becomes a bottleneck in several real-world operational scenarios:
+
+1. **Slow Failure Recovery**: When a Bookie cluster experiences capacity
exhaustion, sustained latency degradation, or non-recoverable failures, the
only mitigation today is to wait for the Bookie cluster to recover. In
elastic-storage deployments where a single Bookie cluster backs 100+ broker
clusters, the blast radius is enormous.
+
+2. **No Capacity Escape Hatch**: When a Bookie cluster approaches its capacity
ceiling, brokers cannot "spill over" to a fresh Bookie cluster. Operators must
scale the existing cluster in place, which is slow and risky.
+
+3. **No Runtime Re-targeting**: There is no supported runtime mechanism to
migrate brokers from one Bookie cluster to another without restart and full
data migration.
+
+4. **Data Plane Coupling**: Bookie cluster switching deals with the data
plane: terabytes of ledger bytes, ongoing publishes, ongoing acknowledgments,
and live cursors. A copy-everything-then-cutover approach is infeasible.
+
+This PIP proposes a **safe, ledger-attribution-driven framework** that allows
operators to switch a broker cluster's underlying Bookie cluster at runtime,
with **zero data-plane downtime** and **per-ledger routing** that preserves the
readability of historical data on the old cluster.
+
+The framework enables:
+
+- **Zero-downtime switching** — publish and consume continue without
interruption
+- **Per-ledger routing** — each ledger carries its own cluster attribution;
reads are routed to the cluster that physically holds the bytes
+- **No bulk data migration** — Topic Data Ledgers are NOT migrated; they age
out naturally via TTL/Retention
+- **Targeted migration of small, long-lived metadata ledgers** — only Schema
Ledgers and Cursor Ledgers are migrated, while preserving their original
`ledgerId`
+- **Automatic rollback** within a configurable window
+- **Operator-driven, not auto-failover** — avoids split-brain
+
+## Goal
+
+Provide a runtime mechanism by which a broker cluster can be re-pointed from
one Bookie cluster (`oldCluster`) to a new Bookie cluster (`newCluster`)
without service interruption, while preserving the ability to read historical
data that physically resides on `oldCluster`.
+
+### In Scope
+
+- Per-broker-cluster configuration of multiple registered Bookie clusters via
Broker-ZK metadata
+- A `DynamicBookKeeperClientFactory` that holds multiple BK clients
simultaneously
+- Per-ledger cluster attribution stored in proto fields
(`LedgerInfo.bookieClusterName`, `ManagedCursorInfo.bookieClusterName`,
`PositionInfo.bookieClusterName`)
+- Read-path routing keyed by attribution field (`BookieClusterReadRouter`)
+- Asynchronous post-switch migration of Schema and Cursor Ledgers,
**preserving the original `ledgerId`** via
`BookKeeper.asyncCreateLedgerAdv(ledgerId, ...)`
+- CLI (`pulsar-admin bookie-clusters`) + REST API + Java Admin SDK
(three-layer consistency)
+- Automatic rollback within a configurable window
+- Support for both **co-located deployments** (Broker-ZK and Bookie-ZK share
the same physical ZK ensemble with different chroots) and **split deployments**
(Broker-ZK and Bookie-ZK on separate ZK ensembles)
+
+### Out of Scope
+
+- **Topic Data Ledger migration**: Topic data is NOT migrated; old Topic Data
Ledgers age out via the topic's TTL/Retention policy. Old data is read from the
old cluster via attribution-based routing for as long as the old cluster is
reachable.
+- **Automatic failure detection and switch triggering**: Switching is
operator-initiated to avoid split-brain.
+- **Cross-cluster (geo-replicated) coordination**: Each broker cluster
switches its own Bookie cluster independently.
+- **BookKeeper-cluster-side internal migration tools**: This PIP only
orchestrates from the broker side; recovering data within a Bookie cluster
(e.g., underreplication) is delegated to BookKeeper itself.
+- **Changing `metadataStoreUrl` (Broker-ZK)**: Only the Bookie cluster
(`bookkeeperMetadataServiceUri`) is switched; Broker-ZK remains the same
throughout.
+
+## High Level Design
+
+The framework introduces three new long-lived broker components:
+
+1. **`DynamicBookKeeperClientFactory`** — Maintains a `Map<clusterName,
BookKeeper>` and exposes the currently active client. Replaces the singleton
`BookKeeper` previously held by `ManagedLedgerClientFactory`.
+2. **`BookieClusterReadRouter`** — On every read, resolves the target cluster
from the per-ledger attribution field and returns the matching BK client.
+3. **`BookieClusterSwitchOrchestrator`** — Drives the post-switch state
machine (`BUILD → PROMOTE → CLEANUP`) for Schema and Cursor Ledgers.
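The following is a minimal sketch of how the first two components could fit together. It is not the PIP's implementation. The class names `DynamicClientFactorySketch` and `ReadRouterSketch` are hypothetical, as are the generic client type `C` (a stand-in for `org.apache.bookkeeper.client.BookKeeper`) and the `connect` function. The assumption that unstamped (pre-PIP) ledgers fall back to `initialClusterName` is also hypothetical.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch; C stands in for org.apache.bookkeeper.client.BookKeeper.
final class DynamicClientFactorySketch<C> {
    private final Map<String, C> clients = new ConcurrentHashMap<>();
    private final Function<String, C> connect; // assumed: clusterName -> new client
    private volatile String activeClusterName;

    DynamicClientFactorySketch(String initialCluster, Function<String, C> connect) {
        this.connect = Objects.requireNonNull(connect);
        this.activeClusterName = initialCluster;
        clients.computeIfAbsent(initialCluster, connect);
    }

    /** Client on which new ledgers are created (and stamped with activeClusterName). */
    C activeClient() {
        return clientFor(activeClusterName);
    }

    String activeClusterName() {
        return activeClusterName;
    }

    /** Creates the target client lazily; if that throws, the active cluster is unchanged. */
    void switchToCluster(String target) {
        clientFor(target);
        activeClusterName = target;
    }

    C clientFor(String clusterName) {
        return clients.computeIfAbsent(clusterName, connect);
    }
}

// Hypothetical router: the per-ledger stamp, not the active cluster, picks the read client.
final class ReadRouterSketch<C> {
    private final DynamicClientFactorySketch<C> factory;
    private final String initialClusterName; // assumed home of unstamped (pre-PIP) ledgers

    ReadRouterSketch(DynamicClientFactorySketch<C> factory, String initialClusterName) {
        this.factory = factory;
        this.initialClusterName = initialClusterName;
    }

    C clientForRead(String bookieClusterName) {
        boolean unstamped = bookieClusterName == null || bookieClusterName.isEmpty();
        return factory.clientFor(unstamped ? initialClusterName : bookieClusterName);
    }
}
```

The factory keeps one client per cluster, so reads of ledgers stamped with the old cluster keep working after `switchToCluster` moves new writes elsewhere.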
+
+### Key Principles
+
+| Principle | Description |
+|-----------|-------------|
+| **BookieClusterName as Single Source of Truth** | At ledger creation time, the active cluster name is "stamped" into the ledger's metadata. Reads are routed by that stamp. No history table, no boundary heuristics. |
+| **Attribution Field, Not Side ZNode** | The attribution lives directly inside `MLDataFormats.proto`'s `LedgerInfo` / `ManagedCursorInfo` and `SchemaStorageFormat.proto`'s `PositionInfo`. No "/admin/bookie-clusters/schema-ledger-clusters" side path is introduced. This eliminates dual-source-of-truth and cache-coherence risks. |
+| **Topic Data: Don't Migrate** | Topic Data Ledgers have TTL/Retention; let them age out. Switching does not move terabytes of data. |
+| **Schema / Cursor: Migrate, but Preserve `ledgerId`** | Schema and Cursor Ledgers are tied to topic/subscription lifecycles and cannot be left behind. We use `asyncCreateLedgerAdv(ledgerId, ...)` to recreate the **same `ledgerId`** in the new cluster, copy entries with **identical `entryId`**, then CAS only the `bookieClusterName` field. **`cursorsLedgerId`, `position.ledgerId`, `position.entryId` are byte-for-byte unchanged** in Broker-ZK. |
+| **Operator-Driven, Single Active Cluster** | Switching is triggered by `pulsar-admin bookie-clusters switch`; auto-failover is rejected. At any moment exactly one cluster is `status=ACTIVE` in Broker-ZK. |
+| **Broker-ZK is the Switch Source-of-Truth** | All cluster registrations and the switch directive live in Broker-ZK. No Global-ZK, no external coordinator. |
+| **Minimum proto Change** | Only `optional` and `repeated` fields are added (`bookieClusterName`, `properties`). proto2 forward/backward compatibility holds. |
+
+## Detailed Design
+
+### Switch Phases
+
+```
+NOT_REGISTERED
+ ↓
+[Operator: pulsar-admin bookie-clusters register --name newCluster --metadata-service-uri ... --status STANDBY]
+ ↓
+REGISTERED (STANDBY)
+ ↓
+[Operator: pulsar-admin bookie-clusters switch --target newCluster]
+ ↓ (POST /switch internally invokes /precheck for idgen-long, see I11)
+SWITCH_TRIGGERED ← Coordinator broker writes /admin/bookie-clusters/switch-target
+ ↓
+LIVE_DUAL_READ ← All brokers' watchers fire; new writes go to newCluster;
+ ↓ reads still go to oldCluster for ledgers attributed to it
+BUILD_PROMOTE_CLEANUP
+ ├── BUILD ← Coordinator centrally copies Schema/Cursor ledger bytes to newCluster
+ │ with the SAME ledgerId; entries copied with the SAME entryId
+ ├── PROMOTE ← Schema: coordinator CAS-rewrites SchemaLocator (changes only
+ │ position.bookieClusterName)
+ │ Cursor: coordinator forwards POST /internal/promote-cursor
+ │ to topic owner broker; owner does CAS in ManagedCursorImpl lock
+ └── CLEANUP ← After rollback window, delete old-cluster copies; promote
+ Broker-ZK status: oldCluster→DEPRECATED, newCluster→ACTIVE
+ ↓
+DONE / DONE_WITH_FAILURES
+
+(Operator-triggered, within rollback window):
+ROLLBACK ← CAS attribution fields back to oldCluster; new-cluster copies are
+ deleted asynchronously
+```
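One way to make the phase diagram enforceable in code is a small state enum with an explicit transition check. This is a hypothetical sketch. The enum name `SwitchPhase`, the split of `BUILD_PROMOTE_CLEANUP` into three constants, and the assumption that ROLLBACK is reachable only before CLEANUP closes the rollback window are all illustrative choices.

```java
// Hypothetical encoding of the switch phases. ROLLBACK edges (from LIVE_DUAL_READ, BUILD
// and PROMOTE, i.e. before CLEANUP ends the rollback window) are an assumption.
enum SwitchPhase {
    NOT_REGISTERED, REGISTERED_STANDBY, SWITCH_TRIGGERED, LIVE_DUAL_READ,
    BUILD, PROMOTE, CLEANUP, DONE, DONE_WITH_FAILURES, ROLLBACK;

    boolean canTransitionTo(SwitchPhase next) {
        return switch (this) {
            case NOT_REGISTERED -> next == REGISTERED_STANDBY;
            case REGISTERED_STANDBY -> next == SWITCH_TRIGGERED;
            case SWITCH_TRIGGERED -> next == LIVE_DUAL_READ;
            case LIVE_DUAL_READ -> next == BUILD || next == ROLLBACK;
            case BUILD -> next == PROMOTE || next == ROLLBACK;
            case PROMOTE -> next == CLEANUP || next == ROLLBACK;
            case CLEANUP -> next == DONE || next == DONE_WITH_FAILURES;
            case DONE, DONE_WITH_FAILURES, ROLLBACK -> false; // terminal
        };
    }

    /** Returns next, or throws if the diagram has no such edge. */
    SwitchPhase advanceTo(SwitchPhase next) {
        if (!canTransitionTo(next)) {
            throw new IllegalStateException(this + " -> " + next + " is not allowed");
        }
        return next;
    }
}
```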
+
+### Phase 1: Cluster Registration
+
+```bash
+pulsar-admin bookie-clusters register \
+ --name new-cluster \
+ --metadata-service-uri zk+null://new-zk:2181/ledgers-v2 \
+ --status STANDBY
+```
+
+The `BookieClusterConfigManager` validates that:
+- The `metadata-service-uri` does not collide with any already-registered
cluster (prevents accidental aliasing in the BK client map).
+- For co-located deployments, the chroot must differ from existing clusters.
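The two rules above can be folded into one identity check on (ZK ensemble, chroot), ignoring the URI scheme. With that check, `zk://old-zk:2181/ledgers` and `zk+null://old-zk:2181/ledgers` are caught as aliases of the same cluster. The sketch below is hypothetical. Three details are assumptions rather than documented behavior of `BookieClusterConfigManager`:

- the `RegistrationValidatorSketch` name;
- host sorting on `;`/`,` separators;
- trailing-slash normalization.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.Locale;

// Hypothetical validator; scheme is deliberately ignored so zk:// and zk+null:// alias.
final class RegistrationValidatorSketch {
    record ZkTarget(String ensemble, String chroot) {
        static ZkTarget of(String metadataServiceUri) {
            URI uri = URI.create(metadataServiceUri);
            String authority = uri.getRawAuthority();
            String path = uri.getPath();
            if (authority == null || path == null || path.isEmpty()) {
                throw new IllegalArgumentException("missing ensemble or chroot: " + metadataServiceUri);
            }
            // Assumed: sort hosts so "a:2181;b:2181" and "b:2181;a:2181" compare equal.
            String[] hosts = authority.toLowerCase(Locale.ROOT).split("[;,]");
            Arrays.sort(hosts);
            String chroot = path.replaceAll("/+$", ""); // "/ledgers/" == "/ledgers"
            return new ZkTarget(String.join(";", hosts), chroot);
        }
    }

    /** Rejects a new URI whose (ensemble, chroot) matches an already-registered cluster. */
    static void validate(String newUri, Collection<String> registeredUris) {
        ZkTarget candidate = ZkTarget.of(newUri);
        for (String existing : registeredUris) {
            if (candidate.equals(ZkTarget.of(existing))) {
                throw new IllegalArgumentException(newUri + " aliases registered " + existing);
            }
        }
    }
}
```

Same ensemble with a different chroot passes, which covers the co-located case. A different ensemble always passes, which covers the split case.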
+
+### Phase 2: Pre-Switch Validation (`precheck`)
+
+```bash
+pulsar-admin bookie-clusters precheck --name new-cluster
+```
+
+`IdgenPrecheckService` verifies invariant **I11**: the new cluster's
`/ledgers/idgen-long` must be advanced beyond `max(ledgerId)` of the source
cluster. Otherwise, `asyncCreateLedgerAdv(ledgerId, ...)` during BUILD can
collide with ledgerIds newly allocated by the new cluster. The precheck returns a
structured result:
+
+```json
+{
+ "ready": true,
+ "idgenAdvancedBeyondSource": true,
+ "sourceClusterName": "old-cluster",
+ "targetClusterName": "new-cluster",
+ "sourceMaxHighOrderBit": 12345,
+ "targetMaxHighOrderBit": 1012345
+}
+```
+
+`POST /switch` invokes `precheck` inline; failure returns HTTP 409 (bypassable
via `bookieClusterSwitchSkipIdgenPrecheck=true` in lab/staging only).
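The sketch below shows how the precheck verdict and the `POST /switch` gate could relate. The record mirrors the JSON fields above. Three parts are assumptions:

- reducing I11 to a strict comparison of the two high-order values;
- the `evaluate` and `switchAllowed` helpers;
- how the values are read from each cluster's `idgen-long` znode (not shown).

```java
// Hypothetical sketch mirroring the precheck JSON. Comparing high-order values is an
// assumed reduction of I11; reading them from idgen-long is not shown.
record IdgenPrecheckResult(boolean ready, boolean idgenAdvancedBeyondSource,
                           String sourceClusterName, String targetClusterName,
                           long sourceMaxHighOrderBit, long targetMaxHighOrderBit) {

    static IdgenPrecheckResult evaluate(String source, String target,
                                        long sourceMaxHob, long targetMaxHob) {
        // I11: ids the target can still hand out must all sort above every source ledgerId,
        // so BUILD's asyncCreateLedgerAdv(ledgerId, ...) never reuses a freshly allocated id.
        boolean advanced = targetMaxHob > sourceMaxHob;
        return new IdgenPrecheckResult(advanced, advanced, source, target,
                sourceMaxHob, targetMaxHob);
    }

    /** POST /switch proceeds only when ready; a refusal surfaces as HTTP 409. */
    boolean switchAllowed(boolean skipIdgenPrecheck) { // models bookieClusterSwitchSkipIdgenPrecheck
        return ready || skipIdgenPrecheck;
    }
}
```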
+
+### Phase 3: Switch Trigger
+
+```bash
+pulsar-admin bookie-clusters switch --target new-cluster
+```
+
+The receiving broker (origin) writes the `switch-target` znode in Broker-ZK
with CAS:
+
+```
+/admin/bookie-clusters/switch-target
+ body: "new-cluster\norigin=https://broker-1:8080"
+```
+
+The `BookieClusterConfigWatcher` on every broker fires and executes the callback chain:
+
+1. `DynamicBookKeeperClientFactory.switchToCluster(newCluster)` — hot-swap
`activeClusterName`; new BK client lazily created if not yet present
+2. First switch: `BookieClusterReadRouter.enableDualRead()` (records
`initialClusterName`, `switchLedgerIdBoundary`, `switchTimestampMillis`)
+ Subsequent switches: `BookieClusterReadRouter.recordSwitchEpoch()` (only
refreshes `switchTimestampMillis`)
+3. `BookieClusterSwitchMetrics.recordSwitchSuccess()`
+4. **Origin broker only**:
`BookieClusterSwitchOrchestrator.runStateMachine(BUILD → PROMOTE → CLEANUP)`
+
+Non-origin brokers do not start the orchestrator. If the `switch-target` znode
body lacks the `origin=` line (e.g., legacy or operator-edited), brokers fall
back to leader election to pick exactly one orchestrator.
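The orchestrator-selection rule depends only on the `switch-target` body format. The hypothetical sketch below parses that body and decides whether the local broker should run the state machine. The `SwitchTarget` record, the `selfUrl` comparison, and the `wonLeaderElection` callback are illustrative assumptions; the callback stands in for whatever leader-election primitive the broker uses.

```java
import java.util.Optional;
import java.util.function.BooleanSupplier;

// Hypothetical parser for a body such as "new-cluster\norigin=https://broker-1:8080".
record SwitchTarget(String targetCluster, Optional<String> originUrl) {

    static SwitchTarget parse(String body) {
        String[] lines = body.strip().split("\\R"); // first line: target cluster name
        String target = lines[0].strip();
        if (target.isEmpty()) {
            throw new IllegalArgumentException("empty switch-target body");
        }
        Optional<String> origin = Optional.empty();
        for (int i = 1; i < lines.length; i++) {
            String line = lines[i].strip();
            if (line.startsWith("origin=")) {
                origin = Optional.of(line.substring("origin=".length()));
            }
        }
        return new SwitchTarget(target, origin);
    }

    /** Origin runs the orchestrator; a body without origin= defers to leader election. */
    boolean shouldRunOrchestrator(String selfUrl, BooleanSupplier wonLeaderElection) {
        return originUrl.map(selfUrl::equals).orElseGet(wonLeaderElection::getAsBoolean);
    }
}
```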
+
+### Phase 4: BUILD (Schema + Cursor Ledger Bytes Copy)
+
+The orchestrator runs **two stages in parallel** with bounded concurrency
(default 16):
+
+#### Schema Ledger BUILD (centralized)
+
+```
+for each schemaId in /schemas/* (parallel, bounded):
+ locator = brokerZk.get("/schemas/<schemaId>")
+ for each indexEntry in locator.indexList:
+ ledgerId = indexEntry.position.ledgerId
+ cluster = indexEntry.position.bookieClusterName ?? activeCluster
+ if cluster == oldCluster:
+ LedgerCopyUtil.copyLedgerPreservingIds(
+ ledgerId, oldBk, newBk,
+ customMetadata, // identical to source ledger
+ ensembleSize, qw, qa)
+ // Internally:
+ // newBk.asyncCreateLedgerAdv(ledgerId, ...) ← SAME ledgerId
+ // for entryId in [0, lastAddConfirmed]:
+ // newLh.addEntry(entryId, srcEntry) ← SAME entryId
+ // newLh.close()
+```
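Below is a runnable, in-memory sketch of the copy loop with bounded concurrency (the text's default is 16). `BuildSketch`, its `Cluster` stand-in and `LedgerExistsException` are hypothetical. A real implementation would use BookKeeper handles instead, for example `asyncCreateLedgerAdv` and `LedgerHandleAdv.addEntry(entryId, data)`. The destination ledger is created empty before entries are appended, so a crash mid-copy leaves a partial ledger behind. That partial ledger is exactly the case the `LedgerExistException` idempotency check has to handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BuildSketch {
    /** Hypothetical stand-in for a Bookie cluster: ledgerId -> entries (list index == entryId). */
    static final class Cluster {
        final Map<Long, List<String>> ledgers = new ConcurrentHashMap<>();
    }

    static final class LedgerExistsException extends RuntimeException {
        LedgerExistsException(long ledgerId) {
            super("ledger already exists: " + ledgerId);
        }
    }

    /** Copies a ledger under the SAME ledgerId, appending entries so each keeps its entryId. */
    static void copyLedgerPreservingIds(long ledgerId, Cluster src, Cluster dst) {
        List<String> source = src.ledgers.get(ledgerId);
        if (source == null) {
            throw new IllegalStateException("source ledger unreadable: " + ledgerId);
        }
        List<String> target = new ArrayList<>();
        // Analogous to asyncCreateLedgerAdv(ledgerId, ...) refusing an id that already exists.
        if (dst.ledgers.putIfAbsent(ledgerId, target) != null) {
            throw new LedgerExistsException(ledgerId);
        }
        for (int entryId = 0; entryId < source.size(); entryId++) { // [0, lastAddConfirmed]
            target.add(source.get(entryId));
        }
    }

    /** Runs copies on a fixed-size pool; returns the ledgerIds that failed. */
    static Set<Long> build(List<Long> ledgerIds, Cluster src, Cluster dst, int concurrency) {
        Set<Long> failed = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        try {
            CompletableFuture<?>[] futures = ledgerIds.stream()
                    .map(id -> CompletableFuture
                            .runAsync(() -> copyLedgerPreservingIds(id, src, dst), pool)
                            .exceptionally(err -> {
                                failed.add(id); // attribution stays on the old cluster
                                return null;
                            }))
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(futures).join();
        } finally {
            pool.shutdown();
        }
        return failed;
    }
}
```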
+
+#### Cursor Ledger BUILD (centralized)
+
+Identical pattern, scanning `/managed-ledgers/<topic>/<cursorName>` znodes,
copying each `cursorsLedgerId`.
+
+**Why BUILD is safe to centralize**:
+1. Reads of the old ledger are read-only — no contention with owner brokers'
write paths.
+2. Writes to the new cluster are to a **dormant copy** — no online reader sees
it yet (attribution is still `oldCluster`).
+3. The orchestrator uses dedicated BK clients with isolated thread pools.
+
+#### Idempotency on `LedgerExistException`
+
+If `asyncCreateLedgerAdv(ledgerId, ...)` returns `LedgerExistException` (BUILD
retry / partial prior run), `LedgerCopyUtil.verifyDstMatchesSrc` opens both
ledgers and compares `getLastAddConfirmed()` and `getLength()`. If both match,
the existing ledger is treated as a successful idempotent copy
(`migration_ledger_id_conflict_total` is incremented for visibility). On a
mismatch, the ledger is recorded in `failedLedgers` and its attribution remains on the old cluster.
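The idempotency decision compares only two values per ledger, so it can be isolated from BookKeeper entirely. In this hypothetical sketch, `IdempotencySketch`, `LedgerSummary` and the `CONFLICTS` counter (standing in for the `migration_ledger_id_conflict_total` metric) are illustrative names. Entry lists stand in for the opened ledger handles, and the sketch follows the text in bumping the counter only on a match.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: each ledger is reduced to the two values the text compares.
final class IdempotencySketch {
    record LedgerSummary(long lastAddConfirmed, long length) {
        static LedgerSummary of(List<byte[]> entries) {
            long length = 0;
            for (byte[] entry : entries) {
                length += entry.length;
            }
            return new LedgerSummary(entries.size() - 1L, length); // empty ledger: LAC = -1
        }
    }

    enum Outcome { COPIED_IDEMPOTENT, FAILED }

    // Stands in for the migration_ledger_id_conflict_total metric.
    static final AtomicLong CONFLICTS = new AtomicLong();

    /** Called when asyncCreateLedgerAdv(ledgerId, ...) reports the ledger already exists. */
    static Outcome onLedgerExists(LedgerSummary src, LedgerSummary dst) {
        if (src.equals(dst)) {
            CONFLICTS.incrementAndGet();
            return Outcome.COPIED_IDEMPOTENT; // a prior run already finished this copy
        }
        return Outcome.FAILED; // goes to failedLedgers; attribution stays on old cluster
    }
}
```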
+
+### Phase 5: PROMOTE (CAS Attribution Field)
+
+#### Schema PROMOTE (centralized)
+
+```
+for each schemaId built successfully:
+ retry up to MAX_PROMOTE_RETRIES (5):
+ (locator, version) = brokerZk.getWithVersion("/schemas/<schemaId>")
+ updated = locator.toBuilder()
+ .clearIndex()
+ .addAllIndex(locator.indexList.map { ie ->
+ if (ie.position.bookieClusterName == oldCluster):
+ ie.toBuilder().setPosition(
+ ie.position.toBuilder()
+ .setBookieClusterName(newCluster) // ★ ONLY field changed
+ .build()
+ ).build()
+ else: ie // already newCluster (concurrent putSchema); leave alone
+ })
+ .build()
+ try brokerZk.put("/schemas/<schemaId>", updated.toByteArray(), version); break // CAS won
+ catch BadVersionException: continue // re-read and retry
+```
+
+**Crucially**, `position.ledgerId`, `position.entryId`, `info.version`, and
`info.hash` are **byte-for-byte unchanged** (invariant **I10**). Only
`bookieClusterName` is rewritten.
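For readers who prefer Java over the pseudocode, here is a self-contained sketch of the same read-modify-CAS loop against an in-memory versioned node. `PromoteSketch`, `VersionedStore` and `BadVersionException` are hypothetical stand-ins for the Broker-ZK `MetadataStore` and its version check. The sketch shows that only `bookieClusterName` is rewritten and that a lost CAS leads to a re-read.

```java
import java.util.ArrayList;
import java.util.List;

final class PromoteSketch {
    record Position(long ledgerId, long entryId, String bookieClusterName) {}

    record Versioned<T>(T value, int version) {}

    static final class BadVersionException extends Exception {}

    /** Hypothetical stand-in for a metadata node with compare-and-set on its version. */
    static final class VersionedStore<T> {
        private Versioned<T> current;

        VersionedStore(T initial) {
            current = new Versioned<>(initial, 0);
        }

        synchronized Versioned<T> get() {
            return current;
        }

        synchronized void put(T value, int expectedVersion) throws BadVersionException {
            if (current.version() != expectedVersion) {
                throw new BadVersionException();
            }
            current = new Versioned<>(value, expectedVersion + 1);
        }
    }

    static final int MAX_PROMOTE_RETRIES = 5;

    /** Flips bookieClusterName from -> to; returns false once retries are exhausted. */
    static boolean promote(VersionedStore<List<Position>> locator, String from, String to) {
        for (int attempt = 0; attempt < MAX_PROMOTE_RETRIES; attempt++) {
            Versioned<List<Position>> read = locator.get();
            List<Position> updated = new ArrayList<>();
            for (Position p : read.value()) {
                updated.add(from.equals(p.bookieClusterName())
                        ? new Position(p.ledgerId(), p.entryId(), to) // ids untouched (I10)
                        : p);                                          // e.g. concurrent putSchema
            }
            try {
                locator.put(List.copyOf(updated), read.version());
                return true;
            } catch (BadVersionException e) {
                // Another writer won the CAS; loop to re-read and retry.
            }
        }
        return false;
    }
}
```

Calling `promote(locator, newCluster, oldCluster)` on the same records is the reverse flip that rollback relies on.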
+
+#### Cursor PROMOTE (forwarded to topic owner)
+
+Cursor is more delicate because `ManagedCursorImpl` is a **stateful object
owned by exactly one broker** (the topic owner) and holds an in-process lock
that serializes markDelete flushes. Letting the orchestrator broker write
`ManagedCursorInfo` directly would race with the owner's pending markDelete CAS.
+
+```
+[orchestrator broker]
+ for each (topic, cursorName, cursorsLedgerId) built successfully:
+ ownerUrl = namespaceService.findOwnerHttpUrl(topic)
+ POST {ownerUrl}/admin/v2/bookie-clusters/internal/promote-cursor
+ body: {topic, cursorName, targetClusterName=newCluster,
+ expectedCursorsLedgerId=cursorsLedgerId}
+
+[owner broker]
+ cursor = brokerService.getTopic(topic).getCursor(cursorName)
+ synchronized (cursor):
+ if cursor.cursorsLedgerId != expectedCursorsLedgerId:
+ return 409 // cursor rolled over since BUILD; coordinator records as
+ // promotedViaRolloverFallback (the new cursorsLedgerId
+ // was already stamped into newCluster at rollover time
+ // by the writer-side attribution stamping)
+ metaStore.asyncUpdateCursorInfo(
+ cursor.path,
+ ManagedCursorInfo.newBuilder(cursor.managedCursorInfo)
+ .setBookieClusterName(newCluster) // ★ ONLY field changed
+ .build(),
+ expectedZkVersion)
+ // BadVersion → re-read; bookieClusterName is orthogonal to
+ // markDeleteLedgerId/markDeleteEntryId, so retry converges.
+```
+
+**Invariant I9**: PROMOTE must change only `bookieClusterName`;
`cursorsLedgerId`, `markDeleteLedgerId`, `markDeleteEntryId`, individual
deleted ranges, batched deletion indices, and cursor properties are preserved.
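The owner-side guard can be illustrated with a small model. In this hypothetical sketch, `CursorPromoteSketch`, `OwnedCursor`, `rollover` and the `Result` enum are illustrative. The `synchronized` methods stand in for the `ManagedCursorImpl` lock, and the Broker-ZK CAS write is omitted. A cursor that rolled over after BUILD is refused with the 409 outcome, and a promoted cursor keeps every field except `bookieClusterName` (I9).

```java
// Hypothetical owner-side sketch of POST .../internal/promote-cursor. The synchronized
// methods stand in for the ManagedCursorImpl lock; the Broker-ZK CAS write is omitted.
final class CursorPromoteSketch {
    record CursorInfo(long cursorsLedgerId, long markDeleteLedgerId, long markDeleteEntryId,
                      String bookieClusterName) {
        CursorInfo withBookieClusterName(String name) {
            return new CursorInfo(cursorsLedgerId, markDeleteLedgerId, markDeleteEntryId, name);
        }
    }

    enum Result { PROMOTED, CONFLICT_409 }

    static final class OwnedCursor {
        private CursorInfo info;

        OwnedCursor(CursorInfo info) {
            this.info = info;
        }

        synchronized CursorInfo info() {
            return info;
        }

        /** Owner rolls the cursor ledger; the new ledger is stamped with the active cluster. */
        synchronized void rollover(long newCursorsLedgerId, String activeCluster) {
            info = new CursorInfo(newCursorsLedgerId, info.markDeleteLedgerId(),
                    info.markDeleteEntryId(), activeCluster);
        }

        synchronized Result promote(String targetCluster, long expectedCursorsLedgerId) {
            if (info.cursorsLedgerId() != expectedCursorsLedgerId) {
                return Result.CONFLICT_409; // coordinator records promotedViaRolloverFallback
            }
            info = info.withBookieClusterName(targetCluster); // I9: only this field changes
            return Result.PROMOTED;
        }
    }
}
```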
+
+#### Concurrent putSchema and PROMOTE
+
+For Schema, concurrent `putSchema` writes are naturally serialized via the
`/schemas/<schemaId>` znode CAS:
+- If `putSchema` lands first, PROMOTE re-reads and sees the newly appended
`IndexEntry` (already stamped `newCluster` because writes go to the active BK
client) — leaves it alone, rewrites the rest.
+- If PROMOTE lands first, `putSchema` re-reads and appends a new `IndexEntry`
with `newCluster` stamp.
+
+Either way, the converged state is "all entries stamped `newCluster`".
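A small deterministic model can check the convergence argument. The znode CAS serializes the two writers, and each re-reads after a `BadVersionException`. The final state is therefore one of the two orderings of the two transforms. `ConvergenceSketch` and its `IndexEntry` record are hypothetical simplifications of the `SchemaLocator` index.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: each writer's transform is applied to the latest committed state.
final class ConvergenceSketch {
    record IndexEntry(long ledgerId, String cluster) {}

    /** PROMOTE: stamp every oldCluster entry with newCluster; leave other entries alone. */
    static List<IndexEntry> promote(List<IndexEntry> index, String oldCluster, String newCluster) {
        List<IndexEntry> out = new ArrayList<>();
        for (IndexEntry e : index) {
            out.add(e.cluster().equals(oldCluster) ? new IndexEntry(e.ledgerId(), newCluster) : e);
        }
        return out;
    }

    /** putSchema: append an entry whose ledger was written on the active cluster. */
    static List<IndexEntry> putSchema(List<IndexEntry> index, long ledgerId, String activeCluster) {
        List<IndexEntry> out = new ArrayList<>(index);
        out.add(new IndexEntry(ledgerId, activeCluster));
        return out;
    }
}
```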
+
+### Phase 6: CLEANUP
+
+After the rollback window (`bookieClusterDualReadTimeoutDays`, default 15)
elapses, or upon explicit `pulsar-admin bookie-clusters cleanup`:
+
+1. Delete old-cluster ledgers via `oldBk.asyncDeleteLedger(ledgerId)`.
+2. CAS-flip Broker-ZK status: `oldCluster.status = DEPRECATED`,
`newCluster.status = ACTIVE` (invariant **I12**, two sequential CAS —
`MetadataStore` API does not expose multi-op).
+3. Optionally close the old BK client via `pulsar-admin bookie-clusters
close-client --name oldCluster`.
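Step 2 consists of two independent CAS writes, so a failure between them leaves Broker-ZK in an intermediate state. With the order listed (old first), the intermediate state has no ACTIVE cluster. The reverse order would briefly have two. The hypothetical sketch below models each CAS as "expected status → next status" rather than a znode version. `StatusFlipSketch`, `Registry` and `InMemoryRegistry` are illustrative names. A `false` return corresponds to the `register --status ACTIVE` reconciliation case in the failure table.

```java
import java.util.HashMap;
import java.util.Map;

final class StatusFlipSketch {
    enum Status { STANDBY, ACTIVE, DEPRECATED }

    /** Hypothetical per-znode CAS: succeeds only if the current status equals expected. */
    interface Registry {
        boolean compareAndSet(String cluster, Status expected, Status next);
    }

    /** Returns true when both CAS steps land; false means Broker-ZK needs reconciliation. */
    static boolean promoteActiveCluster(Registry zk, String oldCluster, String newCluster) {
        if (!zk.compareAndSet(oldCluster, Status.ACTIVE, Status.DEPRECATED)) {
            return false;
        }
        // Between the two writes, no cluster is ACTIVE in Broker-ZK.
        return zk.compareAndSet(newCluster, Status.STANDBY, Status.ACTIVE);
    }

    static final class InMemoryRegistry implements Registry {
        final Map<String, Status> statuses = new HashMap<>();

        @Override
        public synchronized boolean compareAndSet(String cluster, Status expected, Status next) {
            if (statuses.get(cluster) != expected) {
                return false;
            }
            statuses.put(cluster, next);
            return true;
        }
    }
}
```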
+
+`POST /admin/v2/bookie-clusters/cleanup` accepts:
+
+```jsonc
+{
+ "dryRun": true, // default; preview only
+ "srcClusterFilter": "...", // optional; restrict to one source cluster
+ "maxDeletions": 0, // 0 = unlimited
+ "force": false // bypass dual-read-window guard (lab only)
+}
+```
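The request fields combine into a deletion plan as sketched below. This is a hypothetical illustration, not the endpoint's code. The `CleanupSketch` types are illustrative, and so are the window computation from `switchTimestampMillis` plus `bookieClusterDualReadTimeoutDays`. One more choice is an assumption: letting a `dryRun` preview skip the window guard.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical planner for POST /admin/v2/bookie-clusters/cleanup.
final class CleanupSketch {
    record CleanupRequest(boolean dryRun, String srcClusterFilter, int maxDeletions, boolean force) {}

    record Candidate(long ledgerId, String srcCluster) {}

    record Plan(List<Long> toDelete, boolean executed) {}

    static Plan plan(CleanupRequest req, List<Candidate> candidates, long nowMillis,
                     long switchTimestampMillis, int dualReadTimeoutDays) {
        boolean windowOpen = nowMillis < switchTimestampMillis + dualReadTimeoutDays * 86_400_000L;
        // Assumed: previews are always allowed; real deletions need the window closed or force.
        if (!req.dryRun() && windowOpen && !req.force()) {
            throw new IllegalStateException("rollback window still open");
        }
        List<Long> ids = new ArrayList<>();
        for (Candidate c : candidates) {
            if (req.srcClusterFilter() != null && !req.srcClusterFilter().equals(c.srcCluster())) {
                continue;
            }
            if (req.maxDeletions() > 0 && ids.size() >= req.maxDeletions()) {
                break; // 0 means unlimited
            }
            ids.add(c.ledgerId());
        }
        // A non-dry run would now call oldBk.asyncDeleteLedger(id) for each planned id.
        return new Plan(List.copyOf(ids), !req.dryRun());
    }
}
```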
+
+### Phase 7: ROLLBACK (within window)
+
+If issues are detected, the operator triggers rollback. The reverse-CAS flips
`bookieClusterName` back to `oldCluster` for affected Schema/Cursor;
new-cluster ledgers are scheduled for delayed deletion. Because **`ledgerId`
and `entryId` were never touched** in Broker-ZK, rollback is a metadata-only
flip (one CAS per affected Schema/Cursor record) with no data movement.
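The sketch below illustrates which records a rollback touches. It assumes rollback flips back only the records PROMOTE actually flipped, passed in as `promotedLedgerIds`. A ledger written after the switch exists only on the new cluster and must keep its new-cluster stamp to stay readable. `RollbackSketch` and its record types are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical rollback sketch: only records that PROMOTE flipped are reverted.
final class RollbackSketch {
    record Attribution(long ledgerId, long entryId, String bookieClusterName) {}

    record Result(List<Attribution> records, List<Long> newClusterLedgersToDelete) {}

    static Result rollback(List<Attribution> records, Set<Long> promotedLedgerIds,
                           String oldCluster, String newCluster) {
        List<Attribution> out = new ArrayList<>();
        List<Long> toDelete = new ArrayList<>();
        for (Attribution a : records) {
            boolean wasPromoted = promotedLedgerIds.contains(a.ledgerId())
                    && newCluster.equals(a.bookieClusterName());
            if (wasPromoted) {
                // ledgerId and entryId are reused as-is; only the stamp flips back.
                out.add(new Attribution(a.ledgerId(), a.entryId(), oldCluster));
                toDelete.add(a.ledgerId()); // new-cluster copy, deleted asynchronously
            } else {
                out.add(a); // e.g. written after the switch; exists only on newCluster
            }
        }
        return new Result(List.copyOf(out), List.copyOf(toDelete));
    }
}
```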
+
+### Failure Handling
+
+| Stage | Failure | Behavior |
+|-------|---------|----------|
+| precheck | idgen-long not advanced | HTTP 409; switch refused |
+| Switch trigger | New BK client init fails | `activeClusterName` stays on old; switch rejected |
+| BUILD | Source ledger unreadable | Record in `failedLedgers`; attribution stays on old cluster (still readable) |
+| BUILD | `LedgerExistException` + LAC or length mismatch | Record in `failedLedgers`; do not PROMOTE; operator can retry switch |
+| PROMOTE | CAS BadVersion (5 retries exhausted) | Log a warning; not counted in `failedLedgers`; next `switch --target <same>` retries (orchestrator is idempotent) |
+| PROMOTE | Cursor owner returns 409 (rollover) | Recorded as `promotedViaRolloverFallback`; old `cursorsLedgerId` ages out naturally |
+| CLEANUP | `promoteActiveCluster` CAS fails | Orchestrator finishes in `DONE` / `DONE_WITH_FAILURES`; in-memory `activeClusterName` is correct, but Broker-ZK ACTIVE may diverge → operator runs `register --status ACTIVE` to reconcile |
+
+## Implementation Details
+
+### Data Structures
+
+#### Cluster Registry (Broker-ZK)
+
+```
+/admin/bookie-clusters/ ← bookieClusterConfigPath (configurable)
+ ├── old-cluster ← znode body = JSON
+ │ {
+ │ "name": "old-cluster",
+ │ "metadataServiceUri": "zk://old-zk:2181/ledgers",
+ │ "status": "ACTIVE" // exactly ONE cluster ACTIVE at a time
+ │ }
+ └── new-cluster
+ {
+ "name": "new-cluster",
+ "metadataServiceUri": "zk+null://new-zk:2181/ledgers-v2",
+ "status": "STANDBY"
+ }
+
+/admin/bookie-clusters/switch-target ← written on switch
+ body: "new-cluster\norigin=https://broker-1:8080"
+```
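A typed view of one registry entry makes the "exactly one ACTIVE" invariant easy to assert whenever the registry is read. This is a hypothetical sketch. `RegistrySketch` and `requireSingleActive` are illustrative, and JSON deserialization of the znode bodies is left out.

```java
import java.util.List;

// Hypothetical typed view of the /admin/bookie-clusters/<name> znode bodies.
final class RegistrySketch {
    enum Status { ACTIVE, STANDBY, DEPRECATED }

    record BookieClusterConfig(String name, String metadataServiceUri, Status status) {}

    /** Returns the single ACTIVE cluster, or throws if the invariant is violated. */
    static BookieClusterConfig requireSingleActive(List<BookieClusterConfig> registry) {
        List<BookieClusterConfig> active = registry.stream()
                .filter(c -> c.status() == Status.ACTIVE)
                .toList();
        if (active.size() != 1) {
            throw new IllegalStateException("expected exactly one ACTIVE cluster, found " + active.size());
        }
        return active.get(0);
    }
}
```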
+
+#### proto Changes
+
+**`managed-ledger/src/main/proto/MLDataFormats.proto`**:
+
+```protobuf
+message ManagedLedgerInfo {
+ message LedgerInfo {
+ required int64 ledgerId = 1;
+ optional int64 entries = 2;
+ optional int64 size = 3;
+ optional int64 timestamp = 4;
+ optional OffloadContext offloadContext = 5;
+ optional string bookieClusterName = 6; // ★ new (Topic Data Ledger attribution)
+ repeated KeyValue properties = 7; // ★ new (extension slot)
Review Comment:
We already have the `properties` field in PIP-404, so maybe we could also use
that field to store `bookieClusterName`.