https://github.com/apache/pulsar/pull/25196

```

# PIP-454: Metadata Store Migration Framework

## Motivation

Apache Pulsar currently uses Apache ZooKeeper as its metadata store for
broker coordination, topic metadata, namespace policies, and BookKeeper
ledger management. While ZooKeeper has served well, there are several
motivations for enabling migration to alternative metadata stores:

1. **Operational Simplicity**: Alternative metadata stores like Oxia may
offer simpler operations, better observability, or reduced operational
overhead compared to ZooKeeper ensembles.

2. **Performance Characteristics**: Different metadata stores have
different performance profiles. Some workloads may benefit from stores
optimized for high throughput or low latency.

3. **Deployment Flexibility**: Organizations may prefer metadata stores
that align better with their existing infrastructure and expertise.

4. **Zero-Downtime Migration**: Operators need a safe, automated way to
migrate metadata between stores without service interruption.

Currently, there is no supported path to migrate from one metadata store to
another without cluster downtime. This PIP proposes a **safe, simple
migration framework** that ensures metadata consistency by avoiding complex
dual-write/dual-read patterns. The framework enables:

- **Zero-downtime migration** from any metadata store to any other
supported store
- **Automatic ephemeral node recreation** in the target store
- **Version preservation** to ensure conditional writes continue working
- **Automatic failure recovery** if issues are detected
- **Minimal configuration changes** - no config updates needed until after
migration completes

## Goal

Provide a safe, automated framework for migrating Apache Pulsar's metadata
from one store implementation (e.g., ZooKeeper) to another (e.g., Oxia)
with zero service interruption.

### In Scope

- Migration framework supporting any source → any target metadata store
- Automatic ephemeral node recreation by brokers and bookies
- Persistent data copy with version preservation
- CLI commands for migration control and monitoring
- Automatic failure recovery during migration
- Support for broker and bookie participation
- Read-only mode during migration for consistency

### Out of Scope

- Developing new metadata store implementations (Oxia, Etcd support already
exists)
- Cross-cluster metadata synchronization (different use case)
- Automated rollback after COMPLETED phase (requires manual intervention)
- Migration of configuration metadata store (can be done separately)

## High Level Design

The migration framework introduces a **DualMetadataStore** wrapper that
transparently handles migration without modifying existing metadata store
implementations.

### Key Principles

1. **Transparent Wrapping**: The `DualMetadataStore` wraps the existing
source store (e.g., `ZKMetadataStore`) without modifying its implementation.

2. **Lazy Target Initialization**: The target store is only initialized
when migration begins, triggered by a flag in the source store.

3. **Ephemeral-First Approach**: Before copying persistent data, all
brokers and bookies recreate their ephemeral nodes in the target store.
This ensures the cluster is "live" in both stores during migration.

4. **Read-Only Mode During Migration**: To ensure consistency, all metadata
writes are blocked during PREPARATION and COPYING phases. Components
receive `SessionLost` events to defer non-critical operations (e.g., ledger
rollovers).

5. **Phase-Based Migration**: Migration proceeds through well-defined
phases (PREPARATION → COPYING → COMPLETED).

6. **Generic Framework**: The framework is agnostic to specific store
implementations - it works with any source and target that implement the
`MetadataStore` interface.

7. **Guaranteed Consistency**: By blocking writes during migration and
using atomic copy, metadata is **always in a consistent state**. No
dual-write complexity, no data divergence, no consistency issues.

## Detailed Design

### Migration Phases

```
NOT_STARTED
     ↓
PREPARATION ← All brokers/bookies recreate ephemeral nodes in target
             ← Metadata writes are BLOCKED (read-only mode)
     ↓
COPYING ← Coordinator copies persistent data source → target
         ← Metadata writes still BLOCKED
     ↓
COMPLETED ← Migration complete, all services using target store
          ← Metadata writes ENABLED on target
     ↓
After validation period:
 * Update config and restart brokers & bookies
 * Decommission source store

(If errors occur):
FAILED ← Rollback to source store, writes ENABLED
```

### Phase 1: NOT_STARTED → PREPARATION

**Participant Registration (at startup):**
Each broker and bookie registers itself as a migration participant by
creating a sequential ephemeral node:
- Path: `/pulsar/migration-coordinator/participants/id-NNNN` (sequential)
- This allows the coordinator to know how many participants exist before
migration starts

**Administrator triggers migration:**
```bash
pulsar-admin metadata-migration start --target oxia://oxia1:6648
```

**Coordinator actions:**
1. Creates migration flag in source store:
`/pulsar/migration-coordinator/migration`
   ```json
   {
     "phase": "PREPARATION",
     "targetUrl": "oxia://oxia1:6648"
   }
   ```

**Broker/Bookie actions (automatic, triggered by watching the flag):**
1. Detect migration flag via watch on
`/pulsar/migration-coordinator/migration`
2. Defer non-critical metadata writes (e.g., ledger rollovers, bundle
ownership changes)
3. Initialize connection to target store
4. Recreate ALL ephemeral nodes in target store
5. **Delete** participant registration node to signal "ready"

**Coordinator waits for all participant nodes to be deleted (indicating all
participants are ready)**

### Phase 2: PREPARATION → COPYING

**Coordinator actions:**
1. Updates phase to `COPYING`
2. Performs recursive copy of persistent data from source → target:
   - Skips ephemeral nodes (already recreated)
   - Concurrent operations limited by semaphore (default: 1000 pending ops)
   - Breadth-first traversal to process all paths
   - Progress logged periodically

**During this phase:**
- Brokers/bookies continue normal READ operations
- Metadata WRITES are BLOCKED (return failure)
- Ephemeral nodes remain alive in both stores
- All reads still go to source store

**During this phase:**
- Metadata writes are BLOCKED (return error to clients)
- Metadata reads continue normally from source store
- **Data plane operations unaffected**: Publish/consume, ledger writes
continue normally
- Version-id and modification count preserved using direct Oxia client
- Breadth-first traversal with max 1000 concurrent operations

**Estimated duration:**
- **< 30 seconds** for typical deployments with up to **500 MB of
metadata** in ZooKeeper

**Impact on operations:**
- ✅ Existing topics: Publish and consume continue without interruption
- ✅ BookKeeper: Ledger writes and reads continue normally
- ✅ Clients: Connected producers and consumers unaffected
- ❌ Admin operations: Topic/namespace creation blocked temporarily
- ❌ Bundle operations: Load balancing deferred until completion

### Phase 3: COPYING → COMPLETED

**Coordinator actions:**
1. Updates phase to `COMPLETED`
2. Logs success message with total copied node count

**Broker/Bookie actions (automatic, triggered by phase update):**
1. Detect `COMPLETED` phase
2. Deferred operations can now proceed
3. Switch routing:
   - **Writes**: Go to target store only
   - **Reads**: Go to target store only

**At this point:**
- Cluster is running on target store
- Source store remains available for safety
- Metadata writes are enabled again

**Operator follow-up (after validation period):**
1. Update configuration files:
   ```properties
   # Before (ZooKeeper):
   metadataStoreUrl=zk://zk1:2181,zk2:2181/pulsar

   # After (Oxia):
   metadataStoreUrl=oxia://oxia1:6648
   ```
2. Perform rolling restart with new config
3. After all services restarted, decommission source store

### Failure Handling: ANY_PHASE → FAILED

**If migration fails at any point:**
1. Coordinator updates phase to `FAILED`
2. Broker/Bookie actions:
   - Detect `FAILED` phase
   - Discard target store connection
   - Continue using source store
   - Metadata writes enabled again

**Operator actions:**
1. Review logs to understand failure cause
2. Fix underlying issue
3. Retry migration with `pulsar-admin metadata-migration start --target
<url>`

## Implementation Details


### Key Implementation Details:

1. **Direct Oxia Client Usage**: The coordinator uses `AsyncOxiaClient`
directly instead of going through `MetadataStore` interface. This allows
setting version-id and modification count to match the source values,
ensuring conditional writes (compare-and-set operations) continue to work
correctly after migration.

2. **Breadth-First Traversal**: Processes paths level by level using a work
queue, enabling high concurrency while preventing deep recursion.

3. **Concurrent Operations**: Uses a semaphore to limit pending operations
(default: 1000), balancing throughput with memory usage.

### Data Structures

**Migration State** (`/pulsar/migration-coordinator/migration`):
```json
{
  "phase": "PREPARATION",
  "targetUrl": "oxia://oxia1:6648/default"
}
```

Fields:
- `phase`: Current migration phase (NOT_STARTED, PREPARATION, COPYING,
COMPLETED, FAILED)
- `targetUrl`: Target metadata store URL (e.g., `oxia://oxia1:6648/default`)

**Participant Registration**
(`/pulsar/migration-coordinator/participants/id-NNNN`):
- Sequential ephemeral node created by each broker/bookie at startup
- Empty data (presence indicates participation)
- Deleted by participant when preparation complete (signals "ready")
- Coordinator waits for all to be deleted before proceeding to COPYING phase

**No additional state tracking**: The simplified design removes complex
state tracking and checksums. Migration state is kept minimal.

### CLI Commands

```bash
# Start migration
pulsar-admin metadata-migration start --target <target-url>

# Check status
pulsar-admin metadata-migration status
```

The simplified design only requires two commands. Rollback happens
automatically if migration fails (phase transitions to FAILED).

### REST API

```
POST   /admin/v2/metadata/migration/start
       Body: { "targetUrl": "oxia://..." }

GET    /admin/v2/metadata/migration/status
       Returns: { "phase": "COPYING", "targetUrl": "oxia://..." }
```

## Safety Guarantees

### Why This Approach is Safe

**The migration design guarantees metadata consistency by avoiding
dual-write and dual-read patterns entirely:**

1. **Single Source of Truth**: At any given time, there is exactly ONE
active metadata store:
   - Before migration: Source store (ZooKeeper)
   - During PREPARATION and COPYING: Source store (read-only)
   - After COMPLETED: Target store (Oxia)

2. **No Dual-Write Complexity**: Unlike approaches that write to both
stores simultaneously, this design eliminates:
   - Write synchronization issues
   - Conflict resolution between stores
   - Data divergence problems
   - Partial failure handling complexity

3. **No Dual-Read Complexity**: Unlike approaches that read from both
stores, this design eliminates:
   - Read consistency issues
   - Cache invalidation across stores
   - Stale data problems
   - Complex fallback logic

4. **Atomic Cutover**: All participants switch stores simultaneously when
COMPLETED phase is detected. There is no ambiguous state where some
participants use one store and others use another.

5. **Fast Migration Window**: With **< 30 seconds** for typical metadata
sizes (even up to 500 MB), the read-only window is minimal and acceptable
for most production environments.

**Bottom line**: Metadata is **always in a consistent state** - either
fully in the source store or fully in the target store, never split or
diverged between them.

### Data Integrity

1. **Version Preservation**: All persistent data is copied with original
version-id and modification count preserved. This ensures conditional
writes (compare-and-set operations) continue working after migration.

2. **Ephemeral Node Recreation**: All ephemeral nodes are recreated by
their owning brokers/bookies before persistent data copy begins.

3. **Read-Only Mode**: All metadata writes are blocked during PREPARATION
and COPYING phases, ensuring no data inconsistencies during migration.

   **Important**: Read-only mode only affects metadata operations. Data
plane operations continue normally:
   - ✅ **Publishing and consuming messages** works without interruption
   - ✅ **Reading from existing topics and subscriptions** works normally
   - ✅ **Ledger writes to BookKeeper** continue unaffected
   - ❌ **Creating new topics or subscriptions** will be blocked temporarily
   - ❌ **Namespace/policy updates** will be blocked temporarily
   - ❌ **Bundle ownership changes** will be deferred until migration
completes

### Operational Safety

1. **No Downtime**: Brokers and bookies remain online throughout the
migration. **Data plane operations (publish/consume) continue without
interruption.** Only metadata operations are temporarily blocked during the
migration phases.

2. **Graceful Failure**: If migration fails at any point, phase transitions
to FAILED and cluster returns to source store automatically.

3. **Session Events**: Components receive `SessionLost` event during
migration to defer non-critical writes (e.g., ledger rollovers), and
`SessionReestablished` when migration completes or fails.

4. **Participant Coordination**: Migration waits for all participants to
complete preparation before copying data.

### Consistency

1. **Atomic Cutover**: All participants switch to target store
simultaneously when COMPLETED phase is detected.

2. **Ephemeral Session Consistency**: Each participant manages its own
ephemeral nodes in target store with proper session management.

3. **No Dual-Write Complexity**: By blocking writes during migration, we
avoid complex dual-write error handling and data divergence issues.

## Configuration

### No Configuration Changes for Migration

The beauty of this design is that **no configuration changes are needed to
start migration**:

- Brokers and bookies continue using their existing `metadataStoreUrl`
config
- The `DualMetadataStore` wrapper is automatically applied when using
ZooKeeper
- Target URL is provided only when triggering migration via CLI

### Post-Migration Configuration

After migration completes and validation period ends, update config files:

```properties
# Before migration
metadataStoreUrl=zk://zk1:2181,zk2:2181,zk3:2181/pulsar

# After migration (update and rolling restart)
metadataStoreUrl=oxia://oxia1:6648
```

## Comparison with Kafka's ZooKeeper → KRaft Migration

Apache Kafka faced a similar challenge migrating from ZooKeeper to KRaft
(Kafka Raft). Their approach provides useful comparison points:

### Kafka's Approach (KIP-866)

**Migration Strategy:**
- **Dual-mode operation**: Kafka brokers run in a hybrid mode where the
KRaft controller reads from ZooKeeper
- **Metadata synchronization**: KRaft controller actively mirrors metadata
from ZooKeeper to KRaft
- **Phased cutover**: Operators manually transition from ZK_MIGRATION mode
to KRAFT mode
- **Write forwarding**: During migration, metadata writes go to ZooKeeper
and are replicated to KRaft

**Timeline:**
- Migration can take hours or days as metadata is continuously synchronized
- Requires careful monitoring of lag between ZooKeeper and KRaft
- Rollback possible until final KRAFT mode is committed

### Pulsar's Approach (This PIP)

**Migration Strategy:**
- **Transparent wrapper**: DualMetadataStore wraps existing store without
broker code changes
- **Read-only migration**: Metadata writes blocked during migration (< 30
seconds for most clusters)
- **Atomic copy**: All persistent data copied in one operation with version
preservation
- **Single source of truth**: No dual-write or dual-read - metadata always
consistent
- **Automatic cutover**: All participants switch simultaneously when
COMPLETED phase detected

**Timeline:**
- Migration completes in **< 30 seconds** for typical deployments (even up
to 500 MB metadata)
- No lag monitoring needed
- Automatic rollback on failure (FAILED phase)

### Key Differences

| Aspect | Kafka (ZK → KRaft) | Pulsar (ZK → Oxia) |
|--------|-------------------|-------------------|
| **Migration Duration** | Hours to days | **< 30 seconds** (up to 500 MB) |
| **Metadata Writes** | Continue during migration | Blocked during
migration |
| **Data Plane** | Unaffected | Unaffected (publish/consume continues) |
| **Approach** | Continuous sync + dual-mode | Atomic copy + read-only mode
|
| **Consistency** | Dual-write (eventual consistency) | **Single source of
truth (always consistent)** |
| **Complexity** | High (dual-mode broker logic) | Low (transparent
wrapper) |
| **Version Preservation** | Not applicable (different metadata models) |
Yes (conditional writes preserved) |
| **Rollback** | Manual, complex | Automatic on failure |
| **Monitoring** | Requires lag tracking | Simple phase monitoring |

### Why Pulsar's Approach Differs

1. **Data Plane Independence**: **The key insight is that Pulsar's data
plane (publish/consume, ledger writes) does not require metadata writes to
function.** This architectural property allows pausing metadata writes for
a brief period (< 30 seconds) without affecting data operations. This is
what makes the migration **provably safe and consistent**, not the metadata
size.

2. **Write-Pause Safety**: Pausing writes during copy ensures:
   - No dual-write complexity
   - No data divergence between stores
   - No conflict resolution needed
   - Guaranteed consistency

   This works regardless of metadata size - whether 50K nodes or millions
of topics. The migration handles large metadata volumes through high
concurrency (1000 parallel operations), completing in < 30 seconds even for
500 MB.

3. **Ephemeral Node Handling**: Pulsar has significant ephemeral metadata
(broker registrations, bundle ownership), making dual-write complex.
Read-only mode simplifies this.

4. **Conditional Writes**: Pulsar relies heavily on compare-and-set
operations. Version preservation ensures these continue working
post-migration, which Kafka doesn't need to address.

5. **Architectural Enabler**: Pulsar's separation of data plane and
metadata plane allows brief metadata write pauses without data plane
impact, enabling a simpler, safer migration approach.

### Lessons from Kafka's Experience

Pulsar's design incorporates lessons from Kafka's migration:

- ✅ **Avoid dual-write complexity**: Kafka found dual-mode operation added
significant code complexity. Pulsar's read-only approach is simpler **and
guarantees consistency**.
- ✅ **Clear phase boundaries**: Kafka's migration has unclear "completion"
point. Pulsar has explicit COMPLETED phase.
- ✅ **Automatic participant coordination**: Kafka requires manual broker
restarts. Pulsar participants coordinate automatically.
- ✅ **Fast migration**: **< 30 seconds** read-only window is acceptable for
most production environments
- ❌ **Brief write unavailability**: Pulsar accepts brief metadata write
unavailability (< 30 sec) vs Kafka's continuous operation, but gains
guaranteed consistency and simplicity.


## References

- [PIP-45: Pluggable metadata interface](
https://github.com/apache/pulsar/wiki/PIP-45%3A-Pluggable-metadata-interface
)
- [Oxia: A Scalable Metadata Store](https://github.com/streamnative/oxia)
- [MetadataStore Interface](
https://github.com/apache/pulsar/blob/master/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
)
- [KIP-866: ZooKeeper to KRaft Migration](
https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration)
- Kafka's approach to metadata store migration

```
--
Matteo Merli
<[email protected]>

Reply via email to