Sounds good! I added a few comments and questions on the PR itself. -Lari
On 2026/01/29 20:04:00 Matteo Merli wrote:

https://github.com/apache/pulsar/pull/25196

# PIP-454: Metadata Store Migration Framework

## Motivation

Apache Pulsar currently uses Apache ZooKeeper as its metadata store for broker coordination, topic metadata, namespace policies, and BookKeeper ledger management. While ZooKeeper has served well, there are several motivations for enabling migration to alternative metadata stores:

1. **Operational Simplicity**: Alternative metadata stores like Oxia may offer simpler operations, better observability, or reduced operational overhead compared to ZooKeeper ensembles.

2. **Performance Characteristics**: Different metadata stores have different performance profiles. Some workloads may benefit from stores optimized for high throughput or low latency.

3. **Deployment Flexibility**: Organizations may prefer metadata stores that align better with their existing infrastructure and expertise.

4. **Zero-Downtime Migration**: Operators need a safe, automated way to migrate metadata between stores without service interruption.

Currently, there is no supported path to migrate from one metadata store to another without cluster downtime. This PIP proposes a **safe, simple migration framework** that ensures metadata consistency by avoiding complex dual-write/dual-read patterns. The framework enables:

- **Zero-downtime migration** from any metadata store to any other supported store
- **Automatic ephemeral node recreation** in the target store
- **Version preservation** to ensure conditional writes continue working
- **Automatic failure recovery** if issues are detected
- **Minimal configuration changes** - no config updates needed until after migration completes

## Goal

Provide a safe, automated framework for migrating Apache Pulsar's metadata from one store implementation (e.g., ZooKeeper) to another (e.g., Oxia) with zero service interruption.

### In Scope

- Migration framework supporting any source → any target metadata store
- Automatic ephemeral node recreation by brokers and bookies
- Persistent data copy with version preservation
- CLI commands for migration control and monitoring
- Automatic failure recovery during migration
- Support for broker and bookie participation
- Read-only mode during migration for consistency

### Out of Scope

- Developing new metadata store implementations (Oxia, Etcd support already exists)
- Cross-cluster metadata synchronization (different use case)
- Automated rollback after COMPLETED phase (requires manual intervention)
- Migration of configuration metadata store (can be done separately)

## High Level Design

The migration framework introduces a **DualMetadataStore** wrapper that transparently handles migration without modifying existing metadata store implementations.
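As an illustration of the wrapping idea, here is a minimal sketch of how such a wrapper could route two of the `MetadataStore` operations. The class, field names, and the exception used for blocked writes are assumptions for illustration, not the actual implementation:

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;

// Illustrative only: shows the routing idea for two of the many MetadataStore
// methods; the real DualMetadataStore would wrap the full interface.
class DualMetadataStoreSketch {
    private final MetadataStore source;      // e.g. the existing ZKMetadataStore
    private volatile MetadataStore target;   // lazily created when migration starts
    private volatile MigrationPhase phase = MigrationPhase.NOT_STARTED;

    DualMetadataStoreSketch(MetadataStore source) {
        this.source = source;
    }

    CompletableFuture<Optional<GetResult>> get(String path) {
        // Reads go to the source store until the COMPLETED phase is reached.
        return active().get(path);
    }

    CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> expectedVersion) {
        if (phase == MigrationPhase.PREPARATION || phase == MigrationPhase.COPYING) {
            // Read-only mode: metadata writes fail while the migration is in progress.
            return CompletableFuture.failedFuture(
                    new MetadataStoreException("Metadata store is read-only during migration"));
        }
        return active().put(path, value, expectedVersion);
    }

    private MetadataStore active() {
        return phase == MigrationPhase.COMPLETED ? target : source;
    }

    enum MigrationPhase { NOT_STARTED, PREPARATION, COPYING, COMPLETED, FAILED }
}
```

The `phase` field would be updated from the watch on the migration flag described below, so no configuration change is needed to start routing to the target store.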
### Key Principles

1. **Transparent Wrapping**: The `DualMetadataStore` wraps the existing source store (e.g., `ZKMetadataStore`) without modifying its implementation.

2. **Lazy Target Initialization**: The target store is only initialized when migration begins, triggered by a flag in the source store.

3. **Ephemeral-First Approach**: Before copying persistent data, all brokers and bookies recreate their ephemeral nodes in the target store. This ensures the cluster is "live" in both stores during migration.

4. **Read-Only Mode During Migration**: To ensure consistency, all metadata writes are blocked during the PREPARATION and COPYING phases. Components receive `SessionLost` events to defer non-critical operations (e.g., ledger rollovers).

5. **Phase-Based Migration**: Migration proceeds through well-defined phases (PREPARATION → COPYING → COMPLETED).

6. **Generic Framework**: The framework is agnostic to specific store implementations - it works with any source and target that implement the `MetadataStore` interface.

7. **Guaranteed Consistency**: By blocking writes during migration and using an atomic copy, metadata is **always in a consistent state**. No dual-write complexity, no data divergence, no consistency issues.

## Detailed Design

### Migration Phases

```
NOT_STARTED
    ↓
PREPARATION  ← All brokers/bookies recreate ephemeral nodes in target
             ← Metadata writes are BLOCKED (read-only mode)
    ↓
COPYING      ← Coordinator copies persistent data source → target
             ← Metadata writes still BLOCKED
    ↓
COMPLETED    ← Migration complete, all services using target store
             ← Metadata writes ENABLED on target
    ↓
After validation period:
  * Update config and restart brokers & bookies
  * Decommission source store

(If errors occur):
FAILED       ← Rollback to source store, writes ENABLED
```

### Phase 1: NOT_STARTED → PREPARATION

**Participant Registration (at startup):**

Each broker and bookie registers itself as a migration participant by creating a sequential ephemeral node:
- Path: `/pulsar/migration-coordinator/participants/id-NNNN` (sequential)
- This allows the coordinator to know how many participants exist before migration starts

**Administrator triggers migration:**
```bash
pulsar-admin metadata-migration start --target oxia://oxia1:6648
```

**Coordinator actions:**
1. Creates the migration flag in the source store at `/pulsar/migration-coordinator/migration`:
```json
{
  "phase": "PREPARATION",
  "targetUrl": "oxia://oxia1:6648"
}
```

**Broker/Bookie actions (automatic, triggered by watching the flag; sketched after this section):**
1. Detect the migration flag via a watch on `/pulsar/migration-coordinator/migration`
2. Defer non-critical metadata writes (e.g., ledger rollovers, bundle ownership changes)
3. Initialize the connection to the target store
4. Recreate ALL ephemeral nodes in the target store
5. **Delete** the participant registration node to signal "ready"

**Coordinator waits for all participant nodes to be deleted (indicating all participants are ready).**
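For illustration, a minimal sketch of the participant-side logic above, written against the existing `MetadataStoreExtended` API; the class name, the helper method, and the string-based JSON check are hypothetical simplifications:

```java
import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
import java.util.Optional;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

// Illustrative participant-side logic for brokers and bookies.
class MigrationParticipantSketch {
    private static final String PARTICIPANTS = "/pulsar/migration-coordinator/participants/id-";
    private static final String MIGRATION_FLAG = "/pulsar/migration-coordinator/migration";

    private final MetadataStoreExtended sourceStore;
    private volatile String participantPath;

    MigrationParticipantSketch(MetadataStoreExtended sourceStore) {
        this.sourceStore = sourceStore;
    }

    void start() {
        // 1. Register as a migration participant with a sequential ephemeral node;
        //    the store appends the sequence number and returns the actual path.
        sourceStore.put(PARTICIPANTS, new byte[0], Optional.empty(),
                        EnumSet.of(CreateOption.Ephemeral, CreateOption.Sequential))
                .thenAccept(stat -> participantPath = stat.getPath());

        // 2. Watch the source store for changes to the migration flag.
        sourceStore.registerListener(this::onNotification);
    }

    private void onNotification(Notification n) {
        if (!MIGRATION_FLAG.equals(n.getPath())) {
            return;
        }
        sourceStore.get(MIGRATION_FLAG).thenAccept(opt -> opt.ifPresent(res -> {
            String json = new String(res.getValue(), StandardCharsets.UTF_8);
            if (json.contains("\"PREPARATION\"")) {   // real code would deserialize the JSON
                recreateEphemeralNodesInTarget();     // hypothetical helper
                // 3. Delete the participant node to signal "ready" to the coordinator.
                sourceStore.delete(participantPath, Optional.empty());
            }
        }));
    }

    private void recreateEphemeralNodesInTarget() {
        // Connect to the target store and recreate this participant's ephemeral nodes.
    }
}
```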
### Phase 2: PREPARATION → COPYING

**Coordinator actions:**
1. Updates the phase to `COPYING`
2. Performs a recursive copy of persistent data from source → target:
   - Skips ephemeral nodes (already recreated)
   - Concurrent operations limited by a semaphore (default: 1000 pending ops)
   - Breadth-first traversal to process all paths
   - Progress logged periodically

**During this phase:**
- Metadata writes are BLOCKED (return an error to clients)
- Metadata reads continue normally - all reads still go to the source store
- Ephemeral nodes remain alive in both stores
- **Data plane operations unaffected**: publish/consume and ledger writes continue normally
- Version-id and modification count are preserved using the direct Oxia client
- Breadth-first traversal with at most 1000 concurrent operations

**Estimated duration:**
- **< 30 seconds** for typical deployments with up to **500 MB of metadata** in ZooKeeper

**Impact on operations:**
- ✅ Existing topics: Publish and consume continue without interruption
- ✅ BookKeeper: Ledger writes and reads continue normally
- ✅ Clients: Connected producers and consumers unaffected
- ❌ Admin operations: Topic/namespace creation blocked temporarily
- ❌ Bundle operations: Load balancing deferred until completion

### Phase 3: COPYING → COMPLETED

**Coordinator actions:**
1. Updates the phase to `COMPLETED`
2. Logs a success message with the total copied node count

**Broker/Bookie actions (automatic, triggered by the phase update):**
1. Detect the `COMPLETED` phase
2. Deferred operations can now proceed
3. Switch routing:
   - **Writes**: go to the target store only
   - **Reads**: go to the target store only

**At this point:**
- The cluster is running on the target store
- The source store remains available for safety
- Metadata writes are enabled again

**Operator follow-up (after the validation period):**
1. Update configuration files:
```properties
# Before (ZooKeeper):
metadataStoreUrl=zk://zk1:2181,zk2:2181/pulsar

# After (Oxia):
metadataStoreUrl=oxia://oxia1:6648
```
2. Perform a rolling restart with the new config
3. After all services have restarted, decommission the source store

### Failure Handling: ANY_PHASE → FAILED

**If migration fails at any point:**
1. The coordinator updates the phase to `FAILED`
2. Broker/Bookie actions:
   - Detect the `FAILED` phase
   - Discard the target store connection
   - Continue using the source store
   - Metadata writes are enabled again

**Operator actions:**
1. Review logs to understand the failure cause
2. Fix the underlying issue
3. Retry migration with `pulsar-admin metadata-migration start --target <url>`

## Implementation Details

### Key Implementation Details

1. **Direct Oxia Client Usage**: The coordinator uses `AsyncOxiaClient` directly instead of going through the `MetadataStore` interface. This allows setting the version-id and modification count to match the source values, ensuring conditional writes (compare-and-set operations) continue to work correctly after migration.

2. **Breadth-First Traversal**: Processes paths level by level using a work queue, enabling high concurrency while preventing deep recursion (sketched below).

3. **Concurrent Operations**: Uses a semaphore to limit pending operations (default: 1000), balancing throughput with memory usage.
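The following is a minimal sketch of the breadth-first copy with a semaphore bounding the number of in-flight operations. It is written against the generic `MetadataStore` interface for readability, so it does not show the version-preserving writes that the actual coordinator performs through the Oxia client; the class name is hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;

// Illustrative breadth-first copy of persistent metadata from source to target.
class MetadataCopierSketch {
    private final MetadataStore source;
    private final MetadataStore target;
    private final Semaphore pendingOps = new Semaphore(1000);  // default concurrency limit

    MetadataCopierSketch(MetadataStore source, MetadataStore target) {
        this.source = source;
        this.target = target;
    }

    void copyAll(String rootPath) throws InterruptedException {
        Queue<String> queue = new ArrayDeque<>();
        queue.add(rootPath);
        while (!queue.isEmpty()) {
            String path = queue.poll();
            // Enqueue children first (breadth-first traversal, level by level).
            source.getChildren(path).join()
                    .forEach(child -> queue.add(path.equals("/") ? "/" + child : path + "/" + child));

            pendingOps.acquire();  // bound the number of in-flight copy operations
            source.get(path)
                    .thenCompose(opt -> copyNode(path, opt))
                    .whenComplete((ignore, ex) -> pendingOps.release());
        }
    }

    private CompletableFuture<Void> copyNode(String path, Optional<GetResult> result) {
        if (result.isEmpty() || result.get().getStat().isEphemeral()) {
            // Skip missing and ephemeral nodes (ephemerals are recreated by their owners).
            return CompletableFuture.completedFuture(null);
        }
        // NOTE: writing through the generic interface does not preserve the original
        // version-id; per the proposal, the real coordinator uses the Oxia client directly.
        return target.put(path, result.get().getValue(), Optional.empty())
                .thenAccept(stat -> { });
    }
}
```

Because writes are blocked during the COPYING phase, the traversal does not have to deal with source-side modifications racing with the copy.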
### Data Structures

**Migration State** (`/pulsar/migration-coordinator/migration`):
```json
{
  "phase": "PREPARATION",
  "targetUrl": "oxia://oxia1:6648/default"
}
```

Fields:
- `phase`: Current migration phase (NOT_STARTED, PREPARATION, COPYING, COMPLETED, FAILED)
- `targetUrl`: Target metadata store URL (e.g., `oxia://oxia1:6648/default`)

**Participant Registration** (`/pulsar/migration-coordinator/participants/id-NNNN`):
- Sequential ephemeral node created by each broker/bookie at startup
- Empty data (presence indicates participation)
- Deleted by the participant when preparation is complete (signals "ready")
- Coordinator waits for all of them to be deleted before proceeding to the COPYING phase

**No additional state tracking**: The simplified design removes complex state tracking and checksums. Migration state is kept minimal.
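For illustration, the migration-state document above could map to a simple POJO like the following (class and enum names are hypothetical):

```java
// Illustrative representation of the migration-state JSON; only the field names
// and phase values come from the proposal, the class itself is hypothetical.
public class MigrationState {
    public enum Phase { NOT_STARTED, PREPARATION, COPYING, COMPLETED, FAILED }

    private Phase phase;        // current migration phase
    private String targetUrl;   // e.g. "oxia://oxia1:6648/default"

    public MigrationState() {}  // no-args constructor for JSON deserialization

    public Phase getPhase() { return phase; }
    public void setPhase(Phase phase) { this.phase = phase; }
    public String getTargetUrl() { return targetUrl; }
    public void setTargetUrl(String targetUrl) { this.targetUrl = targetUrl; }
}
```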
### CLI Commands

```bash
# Start migration
pulsar-admin metadata-migration start --target <target-url>

# Check status
pulsar-admin metadata-migration status
```

The simplified design only requires two commands. Rollback happens automatically if migration fails (the phase transitions to FAILED).

### REST API

```
POST /admin/v2/metadata/migration/start
Body: { "targetUrl": "oxia://..." }

GET /admin/v2/metadata/migration/status
Returns: { "phase": "COPYING", "targetUrl": "oxia://..." }
```

## Safety Guarantees

### Why This Approach is Safe

**The migration design guarantees metadata consistency by avoiding dual-write and dual-read patterns entirely:**

1. **Single Source of Truth**: At any given time, there is exactly ONE active metadata store:
   - Before migration: source store (ZooKeeper)
   - During PREPARATION and COPYING: source store (read-only)
   - After COMPLETED: target store (Oxia)

2. **No Dual-Write Complexity**: Unlike approaches that write to both stores simultaneously, this design eliminates:
   - Write synchronization issues
   - Conflict resolution between stores
   - Data divergence problems
   - Partial failure handling complexity

3. **No Dual-Read Complexity**: Unlike approaches that read from both stores, this design eliminates:
   - Read consistency issues
   - Cache invalidation across stores
   - Stale data problems
   - Complex fallback logic

4. **Atomic Cutover**: All participants switch stores simultaneously when the COMPLETED phase is detected. There is no ambiguous state where some participants use one store and others use another.

5. **Fast Migration Window**: With **< 30 seconds** for typical metadata sizes (even up to 500 MB), the read-only window is minimal and acceptable for most production environments.

**Bottom line**: Metadata is **always in a consistent state** - either fully in the source store or fully in the target store, never split or diverged between them.

### Data Integrity

1. **Version Preservation**: All persistent data is copied with the original version-id and modification count preserved. This ensures conditional writes (compare-and-set operations) continue working after migration.

2. **Ephemeral Node Recreation**: All ephemeral nodes are recreated by their owning brokers/bookies before the persistent data copy begins.

3. **Read-Only Mode**: All metadata writes are blocked during the PREPARATION and COPYING phases, ensuring no data inconsistencies during migration.

**Important**: Read-only mode only affects metadata operations. Data plane operations continue normally:
- ✅ **Publishing and consuming messages** works without interruption
- ✅ **Reading from existing topics and subscriptions** works normally
- ✅ **Ledger writes to BookKeeper** continue unaffected
- ❌ **Creating new topics or subscriptions** will be blocked temporarily
- ❌ **Namespace/policy updates** will be blocked temporarily
- ❌ **Bundle ownership changes** will be deferred until migration completes

### Operational Safety

1. **No Downtime**: Brokers and bookies remain online throughout the migration. **Data plane operations (publish/consume) continue without interruption.** Only metadata operations are temporarily blocked during the migration phases.

2. **Graceful Failure**: If migration fails at any point, the phase transitions to FAILED and the cluster returns to the source store automatically.

3. **Session Events**: Components receive a `SessionLost` event during migration to defer non-critical writes (e.g., ledger rollovers), and `SessionReestablished` when migration completes or fails (see the sketch after this list).

4. **Participant Coordination**: Migration waits for all participants to complete preparation before copying data.
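A minimal sketch of how a component could react to these session events to defer non-critical writes; the surrounding class, field, and method names are hypothetical:

```java
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;

// Illustrative: defers non-critical metadata writes based on session events.
class DeferrableComponentSketch {
    private volatile boolean writesDeferred = false;

    void attach(MetadataStoreExtended store) {
        store.registerSessionListener(event -> {
            switch (event) {
                case SessionLost:
                    // Raised while migration is in progress: hold off on ledger
                    // rollovers and other non-critical metadata writes.
                    writesDeferred = true;
                    break;
                case SessionReestablished:
                    // Migration completed (or failed and rolled back): resume.
                    writesDeferred = false;
                    break;
                default:
                    break;
            }
        });
    }

    boolean shouldDeferMetadataWrites() {
        return writesDeferred;
    }
}
```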
### Consistency

1. **Atomic Cutover**: All participants switch to the target store simultaneously when the COMPLETED phase is detected.

2. **Ephemeral Session Consistency**: Each participant manages its own ephemeral nodes in the target store with proper session management.

3. **No Dual-Write Complexity**: By blocking writes during migration, we avoid complex dual-write error handling and data divergence issues.

## Configuration

### No Configuration Changes for Migration

The beauty of this design is that **no configuration changes are needed to start migration**:

- Brokers and bookies continue using their existing `metadataStoreUrl` config
- The `DualMetadataStore` wrapper is automatically applied when using ZooKeeper
- The target URL is provided only when triggering migration via the CLI

### Post-Migration Configuration

After migration completes and the validation period ends, update the config files:

```properties
# Before migration
metadataStoreUrl=zk://zk1:2181,zk2:2181,zk3:2181/pulsar

# After migration (update and rolling restart)
metadataStoreUrl=oxia://oxia1:6648
```

## Comparison with Kafka's ZooKeeper → KRaft Migration

Apache Kafka faced a similar challenge migrating from ZooKeeper to KRaft (Kafka Raft). Their approach provides useful comparison points:

### Kafka's Approach (KIP-866)

**Migration Strategy:**
- **Dual-mode operation**: Kafka brokers run in a hybrid mode where the KRaft controller reads from ZooKeeper
- **Metadata synchronization**: The KRaft controller actively mirrors metadata from ZooKeeper to KRaft
- **Phased cutover**: Operators manually transition from ZK_MIGRATION mode to KRAFT mode
- **Write forwarding**: During migration, metadata writes go to ZooKeeper and are replicated to KRaft

**Timeline:**
- Migration can take hours or days as metadata is continuously synchronized
- Requires careful monitoring of the lag between ZooKeeper and KRaft
- Rollback is possible until the final KRAFT mode is committed

### Pulsar's Approach (This PIP)

**Migration Strategy:**
- **Transparent wrapper**: DualMetadataStore wraps the existing store without broker code changes
- **Read-only migration**: Metadata writes are blocked during migration (< 30 seconds for most clusters)
- **Atomic copy**: All persistent data is copied in one operation with version preservation
- **Single source of truth**: No dual-write or dual-read - metadata is always consistent
- **Automatic cutover**: All participants switch simultaneously when the COMPLETED phase is detected

**Timeline:**
- Migration completes in **< 30 seconds** for typical deployments (even up to 500 MB of metadata)
- No lag monitoring needed
- Automatic rollback on failure (FAILED phase)

### Key Differences

| Aspect | Kafka (ZK → KRaft) | Pulsar (ZK → Oxia) |
|--------|--------------------|--------------------|
| **Migration Duration** | Hours to days | **< 30 seconds** (up to 500 MB) |
| **Metadata Writes** | Continue during migration | Blocked during migration |
| **Data Plane** | Unaffected | Unaffected (publish/consume continues) |
| **Approach** | Continuous sync + dual-mode | Atomic copy + read-only mode |
| **Consistency** | Dual-write (eventual consistency) | **Single source of truth (always consistent)** |
| **Complexity** | High (dual-mode broker logic) | Low (transparent wrapper) |
| **Version Preservation** | Not applicable (different metadata models) | Yes (conditional writes preserved) |
| **Rollback** | Manual, complex | Automatic on failure |
| **Monitoring** | Requires lag tracking | Simple phase monitoring |

### Why Pulsar's Approach Differs

1. **Data Plane Independence**: **The key insight is that Pulsar's data plane (publish/consume, ledger writes) does not require metadata writes to function.** This architectural property allows pausing metadata writes for a brief period (< 30 seconds) without affecting data operations. This is what makes the migration **provably safe and consistent**, not the metadata size.

2. **Write-Pause Safety**: Pausing writes during the copy ensures:
   - No dual-write complexity
   - No data divergence between stores
   - No conflict resolution needed
   - Guaranteed consistency

   This works regardless of metadata size - whether 50K nodes or millions of topics. The migration handles large metadata volumes through high concurrency (1000 parallel operations), completing in < 30 seconds even for 500 MB.

3. **Ephemeral Node Handling**: Pulsar has significant ephemeral metadata (broker registrations, bundle ownership), making dual-write complex. Read-only mode simplifies this.
4. **Conditional Writes**: Pulsar relies heavily on compare-and-set operations. Version preservation ensures these continue working post-migration, which Kafka doesn't need to address.

5. **Architectural Enabler**: Pulsar's separation of the data plane and the metadata plane allows brief metadata write pauses without data plane impact, enabling a simpler, safer migration approach.

### Lessons from Kafka's Experience

Pulsar's design incorporates lessons from Kafka's migration:

- ✅ **Avoid dual-write complexity**: Kafka found that dual-mode operation added significant code complexity. Pulsar's read-only approach is simpler **and guarantees consistency**.
- ✅ **Clear phase boundaries**: Kafka's migration has an unclear "completion" point. Pulsar has an explicit COMPLETED phase.
- ✅ **Automatic participant coordination**: Kafka requires manual broker restarts. Pulsar participants coordinate automatically.
- ✅ **Fast migration**: A **< 30 seconds** read-only window is acceptable for most production environments.
- ❌ **Brief write unavailability**: Pulsar accepts brief metadata write unavailability (< 30 sec) vs Kafka's continuous operation, but gains guaranteed consistency and simplicity.

## References

- [PIP-45: Pluggable metadata interface](https://github.com/apache/pulsar/wiki/PIP-45%3A-Pluggable-metadata-interface)
- [Oxia: A Scalable Metadata Store](https://github.com/streamnative/oxia)
- [MetadataStore Interface](https://github.com/apache/pulsar/blob/master/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java)
- [KIP-866: ZooKeeper to KRaft Migration](https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration) - Kafka's approach to metadata store migration

--
Matteo Merli
<[email protected]>
