This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 731ec8364f0 [improve][broker][PIP-384] Decouple Bookkeeper client from ManagedLedgerStorage and enable multiple ManagedLedgerFactory instances (#23313)
731ec8364f0 is described below

commit 731ec8364f050e3db1532ec8316cf76109865e3d
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Oct 8 04:48:09 2024 +0300

    [improve][broker][PIP-384] Decouple Bookkeeper client from ManagedLedgerStorage and enable multiple ManagedLedgerFactory instances (#23313)
---
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |   4 +-
 .../pulsar/broker/ManagedLedgerClientFactory.java  |  50 +++++-
 .../org/apache/pulsar/broker/PulsarService.java    |  28 ++--
 .../broker/TransactionMetadataStoreService.java    |   4 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   |   2 +-
 .../broker/admin/impl/PersistentTopicsBase.java    |  36 +++--
 .../pulsar/broker/service/BrokerService.java       |  87 +++++++---
 .../apache/pulsar/broker/service/ServerCnx.java    |  95 ++++++-----
 .../broker/service/persistent/PersistentTopic.java |  44 ++++--
 .../broker/stats/metrics/AbstractMetrics.java      |   4 +-
 .../broker/stats/metrics/ManagedLedgerMetrics.java |   3 +-
 .../prometheus/PrometheusMetricsGenerator.java     |  21 ++-
 .../BookkeeperManagedLedgerStorageClass.java       |  42 +++++
 .../broker/storage/ManagedLedgerStorage.java       |  36 +++--
 .../broker/storage/ManagedLedgerStorageClass.java  |  45 ++++++
 .../SnapshotSegmentAbortedTxnProcessorImpl.java    |   9 +-
 .../pendingack/impl/MLPendingAckStoreProvider.java | 175 +++++++++++++--------
 .../pulsar/broker/admin/AdminApiOffloadTest.java   |   2 +-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  18 +--
 .../pulsar/broker/admin/PersistentTopicsTest.java  |   2 +-
 .../broker/service/BrokerBkEnsemblesTests.java     |   6 +-
 .../broker/service/BrokerBookieIsolationTest.java  |   6 +-
 .../pulsar/broker/service/BrokerServiceTest.java   |   7 +-
 .../PersistentDispatcherFailoverConsumerTest.java  |   6 +-
 .../service/PersistentTopicConcurrentTest.java     |   2 +-
 .../broker/service/PersistentTopicE2ETest.java     |   4 +-
 .../pulsar/broker/service/PersistentTopicTest.java |  15 +-
 .../pulsar/broker/service/ReplicationTxnTest.java  |   4 +-
 .../pulsar/broker/service/ReplicatorTest.java      |   3 +-
 .../pulsar/broker/service/ServerCnxTest.java       |  22 +--
 .../service/TransactionMarkerDeleteTest.java       |   2 +-
 .../broker/stats/ManagedLedgerMetricsTest.java     |   4 +-
 .../testcontext/NonStartableTestPulsarService.java |   2 +-
 .../broker/testcontext/PulsarTestContext.java      |  87 +++++++---
 .../pulsar/broker/testcontext/SpyConfig.java       |   3 +
 .../testcontext/StartableTestPulsarService.java    |  21 +++
 .../TopicTransactionBufferRecoverTest.java         |   2 +-
 .../broker/transaction/TransactionProduceTest.java |   2 +-
 .../pulsar/broker/transaction/TransactionTest.java |   2 +-
 .../pendingack/PendingAckPersistentTest.java       |  17 +-
 .../client/api/OrphanPersistentTopicTest.java      |   2 +-
 .../client/api/SimpleProducerConsumerTest.java     |  39 +++--
 .../api/SubscriptionPauseOnAckStatPersistTest.java |   2 +-
 .../client/api/v1/V1_ProducerConsumerTest.java     |   2 +-
 .../client/impl/SequenceIdWithErrorTest.java       |   2 +-
 .../apache/pulsar/compaction/CompactionTest.java   |   4 +-
 .../common/policies/data/PersistencePolicies.java  |  20 ++-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |   8 +-
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  |   8 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |   8 +-
 50 files changed, 701 insertions(+), 318 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 7b28990f355..a1e1deb503e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -87,7 +87,9 @@ public class ManagedLedgerConfig {
     private int minimumBacklogEntriesForCaching = 1000;
     private int maxBacklogBetweenCursorsForCaching = 1000;
     private boolean triggerOffloadOnTopicLoad = false;
-
+    @Getter
+    @Setter
+    private String storageClassName;
     @Getter
     @Setter
     private String shadowSourceName;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 9bbc2857863..737bc69bf24 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -24,6 +24,8 @@ import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.EventLoopGroup;
 import io.opentelemetry.api.OpenTelemetry;
 import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -39,16 +41,18 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import 
org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
+import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
-
     private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerClientFactory.class);
-
+    private static final String DEFAULT_STORAGE_CLASS_NAME = "bookkeeper";
+    private BookkeeperManagedLedgerStorageClass defaultStorageClass;
     private ManagedLedgerFactory managedLedgerFactory;
     private BookKeeper defaultBkClient;
     private final AsyncCache<EnsemblePlacementPolicyConfig, BookKeeper>
@@ -119,20 +123,50 @@ public class ManagedLedgerClientFactory implements 
ManagedLedgerStorage {
             defaultBkClient.close();
             throw e;
         }
+
+        defaultStorageClass = new BookkeeperManagedLedgerStorageClass() {
+            @Override
+            public String getName() {
+                return DEFAULT_STORAGE_CLASS_NAME;
+            }
+
+            @Override
+            public ManagedLedgerFactory getManagedLedgerFactory() {
+                return managedLedgerFactory;
+            }
+
+            @Override
+            public StatsProvider getStatsProvider() {
+                return statsProvider;
+            }
+
+            @Override
+            public BookKeeper getBookKeeperClient() {
+                return defaultBkClient;
+            }
+        };
     }
 
-    public ManagedLedgerFactory getManagedLedgerFactory() {
-        return managedLedgerFactory;
+    @Override
+    public Collection<ManagedLedgerStorageClass> getStorageClasses() {
+        return List.of(getDefaultStorageClass());
     }
 
-    public BookKeeper getBookKeeperClient() {
-        return defaultBkClient;
+    @Override
+    public Optional<ManagedLedgerStorageClass> 
getManagedLedgerStorageClass(String name) {
+        if (name == null || DEFAULT_STORAGE_CLASS_NAME.equals(name)) {
+            return Optional.of(getDefaultStorageClass());
+        } else {
+            return Optional.empty();
+        }
     }
 
-    public StatsProvider getStatsProvider() {
-        return statsProvider;
+    @Override
+    public ManagedLedgerStorageClass getDefaultStorageClass() {
+        return defaultStorageClass;
     }
 
+
     @VisibleForTesting
     public Map<EnsemblePlacementPolicyConfig, BookKeeper> 
getBkEnsemblePolicyToBookKeeperMap() {
         return bkEnsemblePolicyToBkClientMap.synchronous().asMap();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6c768a07897..dcc0e961275 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -124,7 +124,9 @@ import 
org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import 
org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
+import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
@@ -210,7 +212,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     private static final int DEFAULT_MONOTONIC_CLOCK_GRANULARITY_MILLIS = 8;
     private final ServiceConfiguration config;
     private NamespaceService nsService = null;
-    private ManagedLedgerStorage managedLedgerClientFactory = null;
+    private ManagedLedgerStorage managedLedgerStorage = null;
     private LeaderElectionService leaderElectionService = null;
     private BrokerService brokerService = null;
     private WebService webService = null;
@@ -606,13 +608,13 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 this.brokerService = null;
             }
 
-            if (this.managedLedgerClientFactory != null) {
+            if (this.managedLedgerStorage != null) {
                 try {
-                    this.managedLedgerClientFactory.close();
+                    this.managedLedgerStorage.close();
                 } catch (Exception e) {
                     LOG.warn("ManagedLedgerClientFactory closing failed {}", 
e.getMessage());
                 }
-                this.managedLedgerClientFactory = null;
+                this.managedLedgerStorage = null;
             }
 
             if (bkClientFactory != null) {
@@ -899,7 +901,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             // Now we are ready to start services
             this.bkClientFactory = newBookKeeperClientFactory();
 
-            managedLedgerClientFactory = newManagedLedgerClientFactory();
+            managedLedgerStorage = newManagedLedgerStorage();
 
             this.brokerService = newBrokerService(this);
 
@@ -1122,7 +1124,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     }
 
     @VisibleForTesting
-    protected ManagedLedgerStorage newManagedLedgerClientFactory() throws 
Exception {
+    protected ManagedLedgerStorage newManagedLedgerStorage() throws Exception {
         return ManagedLedgerStorage.create(
                 config, localMetadataStore,
                 bkClientFactory, ioEventLoopGroup, 
openTelemetry.getOpenTelemetryService().getOpenTelemetry()
@@ -1348,7 +1350,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             long resourceQuotaUpdateInterval = TimeUnit.MINUTES
                     
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
             loadSheddingTask = new LoadSheddingTask(loadManager, 
loadManagerExecutor,
-                    config, getManagedLedgerFactory());
+                    config, getDefaultManagedLedgerFactory());
             loadSheddingTask.start();
             loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
                     new LoadResourceQuotaUpdaterTask(loadManager), 
resourceQuotaUpdateInterval,
@@ -1535,11 +1537,17 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     }
 
     public BookKeeper getBookKeeperClient() {
-        return getManagedLedgerClientFactory().getBookKeeperClient();
+        ManagedLedgerStorageClass defaultStorageClass = 
getManagedLedgerStorage().getDefaultStorageClass();
+        if (defaultStorageClass instanceof BookkeeperManagedLedgerStorageClass 
bkStorageClass) {
+            return bkStorageClass.getBookKeeperClient();
+        } else {
+            // TODO: Refactor code to support other than default bookkeeper 
based storage class
+            throw new UnsupportedOperationException("BookKeeper client is not 
available");
+        }
     }
 
-    public ManagedLedgerFactory getManagedLedgerFactory() {
-        return getManagedLedgerClientFactory().getManagedLedgerFactory();
+    public ManagedLedgerFactory getDefaultManagedLedgerFactory() {
+        return 
getManagedLedgerStorage().getDefaultStorageClass().getManagedLedgerFactory();
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index c80580b02f1..bd19a8e8602 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -227,7 +227,9 @@ public class TransactionMetadataStoreService {
                 
.setBatchedWriteMaxDelayInMillis(serviceConfiguration.getTransactionLogBatchedWriteMaxDelayInMillis());
 
         return 
pulsarService.getBrokerService().getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(
-                v -> transactionMetadataStoreProvider.openStore(tcId, 
pulsarService.getManagedLedgerFactory(), v,
+                v -> transactionMetadataStoreProvider.openStore(tcId,
+                        
pulsarService.getManagedLedgerStorage().getManagedLedgerStorageClass(v.getStorageClassName())
+                                .get().getManagedLedgerFactory(), v,
                         timeoutTracker, recoverTracker,
                         
pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator(), 
txnLogBufferedWriterConfig,
                         brokerClientSharedTimer));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4d26fe2a4c3..18c80d6bef4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2540,7 +2540,7 @@ public abstract class NamespacesBase extends 
AdminResource {
         String localClusterName = pulsar().getConfiguration().getClusterName();
 
         
OffloaderObjectsScannerUtils.scanOffloadedLedgers(managedLedgerOffloader,
-                localClusterName, pulsar().getManagedLedgerFactory(), sink);
+                localClusterName, pulsar().getDefaultManagedLedgerFactory(), 
sink);
 
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8860c9bb06d..6070093cc35 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1405,19 +1405,27 @@ public class PersistentTopicsBase extends AdminResource 
{
         validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
                 .thenAccept(__ -> {
                     String managedLedger = 
topicName.getPersistenceNamingEncoding();
-                    pulsar().getManagedLedgerFactory()
-                            .asyncGetManagedLedgerInfo(managedLedger, new 
ManagedLedgerInfoCallback() {
-                        @Override
-                        public void getInfoComplete(ManagedLedgerInfo info, 
Object ctx) {
-                            asyncResponse.resume((StreamingOutput) output -> {
-                                objectWriter().writeValue(output, info);
+                    
pulsar().getBrokerService().getManagedLedgerFactoryForTopic(topicName)
+                            .thenAccept(managedLedgerFactory -> {
+                                
managedLedgerFactory.asyncGetManagedLedgerInfo(managedLedger,
+                                        new ManagedLedgerInfoCallback() {
+                                            @Override
+                                            public void 
getInfoComplete(ManagedLedgerInfo info, Object ctx) {
+                                                
asyncResponse.resume((StreamingOutput) output -> {
+                                                    
objectWriter().writeValue(output, info);
+                                                });
+                                            }
+
+                                            @Override
+                                            public void 
getInfoFailed(ManagedLedgerException exception, Object ctx) {
+                                                
asyncResponse.resume(exception);
+                                            }
+                                        }, null);
+                            })
+                            .exceptionally(ex -> {
+                                
resumeAsyncResponseExceptionally(asyncResponse, ex);
+                                return null;
                             });
-                        }
-                        @Override
-                        public void getInfoFailed(ManagedLedgerException 
exception, Object ctx) {
-                            asyncResponse.resume(exception);
-                        }
-                    }, null);
                 }).exceptionally(ex -> {
                     log.error("[{}] Failed to get managed info for {}", 
clientAppId(), topicName, ex);
                     resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -3174,7 +3182,9 @@ public class PersistentTopicsBase extends AdminResource {
                                 try {
                                     PersistentOfflineTopicStats 
estimateOfflineTopicStats =
                                             
offlineTopicBacklog.estimateUnloadedTopicBacklog(
-                                                    
pulsar().getManagedLedgerFactory(),
+                                                    pulsar().getBrokerService()
+                                                            
.getManagedLedgerFactoryForTopic(topicName,
+                                                                    
config.getStorageClassName()),
                                                     topicName);
                                     pulsar().getBrokerService()
                                             .cacheOfflineTopicStats(topicName, 
estimateOfflineTopicStats);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c240c758dcd..ed0cdf18b47 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -64,6 +64,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
@@ -136,6 +137,8 @@ import 
org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
 import org.apache.pulsar.broker.validator.BindAddressValidator;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
@@ -216,7 +219,7 @@ public class BrokerService implements Closeable {
             .register();
 
     private final PulsarService pulsar;
-    private final ManagedLedgerFactory managedLedgerFactory;
+    private final ManagedLedgerStorage managedLedgerStorage;
 
     private final Map<String, CompletableFuture<Optional<Topic>>> topics = new 
ConcurrentHashMap<>();
 
@@ -335,7 +338,7 @@ public class BrokerService implements Closeable {
         this.brokerPublishRateLimiter = new 
PublishRateLimiterImpl(pulsar.getMonotonicSnapshotClock());
         this.preciseTopicPublishRateLimitingEnable =
                 
pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
-        this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
+        this.managedLedgerStorage = pulsar.getManagedLedgerStorage();
         this.keepAliveIntervalSeconds = 
pulsar.getConfiguration().getKeepAliveIntervalSeconds();
         this.pendingTopicLoadingQueue = Queues.newConcurrentLinkedQueue();
         this.pulsarStats = new PulsarStats(pulsar);
@@ -1241,23 +1244,51 @@ public class BrokerService implements Closeable {
                 return;
             }
             CompletableFuture<ManagedLedgerConfig> mlConfigFuture = 
getManagedLedgerConfig(topicName);
-            managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(),
-                mlConfigFuture, new DeleteLedgerCallback() {
-                    @Override
-                    public void deleteLedgerComplete(Object ctx) {
-                        future.complete(null);
-                    }
+            mlConfigFuture.thenAccept(config -> {
+                getManagedLedgerFactoryForTopic(topicName, 
config.getStorageClassName())
+                        .asyncDelete(tn.getPersistenceNamingEncoding(),
+                                mlConfigFuture, new DeleteLedgerCallback() {
+                                    @Override
+                                    public void deleteLedgerComplete(Object 
ctx) {
+                                        future.complete(null);
+                                    }
 
-                    @Override
-                    public void deleteLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
-                        future.completeExceptionally(exception);
-                    }
-                }, null);
+                                    @Override
+                                    public void 
deleteLedgerFailed(ManagedLedgerException exception,
+                                                                   Object ctx) 
{
+                                        
future.completeExceptionally(exception);
+                                    }
+                                }, null);
+            }).exceptionally(ex1 -> {
+                log.error("Failed to get managed ledger config for topic {}", 
topic, ex1);
+                future.completeExceptionally(ex1);
+                return null;
+            });
          });
 
         return future;
     }
 
+    public CompletableFuture<ManagedLedgerFactory> 
getManagedLedgerFactoryForTopic(TopicName topicName) {
+        return getManagedLedgerConfig(topicName)
+                .thenApply(config -> {
+                    String storageClassName = config.getStorageClassName();
+                    return getManagedLedgerFactoryForTopic(topicName, 
storageClassName);
+                });
+    }
+
+    public ManagedLedgerFactory getManagedLedgerFactoryForTopic(TopicName 
topicName, String storageClassName) {
+        Optional<ManagedLedgerStorageClass> managedLedgerStorageClass =
+                
managedLedgerStorage.getManagedLedgerStorageClass(storageClassName);
+        if (!managedLedgerStorageClass.isPresent()) {
+            throw new CompletionException(new ManagedLedgerException(
+                    "ManagedLedgerStorageClass " + storageClassName + " not 
found for topic " + topicName));
+        }
+        return managedLedgerStorageClass
+                .get()
+                .getManagedLedgerFactory();
+    }
+
     public void deleteTopicAuthenticationWithRetry(String topic, 
CompletableFuture<Void> future, int count) {
         if (count == 0) {
             log.error("The number of retries has exhausted for topic {}", 
topic);
@@ -1624,14 +1655,17 @@ public class BrokerService implements Closeable {
     @VisibleForTesting
     protected CompletableFuture<Map<String, String>> 
fetchTopicPropertiesAsync(TopicName topicName) {
         if (!topicName.isPartitioned()) {
-            return 
managedLedgerFactory.getManagedLedgerPropertiesAsync(topicName.getPersistenceNamingEncoding());
+            return getManagedLedgerFactoryForTopic(topicName).thenCompose(
+                    managedLedgerFactory -> 
managedLedgerFactory.getManagedLedgerPropertiesAsync(
+                            topicName.getPersistenceNamingEncoding()));
         } else {
             TopicName partitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
             return fetchPartitionedTopicMetadataAsync(partitionedTopicName)
                     .thenCompose(metadata -> {
                         if (metadata.partitions == 
PartitionedTopicMetadata.NON_PARTITIONED) {
-                            return 
managedLedgerFactory.getManagedLedgerPropertiesAsync(
-                                    topicName.getPersistenceNamingEncoding());
+                            return 
getManagedLedgerFactoryForTopic(topicName).thenCompose(
+                                    managedLedgerFactory -> 
managedLedgerFactory.getManagedLedgerPropertiesAsync(
+                                            
topicName.getPersistenceNamingEncoding()));
                         } else {
                             // Check if the partitioned topic is a ShadowTopic
                             if (MapUtils.getString(metadata.properties, 
PROPERTY_SOURCE_TOPIC_KEY) != null) {
@@ -1756,6 +1790,8 @@ public class BrokerService implements Closeable {
             topicEventsDispatcher.notifyOnCompletion(loadFuture, topic, 
TopicEvent.LOAD);
 
             // Once we have the configuration, we can proceed with the async 
open operation
+            ManagedLedgerFactory managedLedgerFactory =
+                    getManagedLedgerFactoryForTopic(topicName, 
managedLedgerConfig.getStorageClassName());
             
managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), 
managedLedgerConfig,
                     new OpenLedgerCallback() {
                         @Override
@@ -1918,6 +1954,7 @@ public class BrokerService implements Closeable {
             
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
             
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
             
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+            
managedLedgerConfig.setStorageClassName(persistencePolicies.getManagedLedgerStorageClassName());
 
             if (serviceConfig.isStrictBookieAffinityEnabled()) {
                 
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
@@ -2745,25 +2782,29 @@ public class BrokerService implements Closeable {
             });
         });
 
+
+        ManagedLedgerFactory defaultManagedLedgerFactory =
+                
managedLedgerStorage.getDefaultStorageClass().getManagedLedgerFactory();
+
         //  add listener to notify broker managedLedgerCacheSizeMB dynamic 
config
         registerConfigurationListener("managedLedgerCacheSizeMB", 
(managedLedgerCacheSizeMB) -> {
-            managedLedgerFactory.getEntryCacheManager()
+            defaultManagedLedgerFactory.getEntryCacheManager()
                     .updateCacheSizeAndThreshold(((int) 
managedLedgerCacheSizeMB) * 1024L * 1024L);
         });
 
         //  add listener to notify broker managedLedgerCacheEvictionWatermark 
dynamic config
         registerConfigurationListener(
                 "managedLedgerCacheEvictionWatermark", 
(cacheEvictionWatermark) -> {
-            managedLedgerFactory.getEntryCacheManager()
-                    .updateCacheEvictionWatermark((double) 
cacheEvictionWatermark);
-        });
+                    defaultManagedLedgerFactory.getEntryCacheManager()
+                            .updateCacheEvictionWatermark((double) 
cacheEvictionWatermark);
+                });
 
         //  add listener to notify broker 
managedLedgerCacheEvictionTimeThresholdMillis dynamic config
         registerConfigurationListener(
                 "managedLedgerCacheEvictionTimeThresholdMillis", 
(cacheEvictionTimeThresholdMills) -> {
-            managedLedgerFactory.updateCacheEvictionTimeThreshold(MILLISECONDS
-                    .toNanos((long) cacheEvictionTimeThresholdMills));
-        });
+                    
defaultManagedLedgerFactory.updateCacheEvictionTimeThreshold(MILLISECONDS
+                            .toNanos((long) cacheEvictionTimeThresholdMills));
+                });
 
 
         // add listener to update message-dispatch-rate in msg for topic
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index aedd68d416f..37b431e8339 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2902,7 +2902,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             log.debug("[{}] handleEndTxnOnPartition txnId: [{}], txnAction: 
[{}]", topic,
                     txnID, txnAction);
         }
-        CompletableFuture<Optional<Topic>> topicFuture = 
service.getTopicIfExists(TopicName.get(topic).toString());
+        TopicName topicName = TopicName.get(topic);
+        CompletableFuture<Optional<Topic>> topicFuture = 
service.getTopicIfExists(topicName.toString());
         topicFuture.thenAcceptAsync(optionalTopic -> {
             if (optionalTopic.isPresent()) {
                 // we only accept superuser because this endpoint is reserved 
for tc to broker communication
@@ -2928,24 +2929,29 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                     txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                         });
             } else {
-                getBrokerService().getManagedLedgerFactory()
-                        
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
-                        .thenAccept((b) -> {
-                            if (b) {
-                                log.error("handleEndTxnOnPartition fail ! The 
topic {} does not exist in broker, "
-                                                + "txnId: [{}], txnAction: 
[{}]", topic,
-                                        txnID, TxnAction.valueOf(txnAction));
-                                
writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
-                                        ServerError.ServiceNotReady,
-                                        "The topic " + topic + " does not 
exist in broker.",
-                                        txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
-                            } else {
-                                log.warn("handleEndTxnOnPartition fail ! The 
topic {} has not been created, "
-                                                + "txnId: [{}], txnAction: 
[{}]",
-                                        topic, txnID, 
TxnAction.valueOf(txnAction));
-                                
writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
-                                        txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
-                            }
+                getBrokerService().getManagedLedgerFactoryForTopic(topicName)
+                        .thenCompose(managedLedgerFactory -> {
+                            return 
managedLedgerFactory.asyncExists(topicName.getPersistenceNamingEncoding())
+                                    .thenAccept((b) -> {
+                                        if (b) {
+                                            log.error(
+                                                    "handleEndTxnOnPartition 
fail ! The topic {} does not exist in "
+                                                            + "broker, "
+                                                            + "txnId: [{}], 
txnAction: [{}]", topic,
+                                                    txnID, 
TxnAction.valueOf(txnAction));
+                                            
writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
+                                                    
ServerError.ServiceNotReady,
+                                                    "The topic " + topic + " 
does not exist in broker.",
+                                                    txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
+                                        } else {
+                                            log.warn(
+                                                    "handleEndTxnOnPartition 
fail ! The topic {} has not been created, "
+                                                            + "txnId: [{}], 
txnAction: [{}]",
+                                                    topic, txnID, 
TxnAction.valueOf(txnAction));
+                                            
writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
+                                                    txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
+                                        }
+                                    });
                         }).exceptionally(e -> {
                             log.error("handleEndTxnOnPartition fail ! topic 
{}, "
                                             + "txnId: [{}], txnAction: [{}]", 
topic, txnID,
@@ -2954,7 +2960,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                     requestId, ServerError.ServiceNotReady,
                                     e.getMessage(), txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                             return null;
-                });
+
+                        });
             }
         }, ctx.executor()).exceptionally(e -> {
             log.error("handleEndTxnOnPartition fail ! topic {}, "
@@ -2984,7 +2991,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     new TxnID(txnidMostBits, txnidLeastBits), txnAction);
         }
 
-        CompletableFuture<Optional<Topic>> topicFuture = 
service.getTopicIfExists(TopicName.get(topic).toString());
+        TopicName topicName = TopicName.get(topic);
+        CompletableFuture<Optional<Topic>> topicFuture = 
service.getTopicIfExists(topicName.toString());
         topicFuture.thenAcceptAsync(optionalTopic -> {
             if (optionalTopic.isPresent()) {
                 Subscription subscription = 
optionalTopic.get().getSubscription(subName);
@@ -3019,24 +3027,31 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                     
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, 
txnidMostBits));
                         });
             } else {
-                getBrokerService().getManagedLedgerFactory()
-                        
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
-                        .thenAccept((b) -> {
-                            if (b) {
-                                log.error("handleEndTxnOnSubscription fail! 
The topic {} does not exist in broker, "
-                                                + "subscription: {}, txnId: 
[{}], txnAction: [{}]", topic, subName,
-                                        txnID, TxnAction.valueOf(txnAction));
-                                
writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
-                                        requestId, txnID.getLeastSigBits(), 
txnID.getMostSigBits(),
-                                        ServerError.ServiceNotReady,
-                                        "The topic " + topic + " does not 
exist in broker."));
-                            } else {
-                                log.warn("handleEndTxnOnSubscription fail ! 
The topic {} has not been created, "
-                                                + "subscription: {} txnId: 
[{}], txnAction: [{}]",
-                                        topic, subName, txnID, 
TxnAction.valueOf(txnAction));
-                                
writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId,
-                                        txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
-                            }
+                getBrokerService().getManagedLedgerFactoryForTopic(topicName)
+                        .thenCompose(managedLedgerFactory -> {
+                            return 
managedLedgerFactory.asyncExists(topicName.getPersistenceNamingEncoding())
+                                    .thenAccept((b) -> {
+                                        if (b) {
+                                            log.error(
+                                                    
"handleEndTxnOnSubscription fail! The topic {} does not exist in "
+                                                            + "broker, "
+                                                            + "subscription: 
{}, txnId: [{}], txnAction: [{}]", topic,
+                                                    subName,
+                                                    txnID, 
TxnAction.valueOf(txnAction));
+                                            
writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+                                                    requestId, 
txnID.getLeastSigBits(), txnID.getMostSigBits(),
+                                                    
ServerError.ServiceNotReady,
+                                                    "The topic " + topic + " 
does not exist in broker."));
+                                        } else {
+                                            log.warn(
+                                                    
"handleEndTxnOnSubscription fail ! The topic {} has not been "
+                                                    + "created, "
+                                                            + "subscription: 
{} txnId: [{}], txnAction: [{}]",
+                                                    topic, subName, txnID, 
TxnAction.valueOf(txnAction));
+                                            
writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId,
+                                                    txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
+                                        }
+                                    });
                         }).exceptionally(e -> {
                             log.error("handleEndTxnOnSubscription fail ! topic 
{}, subscription: {}"
                                             + "txnId: [{}], txnAction: [{}]", 
topic, subName,
@@ -3045,7 +3060,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                     requestId, txnID.getLeastSigBits(), 
txnID.getMostSigBits(),
                                     ServerError.ServiceNotReady, 
e.getMessage()));
                             return null;
-                });
+                        });
             }
         }, ctx.executor()).exceptionally(e -> {
             log.error("handleEndTxnOnSubscription fail ! topic: {}, 
subscription: {}"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f8581cfc799..3cce175660e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -74,12 +74,14 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionBound;
 import org.apache.bookkeeper.mledger.PositionFactory;
@@ -1232,26 +1234,34 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 .getTransactionPendingAckStoreSuffix(topic,
                         Codec.encode(subscriptionName)));
         if 
(brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
-            
getBrokerService().getManagedLedgerFactory().asyncDelete(tn.getPersistenceNamingEncoding(),
-                    getBrokerService().getManagedLedgerConfig(tn),
-                    new AsyncCallbacks.DeleteLedgerCallback() {
-                        @Override
-                        public void deleteLedgerComplete(Object ctx) {
-                            
asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture);
-                        }
-
-                        @Override
-                        public void deleteLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
-                            if (exception instanceof 
MetadataNotFoundException) {
+            CompletableFuture<ManagedLedgerConfig> managedLedgerConfig = 
getBrokerService().getManagedLedgerConfig(tn);
+            managedLedgerConfig.thenAccept(config -> {
+                ManagedLedgerFactory managedLedgerFactory =
+                        getBrokerService().getManagedLedgerFactoryForTopic(tn, 
config.getStorageClassName());
+                
managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(),
+                        managedLedgerConfig,
+                        new AsyncCallbacks.DeleteLedgerCallback() {
+                            @Override
+                            public void deleteLedgerComplete(Object ctx) {
                                 
asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture);
-                                return;
                             }
 
-                            unsubscribeFuture.completeExceptionally(exception);
-                            log.error("[{}][{}] Error deleting subscription 
pending ack store",
-                                    topic, subscriptionName, exception);
-                        }
-                    }, null);
+                            @Override
+                            public void 
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                                if (exception instanceof 
MetadataNotFoundException) {
+                                    
asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture);
+                                    return;
+                                }
+
+                                
unsubscribeFuture.completeExceptionally(exception);
+                                log.error("[{}][{}] Error deleting 
subscription pending ack store",
+                                        topic, subscriptionName, exception);
+                            }
+                        }, null);
+            }).exceptionally(ex -> {
+                unsubscribeFuture.completeExceptionally(ex);
+                return null;
+            });
         } else {
             asyncDeleteCursorWithClearDelayedMessage(subscriptionName, 
unsubscribeFuture);
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
index 489d37dd0a3..114f962cb81 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
@@ -131,7 +131,7 @@ abstract class AbstractMetrics {
      * @return
      */
     protected ManagedLedgerFactoryMXBean getManagedLedgerCacheStats() {
-        return pulsar.getManagedLedgerFactory().getCacheStats();
+        return pulsar.getDefaultManagedLedgerFactory().getCacheStats();
     }
 
     /**
@@ -140,7 +140,7 @@ abstract class AbstractMetrics {
      * @return
      */
     protected Map<String, ManagedLedger> getManagedLedgers() {
-        return pulsar.getManagedLedgerFactory().getManagedLedgers();
+        return pulsar.getDefaultManagedLedgerFactory().getManagedLedgers();
     }
 
     protected String getLocalClusterName() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
index 52c69265c2f..925fcb28b7a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
@@ -52,8 +52,7 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
         this.metricsCollection = new ArrayList<>();
         this.ledgersByDimensionMap = new HashMap<>();
         this.tempAggregatedMetricsMap = new HashMap<>();
-        this.statsPeriodSeconds = pulsar.getManagedLedgerFactory()
-                .getConfig().getStatsPeriodSeconds();
+        this.statsPeriodSeconds = 
pulsar.getDefaultManagedLedgerFactory().getConfig().getStatsPeriodSeconds();
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 6b4d08c359d..8c3cb39c925 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -55,6 +55,8 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
+import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
@@ -485,12 +487,14 @@ public class PrometheusMetricsGenerator implements 
AutoCloseable {
     }
 
     private static void generateManagedLedgerBookieClientMetrics(PulsarService 
pulsar, SimpleTextOutputStream stream) {
-        StatsProvider statsProvider = 
pulsar.getManagedLedgerClientFactory().getStatsProvider();
-        if (statsProvider instanceof NullStatsProvider) {
-            return;
-        }
+        ManagedLedgerStorageClass defaultStorageClass = 
pulsar.getManagedLedgerStorage().getDefaultStorageClass();
+        if (defaultStorageClass instanceof BookkeeperManagedLedgerStorageClass 
bkStorageClass) {
+            StatsProvider statsProvider = bkStorageClass.getStatsProvider();
+            if (statsProvider instanceof NullStatsProvider) {
+                return;
+            }
 
-        try (Writer writer = new OutputStreamWriter(new 
BufferedOutputStream(new OutputStream() {
+            try (Writer writer = new OutputStreamWriter(new 
BufferedOutputStream(new OutputStream() {
                 @Override
                 public void write(int b) throws IOException {
                     stream.writeByte(b);
@@ -501,9 +505,10 @@ public class PrometheusMetricsGenerator implements 
AutoCloseable {
                     stream.write(b, off, len);
                 }
             }), StandardCharsets.UTF_8)) {
-            statsProvider.writeAllMetrics(writer);
-        } catch (IOException e) {
-            log.error("Failed to write managed ledger bookie client metrics", 
e);
+                statsProvider.writeAllMetrics(writer);
+            } catch (IOException e) {
+                log.error("Failed to write managed ledger bookie client 
metrics", e);
+            }
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/BookkeeperManagedLedgerStorageClass.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/BookkeeperManagedLedgerStorageClass.java
new file mode 100644
index 00000000000..1f05cde72a5
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/BookkeeperManagedLedgerStorageClass.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.storage;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.stats.StatsProvider;
+
+/**
+ * ManagedLedgerStorageClass represents a configured instance of 
ManagedLedgerFactory for managed ledgers.
+ * This instance is backed by a bookkeeper storage.
+ */
+public interface BookkeeperManagedLedgerStorageClass extends 
ManagedLedgerStorageClass {
+    /**
+     * Return the bookkeeper client instance used by this instance.
+     *
+     * @return the bookkeeper client.
+     */
+    BookKeeper getBookKeeperClient();
+
+    /**
+     * Return the stats provider to expose the stats of the storage 
implementation.
+     *
+     * @return the stats provider.
+     */
+    StatsProvider getStatsProvider();
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
index 944d2badf75..720798123e7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
@@ -21,9 +21,8 @@ package org.apache.pulsar.broker.storage;
 import io.netty.channel.EventLoopGroup;
 import io.opentelemetry.api.OpenTelemetry;
 import java.io.IOException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
-import org.apache.bookkeeper.stats.StatsProvider;
+import java.util.Collection;
+import java.util.Optional;
 import org.apache.pulsar.broker.BookKeeperClientFactory;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.classification.InterfaceAudience.Private;
@@ -33,6 +32,12 @@ import 
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
 /**
  * Storage to access {@link org.apache.bookkeeper.mledger.ManagedLedger}s.
+ * <p>
+ * The interface provides the abstraction to access the storage layer for 
managed ledgers.
+ * The interface supports multiple storage classes, each with its own 
configuration. The default
+ * implementation supports a single instance of {@link 
BookkeeperManagedLedgerStorageClass}.
+ * Implementations can provide multiple storage classes. The default storage 
class is used
+ * for topics unless it is overridden by the persistency policy at topic or 
namespace level.
  */
 @Private
 @Unstable
@@ -52,25 +57,25 @@ public interface ManagedLedgerStorage extends AutoCloseable 
{
                     OpenTelemetry openTelemetry) throws Exception;
 
     /**
-     * Return the factory to create {@link ManagedLedgerFactory}.
-     *
-     * @return the factory to create {@link ManagedLedgerFactory}.
+     * Get all configured storage class instances.
+     * @return all configured storage class instances
      */
-    ManagedLedgerFactory getManagedLedgerFactory();
+    Collection<ManagedLedgerStorageClass> getStorageClasses();
 
     /**
-     * Return the stats provider to expose the stats of the storage 
implementation.
-     *
-     * @return the stats provider.
+     * Get the default storage class.
+     * @return default storage class
      */
-    StatsProvider getStatsProvider();
+    default ManagedLedgerStorageClass getDefaultStorageClass() {
+        return getStorageClasses().stream().findFirst().get();
+    }
 
     /**
-     * Return the default bookkeeper client.
-     *
-     * @return the default bookkeeper client.
+     * Lookup a storage class by name.
+     * @param name storage class name
+     * @return storage class instance, or empty if not found
      */
-    BookKeeper getBookKeeperClient();
+    Optional<ManagedLedgerStorageClass> getManagedLedgerStorageClass(String 
name);
 
     /**
      * Close the storage.
@@ -97,5 +102,4 @@ public interface ManagedLedgerStorage extends AutoCloseable {
         storage.initialize(conf, metadataStore, bkProvider, eventLoopGroup, 
openTelemetry);
         return storage;
     }
-
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorageClass.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorageClass.java
new file mode 100644
index 00000000000..8cbe5c3b411
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorageClass.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.storage;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * ManagedLedgerStorageClass represents a configured instance of 
ManagedLedgerFactory for managed ledgers.
+ * The {@link ManagedLedgerStorage} can hold multiple storage classes, and 
each storage class can have its own
+ * configuration.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface ManagedLedgerStorageClass {
+    /**
+     * Return the name of the storage class.
+     *
+     * @return the name of the storage class.
+     */
+    String getName();
+    /**
+     * Return the factory to create {@link ManagedLedgerFactory}.
+     *
+     * @return the factory to create {@link ManagedLedgerFactory}.
+     */
+    ManagedLedgerFactory getManagedLedgerFactory();
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index 88a3968b7b4..f2ff5d519d8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -311,8 +311,13 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
                         
SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName());
             }
         };
-        
topic.getBrokerService().getPulsar().getManagedLedgerFactory().asyncOpenReadOnlyManagedLedger(
-                topicName.getPersistenceNamingEncoding(), callback, 
topic.getManagedLedger().getConfig(), null);
+        
topic.getBrokerService().getManagedLedgerFactoryForTopic(topicName).thenAccept(managedLedgerFactory
 ->
+                        
managedLedgerFactory.asyncOpenReadOnlyManagedLedger(topicName.getPersistenceNamingEncoding(),
+                                callback, 
topic.getManagedLedger().getConfig(), null))
+                .exceptionally(e -> {
+                    future.completeExceptionally(e);
+                    return null;
+                });
         return wait(future, "open read only ml for " + topicName);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
index 6fc61d423ce..12f761bb4df 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
@@ -25,9 +25,11 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.broker.transaction.exception.pendingack.TransactionPendingAckException;
@@ -84,7 +86,8 @@ public class MLPendingAckStoreProvider implements 
TransactionPendingAckStoreProv
         }
 
         PersistentTopic originPersistentTopic = (PersistentTopic) 
subscription.getTopic();
-        PulsarService pulsarService = 
originPersistentTopic.getBrokerService().getPulsar();
+        BrokerService brokerService = originPersistentTopic.getBrokerService();
+        PulsarService pulsarService = brokerService.getPulsar();
 
         final Timer brokerClientSharedTimer =
                 pulsarService.getBrokerClientSharedTimer();
@@ -103,93 +106,127 @@ public class MLPendingAckStoreProvider implements 
TransactionPendingAckStoreProv
 
         String pendingAckTopicName = MLPendingAckStore
                 
.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), 
subscription.getName());
-        originPersistentTopic.getBrokerService().getManagedLedgerFactory()
-                .asyncExists(TopicName.get(pendingAckTopicName)
-                        .getPersistenceNamingEncoding()).thenAccept(exist -> {
-            TopicName topicName;
-            if (exist) {
-                topicName = TopicName.get(pendingAckTopicName);
-            } else {
-                topicName = TopicName.get(originPersistentTopic.getName());
-            }
-            originPersistentTopic.getBrokerService()
-                    .getManagedLedgerConfig(topicName).thenAccept(config -> {
-                config.setCreateIfMissing(true);
-                
originPersistentTopic.getBrokerService().getManagedLedgerFactory()
-                        
.asyncOpen(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding(),
-                                config, new 
AsyncCallbacks.OpenLedgerCallback() {
-                                    @Override
-                                    public void 
openLedgerComplete(ManagedLedger ledger, Object ctx) {
-                                        ledger.asyncOpenCursor(
-                                                
MLPendingAckStore.getTransactionPendingAckStoreCursorName(),
-                                                InitialPosition.Earliest, new 
AsyncCallbacks.OpenCursorCallback() {
-                                                    @Override
-                                                    public void 
openCursorComplete(ManagedCursor cursor, Object ctx) {
-                                                        
pendingAckStoreFuture.complete(new MLPendingAckStore(ledger,
-                                                                cursor,
-                                                                
subscription.getCursor(),
-                                                                
originPersistentTopic
-                                                                        
.getBrokerService()
-                                                                        
.getPulsar()
-                                                                        
.getConfiguration()
-                                                                        
.getTransactionPendingAckLogIndexMinLag(),
-                                                                
txnLogBufferedWriterConfig,
-                                                                
brokerClientSharedTimer, bufferedWriterMetrics,
-                                                                
originPersistentTopic
-                                                                        
.getBrokerService()
-                                                                        
.getPulsar()
-                                                                        
.getOrderedExecutor()
-                                                                        
.chooseThread()));
-                                                        if 
(log.isDebugEnabled()) {
-                                                            log.debug("{},{} 
open MLPendingAckStore cursor success",
-                                                                    
originPersistentTopic.getName(),
-                                                                    
subscription.getName());
-                                                        }
-                                                    }
-
-                                                    @Override
-                                                    public void 
openCursorFailed(ManagedLedgerException exception,
-                                                                               
  Object ctx) {
-                                                        log.error("{},{} open 
MLPendingAckStore cursor failed."
-                                                                , 
originPersistentTopic.getName(),
-                                                                
subscription.getName(), exception);
-                                                        
pendingAckStoreFuture.completeExceptionally(exception);
-                                                    }
-                                                }, null);
-                                    }
-
-                                    @Override
-                                    public void 
openLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                                        log.error("{}, {} open 
MLPendingAckStore managedLedger failed."
-                                                , 
originPersistentTopic.getName(), subscription.getName(), exception);
-                                        
pendingAckStoreFuture.completeExceptionally(exception);
-                                    }
-                                }, () -> 
CompletableFuture.completedFuture(true), null);
+        TopicName pendingAckTopicNameObject = 
TopicName.get(pendingAckTopicName);
+        
brokerService.getManagedLedgerFactoryForTopic(pendingAckTopicNameObject)
+                .thenAccept(managedLedgerFactory -> {
+                    managedLedgerFactory.asyncExists(pendingAckTopicNameObject
+                            .getPersistenceNamingEncoding()).thenAccept(exist 
-> {
+                        TopicName topicName;
+                        if (exist) {
+                            topicName = pendingAckTopicNameObject;
+                        } else {
+                            topicName = 
TopicName.get(originPersistentTopic.getName());
+                        }
+                        
brokerService.getManagedLedgerConfig(topicName).thenAccept(config -> {
+                            internalNewPendingAckStore(subscription, config, 
brokerService, topicName,
+                                    pendingAckTopicNameObject, 
pendingAckStoreFuture, txnLogBufferedWriterConfig,
+                                    brokerClientSharedTimer, 
originPersistentTopic);
+                        }).exceptionally(e -> {
+                            Throwable t = 
FutureUtil.unwrapCompletionException(e);
+                            log.error("[{}] [{}] Failed to get managedLedger 
config when init pending ack "
+                                            + "store!",
+                                    originPersistentTopic, subscription, t);
+                            pendingAckStoreFuture.completeExceptionally(t);
+                            return null;
+
+                        });
                     }).exceptionally(e -> {
                         Throwable t = FutureUtil.unwrapCompletionException(e);
-                        log.error("[{}] [{}] Failed to get managedLedger 
config when init pending ack store!",
+                        log.error("[{}] [{}] Failed to check the pending ack 
topic exist when init pending ack store!",
                                 originPersistentTopic, subscription, t);
                         pendingAckStoreFuture.completeExceptionally(t);
                         return null;
-
                     });
                 }).exceptionally(e -> {
                     Throwable t = FutureUtil.unwrapCompletionException(e);
-                    log.error("[{}] [{}] Failed to check the pending ack topic 
exist when init pending ack store!",
-                            originPersistentTopic, subscription, t);
+                    log.error("[{}] [{}] Failed to get managedLedger config 
when init pending ack store!",
+                            pendingAckTopicNameObject, subscription, t);
                     pendingAckStoreFuture.completeExceptionally(t);
                     return null;
                 });
         return pendingAckStoreFuture;
     }
 
+    private static void internalNewPendingAckStore(PersistentSubscription 
subscription, ManagedLedgerConfig config,
+                                                   BrokerService 
brokerService, TopicName topicName,
+                                                   TopicName 
pendingAckTopicNameObject,
+                                                   
CompletableFuture<PendingAckStore> pendingAckStoreFuture,
+                                                   TxnLogBufferedWriterConfig 
txnLogBufferedWriterConfig,
+                                                   Timer 
brokerClientSharedTimer,
+                                                   PersistentTopic 
originPersistentTopic) {
+        config.setCreateIfMissing(true);
+        brokerService
+                .getManagedLedgerFactoryForTopic(topicName, 
config.getStorageClassName())
+                
.asyncOpen(pendingAckTopicNameObject.getPersistenceNamingEncoding(),
+                        config, new AsyncCallbacks.OpenLedgerCallback() {
+                            @Override
+                            public void openLedgerComplete(ManagedLedger 
ledger, Object ctx) {
+                                ledger.asyncOpenCursor(
+                                        
MLPendingAckStore.getTransactionPendingAckStoreCursorName(),
+                                        InitialPosition.Earliest,
+                                        new 
AsyncCallbacks.OpenCursorCallback() {
+                                            @Override
+                                            public void 
openCursorComplete(ManagedCursor cursor,
+                                                                           
Object ctx) {
+                                                pendingAckStoreFuture.complete(
+                                                        new 
MLPendingAckStore(ledger,
+                                                                cursor,
+                                                                
subscription.getCursor(),
+                                                                brokerService
+                                                                        
.getPulsar()
+                                                                        
.getConfiguration()
+                                                                        
.getTransactionPendingAckLogIndexMinLag(),
+                                                                
txnLogBufferedWriterConfig,
+                                                                
brokerClientSharedTimer,
+                                                                
bufferedWriterMetrics,
+                                                                brokerService
+                                                                        
.getPulsar()
+                                                                        
.getOrderedExecutor()
+                                                                        
.chooseThread()));
+                                                if (log.isDebugEnabled()) {
+                                                    log.debug(
+                                                            "{},{} open 
MLPendingAckStore cursor "
+                                                                    + 
"success",
+                                                            
originPersistentTopic.getName(),
+                                                            
subscription.getName());
+                                                }
+                                            }
+
+                                            @Override
+                                            public void openCursorFailed(
+                                                    ManagedLedgerException 
exception,
+                                                    Object ctx) {
+                                                log.error(
+                                                        "{},{} open 
MLPendingAckStore cursor "
+                                                                + "failed."
+                                                        , 
originPersistentTopic.getName(),
+                                                        
subscription.getName(), exception);
+                                                
pendingAckStoreFuture.completeExceptionally(
+                                                        exception);
+                                            }
+                                        }, null);
+                            }
+
+                            @Override
+                            public void 
openLedgerFailed(ManagedLedgerException exception,
+                                                         Object ctx) {
+                                log.error("{}, {} open MLPendingAckStore 
managedLedger failed."
+                                        , originPersistentTopic.getName(), 
subscription.getName(),
+                                        exception);
+                                
pendingAckStoreFuture.completeExceptionally(exception);
+                            }
+                        }, () -> CompletableFuture.completedFuture(true), 
null);
+    }
+
     @Override
     public CompletableFuture<Boolean> 
checkInitializedBefore(PersistentSubscription subscription) {
         PersistentTopic originPersistentTopic = (PersistentTopic) 
subscription.getTopic();
         String pendingAckTopicName = MLPendingAckStore
                 
.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), 
subscription.getName());
-        return 
originPersistentTopic.getBrokerService().getManagedLedgerFactory()
-                
.asyncExists(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding());
+        TopicName topicName = TopicName.get(pendingAckTopicName);
+        return 
originPersistentTopic.getBrokerService().getManagedLedgerFactoryForTopic(topicName)
+                .thenCompose(managedLedgerFactory -> 
managedLedgerFactory.asyncExists(
+                        topicName.getPersistenceNamingEncoding()));
     }
 
     private static class MLTxnPendingAckLogBufferedWriterMetrics extends 
TxnLogBufferedWriterMetricsStats{
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 1ea29c9d431..9aa2dcc700c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -135,7 +135,7 @@ public class AdminApiOffloadTest extends 
MockedPulsarServiceBaseTest {
             }
         }
 
-        ManagedLedgerInfo info = 
pulsar.getManagedLedgerFactory().getManagedLedgerInfo(mlName);
+        ManagedLedgerInfo info = 
pulsar.getDefaultManagedLedgerFactory().getManagedLedgerInfo(mlName);
         assertEquals(info.ledgers.size(), 2);
 
         assertEquals(admin.topics().offloadStatus(topicName).getStatus(), 
LongRunningProcessStatus.Status.NOT_RUN);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 4a1dbface2c..26da4116d09 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -588,24 +588,24 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
         // wait config to be updated
         Awaitility.await().until(() -> {
-            return 
pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize() == 1 * 
1024L * 1024L
-                    && 
pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark()
 == 0.8
-                    && 
pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold() == 
TimeUnit.MILLISECONDS
+            return 
pulsar.getDefaultManagedLedgerFactory().getEntryCacheManager().getMaxSize() == 
1 * 1024L * 1024L
+                    && 
pulsar.getDefaultManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark()
 == 0.8
+                    && 
pulsar.getDefaultManagedLedgerFactory().getCacheEvictionTimeThreshold() == 
TimeUnit.MILLISECONDS
                     .toNanos(2000);
         });
 
         // verify value is updated
-        
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize(),
 1 * 1024L * 1024L);
-        
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(),
 0.8);
-        
assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(), 
TimeUnit.MILLISECONDS
+        
assertEquals(pulsar.getDefaultManagedLedgerFactory().getEntryCacheManager().getMaxSize(),
 1 * 1024L * 1024L);
+        
assertEquals(pulsar.getDefaultManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(),
 0.8);
+        
assertEquals(pulsar.getDefaultManagedLedgerFactory().getCacheEvictionTimeThreshold(),
 TimeUnit.MILLISECONDS
                 .toNanos(2000));
 
         restartBroker();
 
         // verify value again
-        
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize(),
 1 * 1024L * 1024L);
-        
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(),
 0.8);
-        
assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(), 
TimeUnit.MILLISECONDS
+        
assertEquals(pulsar.getDefaultManagedLedgerFactory().getEntryCacheManager().getMaxSize(),
 1 * 1024L * 1024L);
+        
assertEquals(pulsar.getDefaultManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(),
 0.8);
+        
assertEquals(pulsar.getDefaultManagedLedgerFactory().getCacheEvictionTimeThreshold(),
 TimeUnit.MILLISECONDS
                 .toNanos(2000));
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 18fd3dd1c8b..aae2f7b8830 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -708,7 +708,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         // partitioned topic to more than 10.
         final String nonPartitionTopicName2 = "special-topic-partition-10";
         final String partitionedTopicName = "special-topic";
-        pulsar.getBrokerService().getManagedLedgerFactory()
+        pulsar.getDefaultManagedLedgerFactory()
                 
.open(TopicName.get(nonPartitionTopicName2).getPersistenceNamingEncoding());
         doAnswer(invocation -> {
             persistentTopics.namespaceName = NamespaceName.get("tenant", 
"namespace");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 82892ad353a..68a52c4b4c3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -127,7 +127,7 @@ public class BrokerBkEnsemblesTests extends 
BkEnsemblesTestBase {
         consumer.close();
         producer.close();
         pulsar.getBrokerService().removeTopicFromCache(topic);
-        ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) 
pulsar.getManagedLedgerFactory();
+        ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) 
pulsar.getDefaultManagedLedgerFactory();
         Field field = 
ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
         field.setAccessible(true);
         @SuppressWarnings("unchecked")
@@ -250,7 +250,7 @@ public class BrokerBkEnsemblesTests extends 
BkEnsemblesTestBase {
         // clean managed-ledger and recreate topic to clean any data from the 
cache
         producer.close();
         pulsar.getBrokerService().removeTopicFromCache(topic);
-        ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) 
pulsar.getManagedLedgerFactory();
+        ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) 
pulsar.getDefaultManagedLedgerFactory();
         Field field = 
ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
         field.setAccessible(true);
         @SuppressWarnings("unchecked")
@@ -399,7 +399,7 @@ public class BrokerBkEnsemblesTests extends 
BkEnsemblesTestBase {
 
     @Test
     public void testDeleteLedgerFactoryCorruptLedger() throws Exception {
-        ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) 
pulsar.getManagedLedgerFactory();
+        ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) 
pulsar.getDefaultManagedLedgerFactory();
         ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("test");
 
         // bookkeeper client
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index d7272fcffa9..be6f7c91437 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -274,7 +274,7 @@ public class BrokerBookieIsolationTest {
         assertAffinityBookies(ledgerManager, ml4.getLedgersInfoAsList(), 
isolatedBookies);
 
         ManagedLedgerClientFactory mlFactory =
-            (ManagedLedgerClientFactory) 
pulsarService.getManagedLedgerClientFactory();
+            (ManagedLedgerClientFactory) 
pulsarService.getManagedLedgerStorage();
         Map<EnsemblePlacementPolicyConfig, BookKeeper> 
bkPlacementPolicyToBkClientMap = mlFactory
                 .getBkEnsemblePolicyToBookKeeperMap();
 
@@ -588,7 +588,7 @@ public class BrokerBookieIsolationTest {
         assertAffinityBookies(ledgerManager, ml4.getLedgersInfoAsList(), 
isolatedBookies);
 
         ManagedLedgerClientFactory mlFactory =
-                (ManagedLedgerClientFactory) 
pulsarService.getManagedLedgerClientFactory();
+                (ManagedLedgerClientFactory) 
pulsarService.getManagedLedgerStorage();
         Map<EnsemblePlacementPolicyConfig, BookKeeper> 
bkPlacementPolicyToBkClientMap = mlFactory
                 .getBkEnsemblePolicyToBookKeeperMap();
 
@@ -751,7 +751,7 @@ public class BrokerBookieIsolationTest {
         assertAffinityBookies(ledgerManager, ml3.getLedgersInfoAsList(), 
isolatedBookies);
 
         ManagedLedgerClientFactory mlFactory =
-            (ManagedLedgerClientFactory) 
pulsarService.getManagedLedgerClientFactory();
+            (ManagedLedgerClientFactory) 
pulsarService.getManagedLedgerStorage();
         Map<EnsemblePlacementPolicyConfig, BookKeeper> 
bkPlacementPolicyToBkClientMap = mlFactory
                 .getBkEnsemblePolicyToBookKeeperMap();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 17209c83c13..e05bb836a3c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -76,8 +76,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader;
+import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -1451,7 +1451,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         ledgerField.setAccessible(true);
         @SuppressWarnings("unchecked")
         ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> 
ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) 
ledgerField
-                .get(pulsar.getManagedLedgerFactory());
+                .get(pulsar.getDefaultManagedLedgerFactory());
         CompletableFuture<ManagedLedgerImpl> future = new 
CompletableFuture<>();
         future.completeExceptionally(new ManagedLedgerException("ledger 
opening failed"));
         ledgers.put(namespace + "/persistent/deadLockTestTopic", future);
@@ -1517,8 +1517,7 @@ public class BrokerServiceTest extends BrokerTestBase {
 
         PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
 
-        ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) 
pulsar.getManagedLedgerClientFactory()
-                .getManagedLedgerFactory();
+        ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) 
pulsar.getDefaultManagedLedgerFactory();
         Field ledgersField = 
ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
         ledgersField.setAccessible(true);
         ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> 
ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) 
ledgersField
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 000ea7af915..69f3e2e4d39 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -61,6 +61,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -184,10 +185,11 @@ public class PersistentDispatcherFailoverConsumerTest {
         doReturn("mockCursor").when(cursorMock).getName();
 
         // call openLedgerComplete with ledgerMock on ML factory asyncOpen
+        ManagedLedgerFactory managedLedgerFactory = 
pulsarTestContext.getDefaultManagedLedgerFactory();
         doAnswer(invocationOnMock -> {
             ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
             return null;
-        }).when(pulsarTestContext.getManagedLedgerFactory())
+        }).when(managedLedgerFactory)
                 .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -196,7 +198,7 @@ public class PersistentDispatcherFailoverConsumerTest {
             ((OpenLedgerCallback) invocationOnMock.getArguments()[2])
                     .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null);
             return null;
-        }).when(pulsarTestContext.getManagedLedgerFactory())
+        }).when(managedLedgerFactory)
                 .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 85e0887465d..f75a3256747 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -92,7 +92,7 @@ public class PersistentTopicConcurrentTest extends 
MockedBookKeeperTestCase {
         cursorMock = ledger.openCursor("c1");
         ledgerMock = ledger;
         mlFactoryMock = factory;
-        doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+        doReturn(mlFactoryMock).when(pulsar).getDefaultManagedLedgerFactory();
 
         brokerService = spyWithClassAndConstructorArgs(BrokerService.class, 
pulsar, eventLoopGroup);
         doReturn(brokerService).when(pulsar).getBrokerService();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 36e741f8fa9..2896c13af00 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -596,7 +596,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         pulsarClient.close();
 
         assertNotNull(pulsar.getBrokerService().getTopicReference(topic));
-        assertTrue(((ManagedLedgerFactoryImpl) 
pulsar.getManagedLedgerFactory()).getManagedLedgers()
+        assertTrue(((ManagedLedgerFactoryImpl) 
pulsar.getDefaultManagedLedgerFactory()).getManagedLedgers()
                 .containsKey(topicName.getPersistenceNamingEncoding()));
 
         admin.namespaces().unload("prop/ns-abc");
@@ -613,7 +613,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         }
 
         // ML should have been closed as well
-        assertFalse(((ManagedLedgerFactoryImpl) 
pulsar.getManagedLedgerFactory()).getManagedLedgers()
+        assertFalse(((ManagedLedgerFactoryImpl) 
pulsar.getDefaultManagedLedgerFactory()).getManagedLedgers()
                 .containsKey(topicName.getPersistenceNamingEncoding()));
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 81c12df4f39..1e96da737dd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -90,6 +90,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
@@ -169,6 +170,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     private BrokerService brokerService;
 
     private EventLoopGroup eventLoopGroup;
+    private ManagedLedgerFactory managedLedgerFactory;
 
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
@@ -190,13 +192,14 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
                 .build();
         brokerService = pulsarTestContext.getBrokerService();
 
+        managedLedgerFactory = pulsarTestContext.getDefaultManagedLedgerFactory();
         doAnswer(invocationOnMock -> CompletableFuture.completedFuture(null))
-                .when(pulsarTestContext.getManagedLedgerFactory()).getManagedLedgerPropertiesAsync(any());
+                .when(managedLedgerFactory).getManagedLedgerPropertiesAsync(any());
         doAnswer(invocation -> {
             DeleteLedgerCallback deleteLedgerCallback = 
invocation.getArgument(1);
             deleteLedgerCallback.deleteLedgerComplete(null);
             return null;
-        }).when(pulsarTestContext.getManagedLedgerFactory()).asyncDelete(any(), any(), any());
+        }).when(managedLedgerFactory).asyncDelete(any(), any(), any());
         // Mock serviceCnx.
         serverCnx = 
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class,
                 pulsarTestContext.getPulsarService());
@@ -247,7 +250,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doAnswer(invocationOnMock -> {
             ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
             return null;
-        }).when(pulsarTestContext.getManagedLedgerFactory())
+        }).when(managedLedgerFactory)
                 .asyncOpen(anyString(), any(ManagedLedgerConfig.class), 
any(OpenLedgerCallback.class),
                         any(Supplier.class), any());
 
@@ -273,7 +276,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
                     .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null)).start();
 
             return null;
-        }).when(pulsarTestContext.getManagedLedgerFactory())
+        }).when(managedLedgerFactory)
                 .asyncOpen(anyString(), any(ManagedLedgerConfig.class), 
any(OpenLedgerCallback.class),
                         any(Supplier.class), any());
 
@@ -1395,7 +1398,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doAnswer(invocationOnMock -> {
             ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
             return null;
-        }).when(pulsarTestContext.getManagedLedgerFactory())
+        }).when(managedLedgerFactory)
                 .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -1404,7 +1407,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
             ((OpenLedgerCallback) invocationOnMock.getArguments()[2])
                     .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null);
             return null;
-        }).when(pulsarTestContext.getManagedLedgerFactory())
+        }).when(managedLedgerFactory)
                 .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
index 3caf4a1f239..bd4a0889c73 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
@@ -187,14 +187,14 @@ public class ReplicationTxnTest extends 
OneWayReplicatorTestBase {
         for (int i = 0; i < txnLogPartitions; i++) {
             TopicName txnLog = TopicName.get(TopicDomain.persistent.value(),
                     NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX + 
i);
-            assertNotNull(pulsar1.getManagedLedgerFactory()
+            assertNotNull(pulsar1.getDefaultManagedLedgerFactory()
                     
.getManagedLedgerInfo(txnLog.getPersistenceNamingEncoding()));
             assertFalse(broker1.getTopics().containsKey(txnLog.toString()));
         }
         // __transaction_pending_ack: it only uses ML, will not create topic.
         TopicName pendingAck = TopicName.get(
                 MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, 
subscription));
-        assertNotNull(pulsar1.getManagedLedgerFactory()
+        assertNotNull(pulsar1.getDefaultManagedLedgerFactory()
                 
.getManagedLedgerInfo(pendingAck.getPersistenceNamingEncoding()));
         assertFalse(broker1.getTopics().containsKey(pendingAck.toString()));
         // __transaction_buffer_snapshot.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index aac7a85f477..2420ed58bed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -1419,8 +1419,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         config1.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
         config2.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
 
-        ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar1.getManagedLedgerClientFactory()
-                .getManagedLedgerFactory();
+        ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar1.getDefaultManagedLedgerFactory();
         Field ledgersField = 
ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
         ledgersField.setAccessible(true);
         ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> 
ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) 
ledgersField
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 42b52d901e3..9a85995ab77 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -83,6 +83,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.commons.lang3.mutable.MutableInt;
@@ -202,6 +203,7 @@ public class ServerCnxTest {
     private ManagedLedger ledgerMock;
     private ManagedCursor cursorMock;
     private ConcurrentHashSet<EmbeddedChannel> 
channelsStoppedAnswerHealthCheck = new ConcurrentHashSet<>();
+    private ManagedLedgerFactory managedLedgerFactory;
 
 
     @BeforeMethod(alwaysRun = true)
@@ -218,7 +220,7 @@ public class ServerCnxTest {
                 .spyByDefault()
                 .build();
         pulsar = pulsarTestContext.getPulsarService();
-
+        managedLedgerFactory = pulsarTestContext.getDefaultManagedLedgerFactory();
         brokerService = pulsarTestContext.getBrokerService();
 
         namespaceService = pulsar.getNamespaceService();
@@ -2043,7 +2045,7 @@ public class ServerCnxTest {
                     () -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
                             null));
             return null;
-        }).when(pulsarTestContext.getManagedLedgerFactory())
+        }).when(managedLedgerFactory)
                 .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -2098,7 +2100,7 @@ public class ServerCnxTest {
                     () -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
                             null));
             return null;
-        }).when(pulsarTestContext.getManagedLedgerFactory())
+        }).when(managedLedgerFactory)
                 .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -2165,7 +2167,7 @@ public class ServerCnxTest {
 
             ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
             return null;
-        }).when(pulsarTestContext.getManagedLedgerFactory())
+        }).when(managedLedgerFactory)
                 .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -2244,7 +2246,7 @@ public class ServerCnxTest {
                     () -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
                             null));
             return null;
-        }).when(pulsarTestContext.getManagedLedgerFactory())
+        }).when(managedLedgerFactory)
                 .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -2316,7 +2318,7 @@ public class ServerCnxTest {
                             null));
 
             return null;
-        }).when(pulsarTestContext.getManagedLedgerFactory())
+        }).when(managedLedgerFactory)
                 .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -2391,7 +2393,7 @@ public class ServerCnxTest {
                     () -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
                             null));
             return null;
-        }).when(pulsarTestContext.getManagedLedgerFactory())
+        }).when(managedLedgerFactory)
                 .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -2400,7 +2402,7 @@ public class ServerCnxTest {
             openTopicFail.complete(() -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2])
                     .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null));
             return null;
-        }).when(pulsarTestContext.getManagedLedgerFactory())
+        }).when(managedLedgerFactory)
                 .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -2926,7 +2928,7 @@ public class ServerCnxTest {
             Thread.sleep(300);
             ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
             return null;
-        }).when(pulsarTestContext.getManagedLedgerFactory())
+        }).when(managedLedgerFactory)
                 .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -2937,7 +2939,7 @@ public class ServerCnxTest {
                     .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null)).start();
 
             return null;
-        }).when(pulsarTestContext.getManagedLedgerFactory())
+        }).when(managedLedgerFactory)
                 .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
index 7e8454f6c7e..fc10d315cb1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
@@ -70,7 +70,7 @@ public class TransactionMarkerDeleteTest extends 
TransactionTestBase {
     @Test
     public void testMarkerDeleteTimes() throws Exception {
         ManagedLedgerImpl managedLedger =
-                spy((ManagedLedgerImpl) getPulsarServiceList().get(0).getManagedLedgerFactory().open("test"));
+                spy((ManagedLedgerImpl) getPulsarServiceList().get(0).getDefaultManagedLedgerFactory().open("test"));
         PersistentTopic topic = mock(PersistentTopic.class);
         BrokerService brokerService = mock(BrokerService.class);
         PulsarService pulsarService = mock(PulsarService.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
index d0fd384ba78..d72e8f75427 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
@@ -105,7 +105,7 @@ public class ManagedLedgerMetricsTest extends 
BrokerTestBase {
             producer.send(message.getBytes());
         }
 
-        var managedLedgerFactory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory();
+        var managedLedgerFactory = (ManagedLedgerFactoryImpl) pulsar.getDefaultManagedLedgerFactory();
         for (Entry<String, ManagedLedger> ledger : 
managedLedgerFactory.getManagedLedgers().entrySet()) {
             ManagedLedgerMBeanImpl stats = (ManagedLedgerMBeanImpl) 
ledger.getValue().getStats();
             stats.refreshStats(1, TimeUnit.SECONDS);
@@ -205,7 +205,7 @@ public class ManagedLedgerMetricsTest extends 
BrokerTestBase {
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         managedLedgerConfig.setMaxEntriesPerLedger(2);
         MLTransactionLogImpl mlTransactionLog = new 
MLTransactionLogImpl(TransactionCoordinatorID.get(0),
-                pulsar.getManagedLedgerFactory(), managedLedgerConfig, txnLogBufferedWriterConfig,
+                pulsar.getDefaultManagedLedgerFactory(), managedLedgerConfig, txnLogBufferedWriterConfig,
                 transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
         mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
         ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
index 7860b0708e3..70e386c68aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
@@ -71,7 +71,7 @@ class NonStartableTestPulsarService extends 
AbstractTestPulsarService {
         super(spyConfig, config, localMetadataStore, 
configurationMetadataStore, compactionServiceFactory,
                 brokerInterceptor, bookKeeperClientFactory, null);
         setPulsarResources(pulsarResources);
-        setManagedLedgerClientFactory(managedLedgerClientFactory);
+        setManagedLedgerStorage(managedLedgerClientFactory);
         try {
             setBrokerService(brokerServiceCustomizer.apply(
                     spyConfig.getBrokerService().spy(TestBrokerService.class, 
this, getIoEventLoopGroup())));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index 3d79a17a90f..cdb047079bf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Consumer;
@@ -56,7 +57,9 @@ import org.apache.pulsar.broker.resources.TopicResources;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil;
+import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
 import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
 import org.apache.pulsar.common.util.PortManager;
 import org.apache.pulsar.compaction.CompactionServiceFactory;
@@ -136,7 +139,7 @@ public class PulsarTestContext implements AutoCloseable {
 
     private final OrderedExecutor executor;
 
-    private final ManagedLedgerStorage managedLedgerClientFactory;
+    private final ManagedLedgerStorage managedLedgerStorage;
 
     private final PulsarService pulsarService;
 
@@ -167,8 +170,12 @@ public class PulsarTestContext implements AutoCloseable {
     private final boolean enableOpenTelemetry;
     private final InMemoryMetricReader openTelemetryMetricReader;
 
-    public ManagedLedgerFactory getManagedLedgerFactory() {
-        return managedLedgerClientFactory.getManagedLedgerFactory();
+    public ManagedLedgerStorage getManagedLedgerStorage() {
+        return managedLedgerStorage;
+    }
+
+    public ManagedLedgerFactory getDefaultManagedLedgerFactory() {
+        return getManagedLedgerStorage().getDefaultStorageClass().getManagedLedgerFactory();
     }
 
     public PulsarMockBookKeeper getMockBookKeeper() {
@@ -524,8 +531,8 @@ public class PulsarTestContext implements AutoCloseable {
          */
         public Builder managedLedgerClients(BookKeeper bookKeeperClient,
                                             ManagedLedgerFactory 
managedLedgerFactory) {
-            return managedLedgerClientFactory(
-                    PulsarTestContext.createManagedLedgerClientFactory(bookKeeperClient, managedLedgerFactory));
+            return managedLedgerStorage(
+                    PulsarTestContext.createManagedLedgerStorage(bookKeeperClient, managedLedgerFactory));
         }
 
         /**
@@ -569,6 +576,9 @@ public class PulsarTestContext implements AutoCloseable {
             if (configOverrideCustomizer != null) {
                 configOverrideCustomizer.accept(super.config);
             }
+            if (super.managedLedgerStorage != null && !MockUtil.isMock(super.managedLedgerStorage)) {
+                super.managedLedgerStorage = spyConfig.getManagedLedgerStorage().spy(super.managedLedgerStorage);
+            }
             initializeCommonPulsarServices(spyConfig);
             initializePulsarServices(spyConfig, this);
             if (pulsarServiceCustomizer != null) {
@@ -622,7 +632,7 @@ public class PulsarTestContext implements AutoCloseable {
         }
 
         private void initializeCommonPulsarServices(SpyConfig spyConfig) {
-            if (super.bookKeeperClient == null && super.managedLedgerClientFactory == null) {
+            if (super.bookKeeperClient == null && super.managedLedgerStorage == null) {
                 if (super.executor == null) {
                     OrderedExecutor createdExecutor = 
OrderedExecutor.newBuilder().numThreads(1)
                             .name(PulsarTestContext.class.getSimpleName() + 
"-executor").build();
@@ -645,8 +655,11 @@ public class PulsarTestContext implements AutoCloseable {
                 });
                 bookKeeperClient(mockBookKeeper);
             }
-            if (super.bookKeeperClient == null && super.managedLedgerClientFactory != null) {
-                bookKeeperClient(super.managedLedgerClientFactory.getBookKeeperClient());
+            if (super.bookKeeperClient == null && super.managedLedgerStorage != null) {
+                bookKeeperClient(super.managedLedgerStorage.getStorageClasses().stream()
+                        .filter(BookkeeperManagedLedgerStorageClass.class::isInstance)
+                        .map(BookkeeperManagedLedgerStorageClass.class::cast)
+                        .map(BookkeeperManagedLedgerStorageClass::getBookKeeperClient).findFirst().get());
             }
             if (super.localMetadataStore == null || 
super.configurationMetadataStore == null) {
                 if (super.mockZooKeeper != null) {
@@ -725,8 +738,8 @@ public class PulsarTestContext implements AutoCloseable {
         }
 
         @Override
-        public Builder managedLedgerClientFactory(ManagedLedgerStorage managedLedgerClientFactory) {
-            throw new IllegalStateException("Cannot set managedLedgerClientFactory when startable.");
+        public Builder managedLedgerStorage(ManagedLedgerStorage managedLedgerStorage) {
+            throw new IllegalStateException("Cannot set managedLedgerStorage when startable.");
         }
 
         @Override
@@ -788,10 +801,12 @@ public class PulsarTestContext implements AutoCloseable {
 
         @Override
         protected void initializePulsarServices(SpyConfig spyConfig, Builder 
builder) {
-            if (builder.managedLedgerClientFactory == null) {
+            if (builder.managedLedgerStorage == null) {
                 ManagedLedgerFactory mlFactoryMock = 
Mockito.mock(ManagedLedgerFactory.class);
-                managedLedgerClientFactory(
-                        PulsarTestContext.createManagedLedgerClientFactory(builder.bookKeeperClient, mlFactoryMock));
+                managedLedgerStorage(
+                        spyConfig.getManagedLedgerStorage()
+                                .spy(PulsarTestContext.createManagedLedgerStorage(builder.bookKeeperClient,
+                                        mlFactoryMock)));
             }
             if (builder.pulsarResources == null) {
                 SpyConfig.SpyType spyConfigPulsarResources = 
spyConfig.getPulsarResources();
@@ -825,7 +840,7 @@ public class PulsarTestContext implements AutoCloseable {
                             builder.configurationMetadataStore, 
compactionServiceFactory,
                             builder.brokerInterceptor,
                             bookKeeperClientFactory, builder.pulsarResources,
-                            builder.managedLedgerClientFactory, builder.brokerServiceCustomizer);
+                            builder.managedLedgerStorage, builder.brokerServiceCustomizer);
             if (compactionServiceFactory != null) {
                 compactionServiceFactory.initialize(pulsarService);
             }
@@ -838,10 +853,31 @@ public class PulsarTestContext implements AutoCloseable {
     }
 
     @NotNull
-    private static ManagedLedgerStorage 
createManagedLedgerClientFactory(BookKeeper bookKeeperClient,
-                                                                         
ManagedLedgerFactory managedLedgerFactory) {
-        return new ManagedLedgerStorage() {
+    private static ManagedLedgerStorage createManagedLedgerStorage(BookKeeper 
bookKeeperClient,
+                                                                   
ManagedLedgerFactory managedLedgerFactory) {
+        BookkeeperManagedLedgerStorageClass managedLedgerStorageClass =
+                new BookkeeperManagedLedgerStorageClass() {
+                    @Override
+                    public String getName() {
+                        return "bookkeeper";
+                    }
+
+                    @Override
+                    public ManagedLedgerFactory getManagedLedgerFactory() {
+                        return managedLedgerFactory;
+                    }
+
+                    @Override
+                    public StatsProvider getStatsProvider() {
+                        return new NullStatsProvider();
+                    }
 
+                    @Override
+                    public BookKeeper getBookKeeperClient() {
+                        return bookKeeperClient;
+                    }
+                };
+        return new ManagedLedgerStorage() {
             @Override
             public void initialize(ServiceConfiguration conf, 
MetadataStoreExtended metadataStore,
                                    BookKeeperClientFactory bookkeeperProvider, 
EventLoopGroup eventLoopGroup,
@@ -849,18 +885,17 @@ public class PulsarTestContext implements AutoCloseable {
             }
 
             @Override
-            public ManagedLedgerFactory getManagedLedgerFactory() {
-                return managedLedgerFactory;
-            }
-
-            @Override
-            public StatsProvider getStatsProvider() {
-                return new NullStatsProvider();
+            public Collection<ManagedLedgerStorageClass> getStorageClasses() {
+                return List.of(managedLedgerStorageClass);
             }
 
             @Override
-            public BookKeeper getBookKeeperClient() {
-                return bookKeeperClient;
+            public Optional<ManagedLedgerStorageClass> getManagedLedgerStorageClass(String name) {
+                if (name == null || name.equals("bookkeeper")) {
+                    return Optional.of(managedLedgerStorageClass);
+                } else {
+                    return Optional.empty();
+                }
             }
 
             @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
index 64789d1f0d4..285eb1bba6d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
@@ -112,6 +112,8 @@ public class SpyConfig {
      */
     private final SpyType namespaceService;
 
+    private final SpyType managedLedgerStorage;
+
     /**
      * Create a builder for SpyConfig with no spies by default.
      *
@@ -141,5 +143,6 @@ public class SpyConfig {
         spyConfigBuilder.compactor(defaultSpyType);
         spyConfigBuilder.compactedServiceFactory(defaultSpyType);
         spyConfigBuilder.namespaceService(defaultSpyType);
+        spyConfigBuilder.managedLedgerStorage(defaultSpyType);
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
index a0774414492..d82cd69c83d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
@@ -23,6 +23,7 @@ import 
io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import lombok.SneakyThrows;
 import org.apache.pulsar.broker.BookKeeperClientFactory;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -30,6 +31,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.compaction.CompactionServiceFactory;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
@@ -62,4 +64,23 @@ class StartableTestPulsarService extends 
AbstractTestPulsarService {
     public Supplier<NamespaceService> getNamespaceServiceProvider() throws 
PulsarServerException {
         return () -> 
spyConfig.getNamespaceService().spy(NamespaceService.class, this);
     }
+
+    @SneakyThrows
+    @Override
+    public ManagedLedgerStorage getManagedLedgerStorage() {
+        // support adding spy to managedLedgerStorage in beforePulsarStart method
+        if (super.getManagedLedgerStorage() == null) {
+            setManagedLedgerStorage(createManagedLedgerStorageSpy());
+        }
+        return super.getManagedLedgerStorage();
+    }
+
+    @Override
+    protected ManagedLedgerStorage newManagedLedgerStorage() throws Exception {
+        return getManagedLedgerStorage();
+    }
+
+    private ManagedLedgerStorage createManagedLedgerStorageSpy() throws Exception {
+        return spyConfig.getManagedLedgerStorage().spy(super.newManagedLedgerStorage());
+    }
 }
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index f21e11b9802..14cc813a17d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -806,7 +806,7 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
                 //
             }
         };
-        pulsarService.getManagedLedgerFactory()
+        pulsarService.getDefaultManagedLedgerFactory()
                 
.asyncOpenReadOnlyManagedLedger(snapshotSegmentTopicName.getPersistenceNamingEncoding(),
 callback,
                         
brokerService.getManagedLedgerConfig(snapshotSegmentTopicName).get(),null);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 14b1d563c11..3d7ab902bf4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -247,7 +247,7 @@ public class TransactionProduceTest extends 
TransactionTestBase {
             if (partition >= 0) {
                 topic = TopicName.get(topic).toString() + 
TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
             }
-            return getPulsarServiceList().get(0).getManagedLedgerFactory().openReadOnlyCursor(
+            return getPulsarServiceList().get(0).getDefaultManagedLedgerFactory().openReadOnlyCursor(
                     TopicName.get(topic).getPersistenceNamingEncoding(),
                     PositionFactory.EARLIEST, new ManagedLedgerConfig());
         } catch (Exception e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 5480b1a21d5..35c9048ebb5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -510,7 +510,7 @@ public class TransactionTest extends TransactionTestBase {
         admin.topics().createNonPartitionedTopic(topic);
         PulsarService pulsarService = super.getPulsarServiceList().get(0);
         pulsarService.getBrokerService().getTopics().clear();
-        ManagedLedgerFactory managedLedgerFactory = 
pulsarService.getBrokerService().getManagedLedgerFactory();
+        ManagedLedgerFactory managedLedgerFactory = 
pulsarService.getDefaultManagedLedgerFactory();
         Field field = 
ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
         field.setAccessible(true);
         ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> 
ledgers =
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 72aa078d5da..fc6a10e385a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -23,7 +23,7 @@ import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertM
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -45,6 +45,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.api.BKException;
@@ -952,8 +953,14 @@ public class PendingAckPersistentTest extends 
TransactionTestBase {
 
         assertNotNull(persistentTopic);
         BrokerService brokerService = spy(persistentTopic.getBrokerService());
-        doReturn(FutureUtil.failedFuture(new 
BrokerServiceException.ServiceUnitNotReadyException("test")))
-                .when(brokerService).getManagedLedgerConfig(any());
+        AtomicBoolean isGetManagedLedgerConfigFail = new AtomicBoolean(false);
+        doAnswer(invocation -> {
+            if (isGetManagedLedgerConfigFail.get()) {
+                return FutureUtil.failedFuture(new 
BrokerServiceException.ServiceUnitNotReadyException("test"));
+            } else {
+                return invocation.callRealMethod();
+            }
+        }).when(brokerService).getManagedLedgerConfig(any());
         Field field = AbstractTopic.class.getDeclaredField("brokerService");
         field.setAccessible(true);
         field.set(persistentTopic, brokerService);
@@ -968,11 +975,13 @@ public class PendingAckPersistentTest extends 
TransactionTestBase {
 
         producer.send("test");
         Transaction transaction = pulsarClient.newTransaction()
-                .withTransactionTimeout(30, TimeUnit.SECONDS).build().get();
+                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
 
+        isGetManagedLedgerConfigFail.set(true);
         // pending ack init fail, so the ack will throw exception
         try {
             consumer.acknowledgeAsync(consumer.receive().getMessageId(), 
transaction).get();
+            fail("ack should fail");
         } catch (Exception e) {
             assertTrue(e.getCause() instanceof 
PulsarClientException.LookupException);
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
index 9396a80cf25..6f79c573ed3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
@@ -291,7 +291,7 @@ public class OrphanPersistentTopicTest extends 
ProducerConsumerBase {
                     Thread.sleep(10 * 1000);
                 }
                 log.info("Race condition occurs {} times", 
mockRaceConditionCounter.get());
-                
pulsar.getManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding());
+                
pulsar.getDefaultManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding());
             }
             return invocation.callRealMethod();
         
}).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 2e71e8cc28c..e76c3d8fb84 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -91,8 +91,13 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.broker.testcontext.SpyConfig;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -150,6 +155,14 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         super.producerBaseSetup();
     }
 
+    @Override
+    protected PulsarTestContext.Builder 
createPulsarTestContextBuilder(ServiceConfiguration conf) {
+        return super.createPulsarTestContextBuilder(conf)
+                .spyConfig(SpyConfig.builder()
+                        
.managedLedgerStorage(SpyConfig.SpyType.SPY_ALSO_INVOCATIONS)
+                        .build());
+    }
+
     @AfterMethod(alwaysRun = true)
     public void cleanupAfterMethod() throws Exception {
         try {
@@ -1097,18 +1110,25 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 
     }
 
+
     @Override
     protected void beforePulsarStart(PulsarService pulsar) throws Exception {
         super.beforePulsarStart(pulsar);
-        doAnswer(i0 -> {
-            ManagedLedgerFactory factory = (ManagedLedgerFactory) 
spy(i0.callRealMethod());
-            doAnswer(i1 -> {
-                EntryCacheManager manager = (EntryCacheManager) 
spy(i1.callRealMethod());
-                doAnswer(i2 -> 
spy(i2.callRealMethod())).when(manager).getEntryCache(any());
-                return manager;
-            }).when(factory).getEntryCacheManager();
-            return factory;
-        }).when(pulsar).getManagedLedgerFactory();
+        ManagedLedgerStorage managedLedgerStorage = 
pulsar.getManagedLedgerStorage();
+        doAnswer(invocation -> {
+            ManagedLedgerStorageClass managedLedgerStorageClass =
+                    (ManagedLedgerStorageClass) 
spy(invocation.callRealMethod());
+            doAnswer(i0 -> {
+                ManagedLedgerFactory factory = (ManagedLedgerFactory) 
spy(i0.callRealMethod());
+                doAnswer(i1 -> {
+                    EntryCacheManager manager = (EntryCacheManager) 
spy(i1.callRealMethod());
+                    doAnswer(i2 -> 
spy(i2.callRealMethod())).when(manager).getEntryCache(any());
+                    return manager;
+                }).when(factory).getEntryCacheManager();
+                return factory;
+            }).when(managedLedgerStorageClass).getManagedLedgerFactory();
+            return managedLedgerStorageClass;
+        }).when(managedLedgerStorage).getDefaultStorageClass();
     }
 
     /**
@@ -1126,6 +1146,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
     public void testActiveAndInActiveConsumerEntryCacheBehavior() throws 
Exception {
         log.info("-- Starting {} test --", methodName);
 
+
         final long batchMessageDelayMs = 100;
         final int receiverSize = 10;
         final String topicName = "cache-topic";
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
index 36c36735c06..390e81ad664 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
@@ -563,7 +563,7 @@ public class SubscriptionPauseOnAckStatPersistTest extends 
ProducerConsumerBase
         final String subscription = "s1";
         final int msgSendCount = 100;
         // Inject a injection to record the counter of calling 
"cursor.isCursorDataFullyPersistable".
-        final ManagedLedgerImpl ml = (ManagedLedgerImpl) 
pulsar.getBrokerService().getManagedLedgerFactory().open(mlName);
+        final ManagedLedgerImpl ml = (ManagedLedgerImpl) 
pulsar.getDefaultManagedLedgerFactory().open(mlName);
         final ManagedCursorImpl cursor = (ManagedCursorImpl) 
ml.openCursor(subscription);
         final ManagedCursorImpl spyCursor = Mockito.spy(cursor);
         AtomicInteger callingIsCursorDataFullyPersistableCounter = new 
AtomicInteger();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index d3cb1d60d37..0b3ff345acf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -624,7 +624,7 @@ public class V1_ProducerConsumerTest extends 
V1_ProducerConsumerBase {
                 return manager;
             }).when(factory).getEntryCacheManager();
             return factory;
-        }).when(pulsar).getManagedLedgerFactory();
+        }).when(pulsar).getDefaultManagedLedgerFactory();
     }
 
     /**
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
index 1395424b141..2b1b409b71c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
@@ -62,7 +62,7 @@ public class SequenceIdWithErrorTest extends 
BkEnsemblesTestBase {
         ManagedLedgerClientFactory clientFactory = new 
ManagedLedgerClientFactory();
         clientFactory.initialize(pulsar.getConfiguration(), 
pulsar.getLocalMetadataStore(),
                 pulsar.getBookKeeperClientFactory(), eventLoopGroup, 
OpenTelemetry.noop());
-        ManagedLedgerFactory mlFactory = 
clientFactory.getManagedLedgerFactory();
+        ManagedLedgerFactory mlFactory = 
clientFactory.getDefaultStorageClass().getManagedLedgerFactory();
         ManagedLedger ml = 
mlFactory.open(TopicName.get(topicName).getPersistenceNamingEncoding());
         ml.close();
         clientFactory.close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 19f42a7e057..d75ccce7ff3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -927,7 +927,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         // verify second ledger created
         String managedLedgerName = 
((PersistentTopic)pulsar.getBrokerService().getTopicReference(topic).get())
             .getManagedLedger().getName();
-        ManagedLedgerInfo info = 
pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
+        ManagedLedgerInfo info = 
pulsar.getDefaultManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
         Assert.assertEquals(info.ledgers.size(), 2);
         Assert.assertTrue(ledgersOpened.isEmpty()); // no ledgers should have 
been opened
 
@@ -950,7 +950,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                     .send();
         }
 
-        info = 
pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
+        info = 
pulsar.getDefaultManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
         Assert.assertEquals(info.ledgers.size(), 3);
 
         // should only have opened the penultimate ledger to get stat
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
index df4e086748f..3fbc91e7d2e 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
@@ -30,17 +30,24 @@ public class PersistencePolicies {
     private int bookkeeperWriteQuorum;
     private int bookkeeperAckQuorum;
     private double managedLedgerMaxMarkDeleteRate;
+    private String managedLedgerStorageClassName;
 
     public PersistencePolicies() {
-        this(2, 2, 2, 0.0);
+        this(2, 2, 2, 0.0, null);
     }
 
     public PersistencePolicies(int bookkeeperEnsemble, int 
bookkeeperWriteQuorum, int bookkeeperAckQuorum,
-            double managedLedgerMaxMarkDeleteRate) {
+                               double managedLedgerMaxMarkDeleteRate) {
+        this(bookkeeperEnsemble, bookkeeperWriteQuorum, bookkeeperAckQuorum, 
managedLedgerMaxMarkDeleteRate, null);
+    }
+
+    public PersistencePolicies(int bookkeeperEnsemble, int 
bookkeeperWriteQuorum, int bookkeeperAckQuorum,
+            double managedLedgerMaxMarkDeleteRate, String 
managedLedgerStorageClassName) {
         this.bookkeeperEnsemble = bookkeeperEnsemble;
         this.bookkeeperWriteQuorum = bookkeeperWriteQuorum;
         this.bookkeeperAckQuorum = bookkeeperAckQuorum;
         this.managedLedgerMaxMarkDeleteRate = managedLedgerMaxMarkDeleteRate;
+        this.managedLedgerStorageClassName = managedLedgerStorageClassName;
     }
 
     public int getBookkeeperEnsemble() {
@@ -59,10 +66,14 @@ public class PersistencePolicies {
         return managedLedgerMaxMarkDeleteRate;
     }
 
+    public String getManagedLedgerStorageClassName() {
+        return managedLedgerStorageClassName;
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(bookkeeperEnsemble, bookkeeperWriteQuorum,
-                bookkeeperAckQuorum, managedLedgerMaxMarkDeleteRate);
+                bookkeeperAckQuorum, managedLedgerMaxMarkDeleteRate, 
managedLedgerStorageClassName);
     }
     @Override
     public boolean equals(Object obj) {
@@ -71,7 +82,8 @@ public class PersistencePolicies {
             return bookkeeperEnsemble == other.bookkeeperEnsemble
                     && bookkeeperWriteQuorum == other.bookkeeperWriteQuorum
                     && bookkeeperAckQuorum == other.bookkeeperAckQuorum
-                    && managedLedgerMaxMarkDeleteRate == 
other.managedLedgerMaxMarkDeleteRate;
+                    && managedLedgerMaxMarkDeleteRate == 
other.managedLedgerMaxMarkDeleteRate
+                    && Objects.equals(managedLedgerStorageClassName, 
other.managedLedgerStorageClassName);
         }
 
         return false;
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index e8e644b6880..8adedcd14ac 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1379,6 +1379,11 @@ public class CmdNamespaces extends CmdBase {
                 description = "Throttling rate of mark-delete operation (0 
means no throttle)")
         private double managedLedgerMaxMarkDeleteRate = 0;
 
+        @Option(names = { "-c",
+                "--ml-storage-class" },
+                description = "Managed ledger storage class name")
+        private String managedLedgerStorageClassName;
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(namespaceName);
@@ -1390,7 +1395,8 @@ public class CmdNamespaces extends CmdBase {
                 throw new ParameterException("[--ml-mark-delete-max-rate] 
cannot less than 0.");
             }
             getAdmin().namespaces().setPersistence(namespace, new 
PersistencePolicies(bookkeeperEnsemble,
-                    bookkeeperWriteQuorum, bookkeeperAckQuorum, 
managedLedgerMaxMarkDeleteRate));
+                    bookkeeperWriteQuorum, bookkeeperAckQuorum, 
managedLedgerMaxMarkDeleteRate,
+                    managedLedgerStorageClassName));
         }
     }
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 3cc72db2e95..10850d107ed 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -1197,6 +1197,11 @@ public class CmdTopicPolicies extends CmdBase {
                 + "If set to true, the policy will be replicate to other 
clusters asynchronously", arity = "0")
         private boolean isGlobal = false;
 
+        @Option(names = { "-c",
+                "--ml-storage-class" },
+                description = "Managed ledger storage class name")
+        private String managedLedgerStorageClassName;
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(topicName);
@@ -1208,7 +1213,8 @@ public class CmdTopicPolicies extends CmdBase {
                 throw new ParameterException("[--ml-mark-delete-max-rate] 
cannot less than 0.");
             }
             getTopicPolicies(isGlobal).setPersistence(persistentTopic, new 
PersistencePolicies(bookkeeperEnsemble,
-                    bookkeeperWriteQuorum, bookkeeperAckQuorum, 
managedLedgerMaxMarkDeleteRate));
+                    bookkeeperWriteQuorum, bookkeeperAckQuorum, 
managedLedgerMaxMarkDeleteRate,
+                    managedLedgerStorageClassName));
         }
     }
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 261bd81a5b7..955d6e13e1d 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -2148,6 +2148,11 @@ public class CmdTopics extends CmdBase {
                 + "(0 means no throttle)")
         private double managedLedgerMaxMarkDeleteRate = 0;
 
+        @Option(names = { "-c",
+                "--ml-storage-class" },
+                description = "Managed ledger storage class name")
+        private String managedLedgerStorageClassName;
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(topicName);
@@ -2159,7 +2164,8 @@ public class CmdTopics extends CmdBase {
                 throw new ParameterException("[--ml-mark-delete-max-rate] 
cannot less than 0.");
             }
             getTopics().setPersistence(persistentTopic, new 
PersistencePolicies(bookkeeperEnsemble,
-                    bookkeeperWriteQuorum, bookkeeperAckQuorum, 
managedLedgerMaxMarkDeleteRate));
+                    bookkeeperWriteQuorum, bookkeeperAckQuorum, 
managedLedgerMaxMarkDeleteRate,
+                    managedLedgerStorageClassName));
         }
     }
 


Reply via email to