Sounds good! I added a few comments and questions on the PR itself.

-Lari

On 2026/01/29 20:04:00 Matteo Merli wrote:
> https://github.com/apache/pulsar/pull/25196
> 
> ```
> 
> # PIP-454: Metadata Store Migration Framework
> 
> ## Motivation
> 
> Apache Pulsar currently uses Apache ZooKeeper as its metadata store for
> broker coordination, topic metadata, namespace policies, and BookKeeper
> ledger management. While ZooKeeper has served well, there are several
> motivations for enabling migration to alternative metadata stores:
> 
> 1. **Operational Simplicity**: Alternative metadata stores like Oxia may
> offer simpler operations, better observability, or reduced operational
> overhead compared to ZooKeeper ensembles.
> 
> 2. **Performance Characteristics**: Different metadata stores have
> different performance profiles. Some workloads may benefit from stores
> optimized for high throughput or low latency.
> 
> 3. **Deployment Flexibility**: Organizations may prefer metadata stores
> that align better with their existing infrastructure and expertise.
> 
> 4. **Zero-Downtime Migration**: Operators need a safe, automated way to
> migrate metadata between stores without service interruption.
> 
> Currently, there is no supported path to migrate from one metadata store to
> another without cluster downtime. This PIP proposes a **safe, simple
> migration framework** that ensures metadata consistency by avoiding complex
> dual-write/dual-read patterns. The framework enables:
> 
> - **Zero-downtime migration** from any metadata store to any other
> supported store
> - **Automatic ephemeral node recreation** in the target store
> - **Version preservation** to ensure conditional writes continue working
> - **Automatic failure recovery** if issues are detected
> - **Minimal configuration changes** - no config updates needed until after
> migration completes
> 
> ## Goal
> 
> Provide a safe, automated framework for migrating Apache Pulsar's metadata
> from one store implementation (e.g., ZooKeeper) to another (e.g., Oxia)
> with zero service interruption.
> 
> ### In Scope
> 
> - Migration framework supporting any source → any target metadata store
> - Automatic ephemeral node recreation by brokers and bookies
> - Persistent data copy with version preservation
> - CLI commands for migration control and monitoring
> - Automatic failure recovery during migration
> - Support for broker and bookie participation
> - Read-only mode during migration for consistency
> 
> ### Out of Scope
> 
> - Developing new metadata store implementations (Oxia, Etcd support already
> exists)
> - Cross-cluster metadata synchronization (different use case)
> - Automated rollback after COMPLETED phase (requires manual intervention)
> - Migration of configuration metadata store (can be done separately)
> 
> ## High Level Design
> 
> The migration framework introduces a **DualMetadataStore** wrapper that
> transparently handles migration without modifying existing metadata store
> implementations.
> 
> ### Key Principles
> 
> 1. **Transparent Wrapping**: The `DualMetadataStore` wraps the existing
> source store (e.g., `ZKMetadataStore`) without modifying its implementation
> (see the sketch after this list).
> 
> 2. **Lazy Target Initialization**: The target store is only initialized
> when migration begins, triggered by a flag in the source store.
> 
> 3. **Ephemeral-First Approach**: Before copying persistent data, all
> brokers and bookies recreate their ephemeral nodes in the target store.
> This ensures the cluster is "live" in both stores during migration.
> 
> 4. **Read-Only Mode During Migration**: To ensure consistency, all metadata
> writes are blocked during PREPARATION and COPYING phases. Components
> receive `SessionLost` events to defer non-critical operations (e.g., ledger
> rollovers).
> 
> 5. **Phase-Based Migration**: Migration proceeds through well-defined
> phases (PREPARATION → COPYING → COMPLETED).
> 
> 6. **Generic Framework**: The framework is agnostic to specific store
> implementations - it works with any source and target that implement the
> `MetadataStore` interface.
> 
> 7. **Guaranteed Consistency**: By blocking writes during migration and
> using atomic copy, metadata is **always in a consistent state**. No
> dual-write complexity, no data divergence, no consistency issues.
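> 
> To make the wrapping concrete, below is a minimal sketch of the idea - not
> the PR's actual code; the `Store` interface and all names are simplified
> stand-ins for the real `MetadataStore` types:
> 
> ```java
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.atomic.AtomicReference;
> 
> // Minimal sketch of the DualMetadataStore idea: delegate reads to the
> // currently active store, reject writes while migration is in flight.
> public class DualStoreSketch {
> 
>     interface Store {
>         CompletableFuture<byte[]> get(String path);
>         CompletableFuture<Void> put(String path, byte[] value);
>     }
> 
>     enum Phase { NOT_STARTED, PREPARATION, COPYING, COMPLETED, FAILED }
> 
>     static class DualStore implements Store {
>         private final Store source;
>         private final AtomicReference<Store> target = new AtomicReference<>();
>         private final AtomicReference<Phase> phase =
>                 new AtomicReference<>(Phase.NOT_STARTED);
> 
>         DualStore(Store source) {
>             this.source = source;
>         }
> 
>         // Called from the watch on /pulsar/migration-coordinator/migration.
>         void onPhaseChange(Phase newPhase, Store lazilyInitializedTarget) {
>             target.set(lazilyInitializedTarget);
>             phase.set(newPhase);
>         }
> 
>         private Store active() {
>             // Single source of truth: reads and writes go to the source
>             // store until COMPLETED, then to the target store only.
>             return phase.get() == Phase.COMPLETED ? target.get() : source;
>         }
> 
>         @Override
>         public CompletableFuture<byte[]> get(String path) {
>             return active().get(path);
>         }
> 
>         @Override
>         public CompletableFuture<Void> put(String path, byte[] value) {
>             Phase p = phase.get();
>             if (p == Phase.PREPARATION || p == Phase.COPYING) {
>                 // Read-only mode: metadata writes fail during migration.
>                 return CompletableFuture.failedFuture(new IllegalStateException(
>                         "Metadata store is read-only during phase " + p));
>             }
>             return active().put(path, value);
>         }
>     }
> }
> ```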
> 
> ## Detailed Design
> 
> ### Migration Phases
> 
> ```
> NOT_STARTED
>      ↓
> PREPARATION ← All brokers/bookies recreate ephemeral nodes in target
>              ← Metadata writes are BLOCKED (read-only mode)
>      ↓
> COPYING ← Coordinator copies persistent data source → target
>          ← Metadata writes still BLOCKED
>      ↓
> COMPLETED ← Migration complete, all services using target store
>           ← Metadata writes ENABLED on target
>      ↓
> After validation period:
>  * Update config and restart brokers & bookies
>  * Decommission source store
> 
> (If errors occur):
> FAILED ← Rollback to source store, writes ENABLED
> ```
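> 
> For illustration, the phase machine above could be encoded as follows. The
> phase names come from this PIP; the transition-checking helper itself is a
> hypothetical sketch:
> 
> ```java
> import java.util.EnumSet;
> import java.util.Set;
> 
> // Phases from the PIP plus an illustrative transition check.
> public enum MigrationPhase {
>     NOT_STARTED, PREPARATION, COPYING, COMPLETED, FAILED;
> 
>     // Only in-flight phases may roll back to FAILED.
>     private static final Set<MigrationPhase> CAN_FAIL =
>             EnumSet.of(PREPARATION, COPYING);
> 
>     public boolean canTransitionTo(MigrationPhase next) {
>         if (next == FAILED) {
>             return CAN_FAIL.contains(this);
>         }
>         switch (this) {
>             case NOT_STARTED: return next == PREPARATION;
>             case PREPARATION: return next == COPYING;
>             case COPYING:     return next == COMPLETED;
>             case FAILED:      return next == PREPARATION; // retry after a fix
>             default:          return false; // COMPLETED: rollback is manual
>         }
>     }
> }
> ```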
> 
> ### Phase 1: NOT_STARTED → PREPARATION
> 
> **Participant Registration (at startup):**
> Each broker and bookie registers itself as a migration participant by
> creating a sequential ephemeral node:
> - Path: `/pulsar/migration-coordinator/participants/id-NNNN` (sequential)
> - This allows the coordinator to know how many participants exist before
> migration starts (a registration sketch follows)
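> 
> A registration sketch, assuming the existing `MetadataStoreExtended` API
> with its `Ephemeral` and `Sequential` create options (the helper itself is
> illustrative):
> 
> ```java
> import java.util.EnumSet;
> import java.util.Optional;
> import java.util.concurrent.CompletableFuture;
> 
> import org.apache.pulsar.metadata.api.Stat;
> import org.apache.pulsar.metadata.api.extended.CreateOption;
> import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
> 
> public class ParticipantRegistration {
> 
>     private static final String PARTICIPANTS_PREFIX =
>             "/pulsar/migration-coordinator/participants/id-";
> 
>     public static CompletableFuture<Stat> register(MetadataStoreExtended store) {
>         // Sequential: the store appends a monotonically increasing suffix.
>         // Ephemeral: the node disappears if the participant's session dies,
>         // so a crashed participant cannot block the coordinator forever.
>         return store.put(PARTICIPANTS_PREFIX, new byte[0], Optional.empty(),
>                 EnumSet.of(CreateOption.Ephemeral, CreateOption.Sequential));
>     }
> }
> ```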
> 
> **Administrator triggers migration:**
> ```bash
> pulsar-admin metadata-migration start --target oxia://oxia1:6648
> ```
> 
> **Coordinator actions:**
> 1. Creates migration flag in source store:
> `/pulsar/migration-coordinator/migration` (a creation sketch follows the
> JSON below)
>    ```json
>    {
>      "phase": "PREPARATION",
>      "targetUrl": "oxia://oxia1:6648"
>    }
>    ```
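> 
> A sketch of this step, assuming Pulsar's `MetadataStore` convention that
> `expectedVersion = -1` means "create only if the key does not already
> exist" (so two concurrent `start` commands cannot both succeed):
> 
> ```java
> import java.nio.charset.StandardCharsets;
> import java.util.Optional;
> import java.util.concurrent.CompletableFuture;
> 
> import org.apache.pulsar.metadata.api.MetadataStore;
> import org.apache.pulsar.metadata.api.Stat;
> 
> public class StartMigration {
> 
>     private static final String FLAG_PATH =
>             "/pulsar/migration-coordinator/migration";
> 
>     static CompletableFuture<Stat> start(MetadataStore store, String targetUrl) {
>         // Matches the JSON shown above; a real implementation would use a
>         // proper JSON serializer.
>         String flag = "{\"phase\":\"PREPARATION\",\"targetUrl\":\""
>                 + targetUrl + "\"}";
>         return store.put(FLAG_PATH, flag.getBytes(StandardCharsets.UTF_8),
>                 Optional.of(-1L));
>     }
> }
> ```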
> 
> **Broker/Bookie actions (automatic, triggered by watching the flag):**
> 1. Detect migration flag via watch on
> `/pulsar/migration-coordinator/migration`
> 2. Defer non-critical metadata writes (e.g., ledger rollovers, bundle
> ownership changes)
> 3. Initialize connection to target store
> 4. Recreate ALL ephemeral nodes in target store
> 5. **Delete** the participant registration node to signal "ready" (a sketch
> of the participant side follows below)
> 
> **Coordinator waits for all participant nodes to be deleted (indicating all
> participants are ready)**
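> 
> A sketch of the participant side of this phase, assuming the existing
> `MetadataStore` listener API; `connectToTarget()`,
> `recreateEphemeralNodes()` and `deleteParticipantNode()` are hypothetical
> helpers standing in for the participant's real work:
> 
> ```java
> import java.nio.charset.StandardCharsets;
> 
> import org.apache.pulsar.metadata.api.MetadataStore;
> 
> public class MigrationFlagWatcher {
> 
>     private static final String FLAG_PATH =
>             "/pulsar/migration-coordinator/migration";
> 
>     public static void watch(MetadataStore sourceStore) {
>         sourceStore.registerListener(notification -> {
>             if (!FLAG_PATH.equals(notification.getPath())) {
>                 return;
>             }
>             sourceStore.get(FLAG_PATH).thenAccept(opt -> opt.ifPresent(res -> {
>                 String json = new String(res.getValue(), StandardCharsets.UTF_8);
>                 if (json.contains("\"PREPARATION\"")) { // simplistic phase check
>                     connectToTarget(json);     // initialize target store
>                     recreateEphemeralNodes();  // recreate ALL ephemerals in target
>                     deleteParticipantNode();   // delete registration = "ready"
>                 }
>             }));
>         });
>     }
> 
>     private static void connectToTarget(String flagJson) { /* hypothetical */ }
>     private static void recreateEphemeralNodes() { /* hypothetical */ }
>     private static void deleteParticipantNode() { /* hypothetical */ }
> }
> ```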
> 
> ### Phase 2: PREPARATION → COPYING
> 
> **Coordinator actions:**
> 1. Updates phase to `COPYING`
> 2. Performs recursive copy of persistent data from source → target:
>    - Skips ephemeral nodes (already recreated)
>    - Concurrent operations limited by semaphore (default: 1000 pending ops)
>    - Breadth-first traversal to process all paths
>    - Progress logged periodically
> 
> **During this phase:**
> - Metadata writes are BLOCKED (return an error to clients)
> - Metadata reads continue normally from the source store
> - Ephemeral nodes remain alive in both stores
> - **Data plane operations unaffected**: Publish/consume and ledger writes
> continue normally
> - Version-id and modification count are preserved using the direct Oxia
> client
> 
> **Estimated duration:**
> - **< 30 seconds** for typical deployments with up to **500 MB of
> metadata** in ZooKeeper
> 
> **Impact on operations:**
> - ✅ Existing topics: Publish and consume continue without interruption
> - ✅ BookKeeper: Ledger writes and reads continue normally
> - ✅ Clients: Connected producers and consumers unaffected
> - ❌ Admin operations: Topic/namespace creation blocked temporarily
> - ❌ Bundle operations: Load balancing deferred until completion
> 
> ### Phase 3: COPYING → COMPLETED
> 
> **Coordinator actions:**
> 1. Updates phase to `COMPLETED`
> 2. Logs success message with total copied node count
> 
> **Broker/Bookie actions (automatic, triggered by phase update):**
> 1. Detect `COMPLETED` phase
> 2. Deferred operations can now proceed
> 3. Switch routing:
>    - **Writes**: Go to target store only
>    - **Reads**: Go to target store only
> 
> **At this point:**
> - Cluster is running on target store
> - Source store remains available for safety
> - Metadata writes are enabled again
> 
> **Operator follow-up (after validation period):**
> 1. Update configuration files:
>    ```properties
>    # Before (ZooKeeper):
>    metadataStoreUrl=zk://zk1:2181,zk2:2181/pulsar
> 
>    # After (Oxia):
>    metadataStoreUrl=oxia://oxia1:6648
>    ```
> 2. Perform rolling restart with new config
> 3. After all services restarted, decommission source store
> 
> ### Failure Handling: ANY_PHASE → FAILED
> 
> **If migration fails at any point:**
> 1. Coordinator updates phase to `FAILED`
> 2. Broker/Bookie actions:
>    - Detect `FAILED` phase
>    - Discard target store connection
>    - Continue using source store
>    - Metadata writes enabled again
> 
> **Operator actions:**
> 1. Review logs to understand failure cause
> 2. Fix underlying issue
> 3. Retry migration with `pulsar-admin metadata-migration start --target <url>`
> 
> ## Implementation Details
> 
> ### Key Implementation Details
> 
> 1. **Direct Oxia Client Usage**: The coordinator uses `AsyncOxiaClient`
> directly instead of going through `MetadataStore` interface. This allows
> setting version-id and modification count to match the source values,
> ensuring conditional writes (compare-and-set operations) continue to work
> correctly after migration.
> 
> 2. **Breadth-First Traversal**: Processes paths level by level using a work
> queue, enabling high concurrency while preventing deep recursion.
> 
> 3. **Concurrent Operations**: Uses a semaphore to limit pending operations
> (default: 1000), balancing throughput with memory usage. A sketch of the
> copy loop follows.
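> 
> A self-contained sketch of the copy loop (see items 2 and 3 above); the
> `SourceStore`/`TargetStore` interfaces are simplified stand-ins for the
> `MetadataStore` and direct Oxia client calls:
> 
> ```java
> import java.util.ArrayDeque;
> import java.util.List;
> import java.util.Queue;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.Semaphore;
> 
> public class BfsCopySketch {
> 
>     interface SourceStore {
>         List<String> getChildren(String path); // absolute paths; blocking for brevity
>         byte[] getData(String path);
>         boolean isEphemeral(String path);
>     }
> 
>     interface TargetStore {
>         // Assumed to preserve version-id / modification count (see item 1).
>         CompletableFuture<Void> putWithVersion(String path, byte[] data);
>     }
> 
>     static void copyAll(SourceStore source, TargetStore target)
>             throws InterruptedException {
>         Semaphore pendingOps = new Semaphore(1000); // default cap from the PIP
>         Queue<String> queue = new ArrayDeque<>();
>         queue.add("/");
> 
>         // Breadth-first traversal via a work queue: no deep recursion,
>         // many writes kept in flight.
>         while (!queue.isEmpty()) {
>             String path = queue.poll();
>             queue.addAll(source.getChildren(path));
> 
>             if (source.isEphemeral(path)) {
>                 continue; // ephemerals were already recreated by their owners
>             }
> 
>             pendingOps.acquire(); // blocks once 1000 writes are outstanding
>             target.putWithVersion(path, source.getData(path))
>                     .whenComplete((v, ex) -> pendingOps.release());
>         }
>         pendingOps.acquire(1000); // drain: wait for in-flight writes to finish
>     }
> }
> ```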
> 
> ### Data Structures
> 
> **Migration State** (`/pulsar/migration-coordinator/migration`):
> ```json
> {
>   "phase": "PREPARATION",
>   "targetUrl": "oxia://oxia1:6648/default"
> }
> ```
> 
> Fields:
> - `phase`: Current migration phase (NOT_STARTED, PREPARATION, COPYING,
> COMPLETED, FAILED)
> - `targetUrl`: Target metadata store URL (e.g., `oxia://oxia1:6648/default`)
> 
> **Participant Registration**
> (`/pulsar/migration-coordinator/participants/id-NNNN`):
> - Sequential ephemeral node created by each broker/bookie at startup
> - Empty data (presence indicates participation)
> - Deleted by participant when preparation complete (signals "ready")
> - Coordinator waits for all to be deleted before proceeding to COPYING phase
> 
> **No additional state tracking**: The simplified design removes complex
> state tracking and checksums. Migration state is kept minimal.
> 
> ### CLI Commands
> 
> ```bash
> # Start migration
> pulsar-admin metadata-migration start --target <target-url>
> 
> # Check status
> pulsar-admin metadata-migration status
> ```
> 
> The simplified design requires only two commands. Rollback happens
> automatically if migration fails (the phase transitions to FAILED).
> 
> ### REST API
> 
> ```
> POST   /admin/v2/metadata/migration/start
>        Body: { "targetUrl": "oxia://..." }
> 
> GET    /admin/v2/metadata/migration/status
>        Returns: { "phase": "COPYING", "targetUrl": "oxia://..." }
> ```
> 
> ## Safety Guarantees
> 
> ### Why This Approach is Safe
> 
> **The migration design guarantees metadata consistency by avoiding
> dual-write and dual-read patterns entirely:**
> 
> 1. **Single Source of Truth**: At any given time, there is exactly ONE
> active metadata store:
>    - Before migration: Source store (ZooKeeper)
>    - During PREPARATION and COPYING: Source store (read-only)
>    - After COMPLETED: Target store (Oxia)
> 
> 2. **No Dual-Write Complexity**: Unlike approaches that write to both
> stores simultaneously, this design eliminates:
>    - Write synchronization issues
>    - Conflict resolution between stores
>    - Data divergence problems
>    - Partial failure handling complexity
> 
> 3. **No Dual-Read Complexity**: Unlike approaches that read from both
> stores, this design eliminates:
>    - Read consistency issues
>    - Cache invalidation across stores
>    - Stale data problems
>    - Complex fallback logic
> 
> 4. **Atomic Cutover**: All participants switch stores simultaneously when
> COMPLETED phase is detected. There is no ambiguous state where some
> participants use one store and others use another.
> 
> 5. **Fast Migration Window**: With **< 30 seconds** for typical metadata
> sizes (even up to 500 MB), the read-only window is minimal and acceptable
> for most production environments.
> 
> **Bottom line**: Metadata is **always in a consistent state** - either
> fully in the source store or fully in the target store, never split or
> diverged between them.
> 
> ### Data Integrity
> 
> 1. **Version Preservation**: All persistent data is copied with original
> version-id and modification count preserved. This ensures conditional
> writes (compare-and-set operations) continue working after migration; an
> example follows this list.
> 
> 2. **Ephemeral Node Recreation**: All ephemeral nodes are recreated by
> their owning brokers/bookies before persistent data copy begins.
> 
> 3. **Read-Only Mode**: All metadata writes are blocked during PREPARATION
> and COPYING phases, ensuring no data inconsistencies during migration.
> 
>    **Important**: Read-only mode only affects metadata operations. Data
> plane operations continue normally:
>    - ✅ **Publishing and consuming messages** works without interruption
>    - ✅ **Reading from existing topics and subscriptions** works normally
>    - ✅ **Ledger writes to BookKeeper** continue unaffected
>    - ❌ **Creating new topics or subscriptions** will be blocked temporarily
>    - ❌ **Namespace/policy updates** will be blocked temporarily
>    - ❌ **Bundle ownership changes** will be deferred until migration
> completes
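> 
> As a small example of why this matters, a typical conditional write passes
> the version it last read as `expectedVersion`, using the existing
> `MetadataStore.put()` signature (the helper and path here are illustrative):
> 
> ```java
> import java.util.Optional;
> import java.util.concurrent.CompletableFuture;
> 
> import org.apache.pulsar.metadata.api.MetadataStore;
> import org.apache.pulsar.metadata.api.Stat;
> 
> public class CasExample {
> 
>     // Succeeds only if the stored version still equals versionLastRead;
>     // otherwise the future fails with a bad-version error. If the copy had
>     // reset versions to 0, every version cached by brokers before migration
>     // would mismatch and compare-and-set updates would fail after cutover.
>     static CompletableFuture<Stat> casUpdate(MetadataStore store, String path,
>                                              byte[] newValue, long versionLastRead) {
>         return store.put(path, newValue, Optional.of(versionLastRead));
>     }
> }
> ```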
> 
> ### Operational Safety
> 
> 1. **No Downtime**: Brokers and bookies remain online throughout the
> migration. **Data plane operations (publish/consume) continue without
> interruption.** Only metadata operations are temporarily blocked during the
> migration phases.
> 
> 2. **Graceful Failure**: If migration fails at any point, phase transitions
> to FAILED and cluster returns to source store automatically.
> 
> 3. **Session Events**: Components receive a `SessionLost` event during
> migration to defer non-critical writes (e.g., ledger rollovers), and a
> `SessionReestablished` event when migration completes or fails; a sketch
> follows this list.
> 
> 4. **Participant Coordination**: Migration waits for all participants to
> complete preparation before copying data.
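> 
> A sketch of a component reacting to these events (see item 3 above),
> assuming the existing `SessionEvent` listener API; the two private helpers
> are hypothetical:
> 
> ```java
> import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
> import org.apache.pulsar.metadata.api.extended.SessionEvent;
> 
> public class MigrationSessionHandler {
> 
>     public static void install(MetadataStoreExtended store) {
>         store.registerSessionListener(event -> {
>             if (event == SessionEvent.SessionLost) {
>                 // Raised when migration enters PREPARATION/COPYING.
>                 deferLedgerRollovers();
>             } else if (event == SessionEvent.SessionReestablished) {
>                 // Raised when migration reaches COMPLETED or FAILED.
>                 resumeDeferredOperations();
>             }
>         });
>     }
> 
>     private static void deferLedgerRollovers() { /* hypothetical */ }
>     private static void resumeDeferredOperations() { /* hypothetical */ }
> }
> ```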
> 
> ### Consistency
> 
> 1. **Atomic Cutover**: All participants switch to target store
> simultaneously when COMPLETED phase is detected.
> 
> 2. **Ephemeral Session Consistency**: Each participant manages its own
> ephemeral nodes in target store with proper session management.
> 
> 3. **No Dual-Write Complexity**: By blocking writes during migration, we
> avoid complex dual-write error handling and data divergence issues.
> 
> ## Configuration
> 
> ### No Configuration Changes for Migration
> 
> The beauty of this design is that **no configuration changes are needed to
> start migration**:
> 
> - Brokers and bookies continue using their existing `metadataStoreUrl`
> config
> - The `DualMetadataStore` wrapper is automatically applied when using
> ZooKeeper
> - Target URL is provided only when triggering migration via CLI
> 
> ### Post-Migration Configuration
> 
> After migration completes and validation period ends, update config files:
> 
> ```properties
> # Before migration
> metadataStoreUrl=zk://zk1:2181,zk2:2181,zk3:2181/pulsar
> 
> # After migration (update and rolling restart)
> metadataStoreUrl=oxia://oxia1:6648
> ```
> 
> ## Comparison with Kafka's ZooKeeper → KRaft Migration
> 
> Apache Kafka faced a similar challenge migrating from ZooKeeper to KRaft
> (Kafka Raft). Their approach provides useful comparison points:
> 
> ### Kafka's Approach (KIP-866)
> 
> **Migration Strategy:**
> - **Dual-mode operation**: Kafka brokers run in a hybrid mode where the
> KRaft controller reads from ZooKeeper
> - **Metadata synchronization**: KRaft controller actively mirrors metadata
> from ZooKeeper to KRaft
> - **Phased cutover**: Operators manually transition from ZK_MIGRATION mode
> to KRAFT mode
> - **Write forwarding**: During migration, metadata writes go to ZooKeeper
> and are replicated to KRaft
> 
> **Timeline:**
> - Migration can take hours or days as metadata is continuously synchronized
> - Requires careful monitoring of lag between ZooKeeper and KRaft
> - Rollback possible until final KRAFT mode is committed
> 
> ### Pulsar's Approach (This PIP)
> 
> **Migration Strategy:**
> - **Transparent wrapper**: DualMetadataStore wraps existing store without
> broker code changes
> - **Read-only migration**: Metadata writes blocked during migration (< 30
> seconds for most clusters)
> - **Atomic copy**: All persistent data copied in one operation with version
> preservation
> - **Single source of truth**: No dual-write or dual-read - metadata always
> consistent
> - **Automatic cutover**: All participants switch simultaneously when
> COMPLETED phase detected
> 
> **Timeline:**
> - Migration completes in **< 30 seconds** for typical deployments (even up
> to 500 MB metadata)
> - No lag monitoring needed
> - Automatic rollback on failure (FAILED phase)
> 
> ### Key Differences
> 
> | Aspect | Kafka (ZK → KRaft) | Pulsar (ZK → Oxia) |
> |--------|-------------------|-------------------|
> | **Migration Duration** | Hours to days | **< 30 seconds** (up to 500 MB) |
> | **Metadata Writes** | Continue during migration | Blocked during migration |
> | **Data Plane** | Unaffected | Unaffected (publish/consume continues) |
> | **Approach** | Continuous sync + dual-mode | Atomic copy + read-only mode |
> | **Consistency** | Dual-write (eventual consistency) | **Single source of truth (always consistent)** |
> | **Complexity** | High (dual-mode broker logic) | Low (transparent wrapper) |
> | **Version Preservation** | Not applicable (different metadata models) | Yes (conditional writes preserved) |
> | **Rollback** | Manual, complex | Automatic on failure |
> | **Monitoring** | Requires lag tracking | Simple phase monitoring |
> 
> ### Why Pulsar's Approach Differs
> 
> 1. **Data Plane Independence**: **The key insight is that Pulsar's data
> plane (publish/consume, ledger writes) does not require metadata writes to
> function.** This architectural property allows pausing metadata writes for
> a brief period (< 30 seconds) without affecting data operations. This is
> what makes the migration **provably safe and consistent**, not the metadata
> size.
> 
> 2. **Write-Pause Safety**: Pausing writes during copy ensures:
>    - No dual-write complexity
>    - No data divergence between stores
>    - No conflict resolution needed
>    - Guaranteed consistency
> 
>    This works regardless of metadata size - whether 50K nodes or millions
> of topics. The migration handles large metadata volumes through high
> concurrency (1000 parallel operations), completing in < 30 seconds even for
> 500 MB.
> 
> 3. **Ephemeral Node Handling**: Pulsar has significant ephemeral metadata
> (broker registrations, bundle ownership), making dual-write complex.
> Read-only mode simplifies this.
> 
> 4. **Conditional Writes**: Pulsar relies heavily on compare-and-set
> operations. Version preservation ensures these continue working
> post-migration, which Kafka doesn't need to address.
> 
> 5. **Architectural Enabler**: Pulsar's separation of data plane and
> metadata plane allows brief metadata write pauses without data plane
> impact, enabling a simpler, safer migration approach.
> 
> ### Lessons from Kafka's Experience
> 
> Pulsar's design incorporates lessons from Kafka's migration:
> 
> - ✅ **Avoid dual-write complexity**: Kafka found dual-mode operation added
> significant code complexity. Pulsar's read-only approach is simpler **and
> guarantees consistency**.
> - ✅ **Clear phase boundaries**: Kafka's migration has unclear "completion"
> point. Pulsar has explicit COMPLETED phase.
> - ✅ **Automatic participant coordination**: Kafka requires manual broker
> restarts. Pulsar participants coordinate automatically.
> - ✅ **Fast migration**: A **< 30 seconds** read-only window is acceptable
> for most production environments.
> - ❌ **Brief write unavailability**: Pulsar accepts brief metadata write
> unavailability (< 30 sec) vs Kafka's continuous operation, but gains
> guaranteed consistency and simplicity.
> 
> 
> ## References
> 
> - [PIP-45: Pluggable metadata interface](https://github.com/apache/pulsar/wiki/PIP-45%3A-Pluggable-metadata-interface)
> - [Oxia: A Scalable Metadata Store](https://github.com/streamnative/oxia)
> - [MetadataStore Interface](https://github.com/apache/pulsar/blob/master/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java)
> - [KIP-866: ZooKeeper to KRaft Migration](https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration) - Kafka's approach to metadata store migration
> 
> ```
> --
> Matteo Merli
> <[email protected]>
> 
