This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 731ec8364f0 [improve][broker][PIP-384] Decouple Bookkeeper client from
ManagedLedgerStorage and enable multiple ManagedLedgerFactory instances (#23313)
731ec8364f0 is described below
commit 731ec8364f050e3db1532ec8316cf76109865e3d
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Oct 8 04:48:09 2024 +0300
[improve][broker][PIP-384] Decouple Bookkeeper client from
ManagedLedgerStorage and enable multiple ManagedLedgerFactory instances (#23313)
---
.../bookkeeper/mledger/ManagedLedgerConfig.java | 4 +-
.../pulsar/broker/ManagedLedgerClientFactory.java | 50 +++++-
.../org/apache/pulsar/broker/PulsarService.java | 28 ++--
.../broker/TransactionMetadataStoreService.java | 4 +-
.../pulsar/broker/admin/impl/NamespacesBase.java | 2 +-
.../broker/admin/impl/PersistentTopicsBase.java | 36 +++--
.../pulsar/broker/service/BrokerService.java | 87 +++++++---
.../apache/pulsar/broker/service/ServerCnx.java | 95 ++++++-----
.../broker/service/persistent/PersistentTopic.java | 44 ++++--
.../broker/stats/metrics/AbstractMetrics.java | 4 +-
.../broker/stats/metrics/ManagedLedgerMetrics.java | 3 +-
.../prometheus/PrometheusMetricsGenerator.java | 21 ++-
.../BookkeeperManagedLedgerStorageClass.java | 42 +++++
.../broker/storage/ManagedLedgerStorage.java | 36 +++--
.../broker/storage/ManagedLedgerStorageClass.java | 45 ++++++
.../SnapshotSegmentAbortedTxnProcessorImpl.java | 9 +-
.../pendingack/impl/MLPendingAckStoreProvider.java | 175 +++++++++++++--------
.../pulsar/broker/admin/AdminApiOffloadTest.java | 2 +-
.../apache/pulsar/broker/admin/AdminApiTest.java | 18 +--
.../pulsar/broker/admin/PersistentTopicsTest.java | 2 +-
.../broker/service/BrokerBkEnsemblesTests.java | 6 +-
.../broker/service/BrokerBookieIsolationTest.java | 6 +-
.../pulsar/broker/service/BrokerServiceTest.java | 7 +-
.../PersistentDispatcherFailoverConsumerTest.java | 6 +-
.../service/PersistentTopicConcurrentTest.java | 2 +-
.../broker/service/PersistentTopicE2ETest.java | 4 +-
.../pulsar/broker/service/PersistentTopicTest.java | 15 +-
.../pulsar/broker/service/ReplicationTxnTest.java | 4 +-
.../pulsar/broker/service/ReplicatorTest.java | 3 +-
.../pulsar/broker/service/ServerCnxTest.java | 22 +--
.../service/TransactionMarkerDeleteTest.java | 2 +-
.../broker/stats/ManagedLedgerMetricsTest.java | 4 +-
.../testcontext/NonStartableTestPulsarService.java | 2 +-
.../broker/testcontext/PulsarTestContext.java | 87 +++++++---
.../pulsar/broker/testcontext/SpyConfig.java | 3 +
.../testcontext/StartableTestPulsarService.java | 21 +++
.../TopicTransactionBufferRecoverTest.java | 2 +-
.../broker/transaction/TransactionProduceTest.java | 2 +-
.../pulsar/broker/transaction/TransactionTest.java | 2 +-
.../pendingack/PendingAckPersistentTest.java | 17 +-
.../client/api/OrphanPersistentTopicTest.java | 2 +-
.../client/api/SimpleProducerConsumerTest.java | 39 +++--
.../api/SubscriptionPauseOnAckStatPersistTest.java | 2 +-
.../client/api/v1/V1_ProducerConsumerTest.java | 2 +-
.../client/impl/SequenceIdWithErrorTest.java | 2 +-
.../apache/pulsar/compaction/CompactionTest.java | 4 +-
.../common/policies/data/PersistencePolicies.java | 20 ++-
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 8 +-
.../apache/pulsar/admin/cli/CmdTopicPolicies.java | 8 +-
.../org/apache/pulsar/admin/cli/CmdTopics.java | 8 +-
50 files changed, 701 insertions(+), 318 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 7b28990f355..a1e1deb503e 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -87,7 +87,9 @@ public class ManagedLedgerConfig {
private int minimumBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;
private boolean triggerOffloadOnTopicLoad = false;
-
+ @Getter
+ @Setter
+ private String storageClassName;
@Getter
@Setter
private String shadowSourceName;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 9bbc2857863..737bc69bf24 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -24,6 +24,8 @@ import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.EventLoopGroup;
import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -39,16 +41,18 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.configuration.Configuration;
import
org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
+import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
-
private static final Logger log =
LoggerFactory.getLogger(ManagedLedgerClientFactory.class);
-
+ private static final String DEFAULT_STORAGE_CLASS_NAME = "bookkeeper";
+ private BookkeeperManagedLedgerStorageClass defaultStorageClass;
private ManagedLedgerFactory managedLedgerFactory;
private BookKeeper defaultBkClient;
private final AsyncCache<EnsemblePlacementPolicyConfig, BookKeeper>
@@ -119,20 +123,50 @@ public class ManagedLedgerClientFactory implements
ManagedLedgerStorage {
defaultBkClient.close();
throw e;
}
+
+ defaultStorageClass = new BookkeeperManagedLedgerStorageClass() {
+ @Override
+ public String getName() {
+ return DEFAULT_STORAGE_CLASS_NAME;
+ }
+
+ @Override
+ public ManagedLedgerFactory getManagedLedgerFactory() {
+ return managedLedgerFactory;
+ }
+
+ @Override
+ public StatsProvider getStatsProvider() {
+ return statsProvider;
+ }
+
+ @Override
+ public BookKeeper getBookKeeperClient() {
+ return defaultBkClient;
+ }
+ };
}
- public ManagedLedgerFactory getManagedLedgerFactory() {
- return managedLedgerFactory;
+ @Override
+ public Collection<ManagedLedgerStorageClass> getStorageClasses() {
+ return List.of(getDefaultStorageClass());
}
- public BookKeeper getBookKeeperClient() {
- return defaultBkClient;
+ @Override
+ public Optional<ManagedLedgerStorageClass>
getManagedLedgerStorageClass(String name) {
+ if (name == null || DEFAULT_STORAGE_CLASS_NAME.equals(name)) {
+ return Optional.of(getDefaultStorageClass());
+ } else {
+ return Optional.empty();
+ }
}
- public StatsProvider getStatsProvider() {
- return statsProvider;
+ @Override
+ public ManagedLedgerStorageClass getDefaultStorageClass() {
+ return defaultStorageClass;
}
+
@VisibleForTesting
public Map<EnsemblePlacementPolicyConfig, BookKeeper>
getBkEnsemblePolicyToBookKeeperMap() {
return bkEnsemblePolicyToBkClientMap.synchronous().asMap();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6c768a07897..dcc0e961275 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -124,7 +124,9 @@ import
org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import
org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
+import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
@@ -210,7 +212,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private static final int DEFAULT_MONOTONIC_CLOCK_GRANULARITY_MILLIS = 8;
private final ServiceConfiguration config;
private NamespaceService nsService = null;
- private ManagedLedgerStorage managedLedgerClientFactory = null;
+ private ManagedLedgerStorage managedLedgerStorage = null;
private LeaderElectionService leaderElectionService = null;
private BrokerService brokerService = null;
private WebService webService = null;
@@ -606,13 +608,13 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
this.brokerService = null;
}
- if (this.managedLedgerClientFactory != null) {
+ if (this.managedLedgerStorage != null) {
try {
- this.managedLedgerClientFactory.close();
+ this.managedLedgerStorage.close();
} catch (Exception e) {
LOG.warn("ManagedLedgerClientFactory closing failed {}",
e.getMessage());
}
- this.managedLedgerClientFactory = null;
+ this.managedLedgerStorage = null;
}
if (bkClientFactory != null) {
@@ -899,7 +901,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
// Now we are ready to start services
this.bkClientFactory = newBookKeeperClientFactory();
- managedLedgerClientFactory = newManagedLedgerClientFactory();
+ managedLedgerStorage = newManagedLedgerStorage();
this.brokerService = newBrokerService(this);
@@ -1122,7 +1124,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
}
@VisibleForTesting
- protected ManagedLedgerStorage newManagedLedgerClientFactory() throws
Exception {
+ protected ManagedLedgerStorage newManagedLedgerStorage() throws Exception {
return ManagedLedgerStorage.create(
config, localMetadataStore,
bkClientFactory, ioEventLoopGroup,
openTelemetry.getOpenTelemetryService().getOpenTelemetry()
@@ -1348,7 +1350,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
long resourceQuotaUpdateInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
loadSheddingTask = new LoadSheddingTask(loadManager,
loadManagerExecutor,
- config, getManagedLedgerFactory());
+ config, getDefaultManagedLedgerFactory());
loadSheddingTask.start();
loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadResourceQuotaUpdaterTask(loadManager),
resourceQuotaUpdateInterval,
@@ -1535,11 +1537,17 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
}
public BookKeeper getBookKeeperClient() {
- return getManagedLedgerClientFactory().getBookKeeperClient();
+ ManagedLedgerStorageClass defaultStorageClass =
getManagedLedgerStorage().getDefaultStorageClass();
+ if (defaultStorageClass instanceof BookkeeperManagedLedgerStorageClass
bkStorageClass) {
+ return bkStorageClass.getBookKeeperClient();
+ } else {
+ // TODO: Refactor code to support other than default bookkeeper
based storage class
+ throw new UnsupportedOperationException("BookKeeper client is not
available");
+ }
}
- public ManagedLedgerFactory getManagedLedgerFactory() {
- return getManagedLedgerClientFactory().getManagedLedgerFactory();
+ public ManagedLedgerFactory getDefaultManagedLedgerFactory() {
+ return
getManagedLedgerStorage().getDefaultStorageClass().getManagedLedgerFactory();
}
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index c80580b02f1..bd19a8e8602 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -227,7 +227,9 @@ public class TransactionMetadataStoreService {
.setBatchedWriteMaxDelayInMillis(serviceConfiguration.getTransactionLogBatchedWriteMaxDelayInMillis());
return
pulsarService.getBrokerService().getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(
- v -> transactionMetadataStoreProvider.openStore(tcId,
pulsarService.getManagedLedgerFactory(), v,
+ v -> transactionMetadataStoreProvider.openStore(tcId,
+
pulsarService.getManagedLedgerStorage().getManagedLedgerStorageClass(v.getStorageClassName())
+ .get().getManagedLedgerFactory(), v,
timeoutTracker, recoverTracker,
pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator(),
txnLogBufferedWriterConfig,
brokerClientSharedTimer));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4d26fe2a4c3..18c80d6bef4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2540,7 +2540,7 @@ public abstract class NamespacesBase extends
AdminResource {
String localClusterName = pulsar().getConfiguration().getClusterName();
OffloaderObjectsScannerUtils.scanOffloadedLedgers(managedLedgerOffloader,
- localClusterName, pulsar().getManagedLedgerFactory(), sink);
+ localClusterName, pulsar().getDefaultManagedLedgerFactory(),
sink);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8860c9bb06d..6070093cc35 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1405,19 +1405,27 @@ public class PersistentTopicsBase extends AdminResource
{
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
.thenAccept(__ -> {
String managedLedger =
topicName.getPersistenceNamingEncoding();
- pulsar().getManagedLedgerFactory()
- .asyncGetManagedLedgerInfo(managedLedger, new
ManagedLedgerInfoCallback() {
- @Override
- public void getInfoComplete(ManagedLedgerInfo info,
Object ctx) {
- asyncResponse.resume((StreamingOutput) output -> {
- objectWriter().writeValue(output, info);
+
pulsar().getBrokerService().getManagedLedgerFactoryForTopic(topicName)
+ .thenAccept(managedLedgerFactory -> {
+
managedLedgerFactory.asyncGetManagedLedgerInfo(managedLedger,
+ new ManagedLedgerInfoCallback() {
+ @Override
+ public void
getInfoComplete(ManagedLedgerInfo info, Object ctx) {
+
asyncResponse.resume((StreamingOutput) output -> {
+
objectWriter().writeValue(output, info);
+ });
+ }
+
+ @Override
+ public void
getInfoFailed(ManagedLedgerException exception, Object ctx) {
+
asyncResponse.resume(exception);
+ }
+ }, null);
+ })
+ .exceptionally(ex -> {
+
resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
});
- }
- @Override
- public void getInfoFailed(ManagedLedgerException
exception, Object ctx) {
- asyncResponse.resume(exception);
- }
- }, null);
}).exceptionally(ex -> {
log.error("[{}] Failed to get managed info for {}",
clientAppId(), topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -3174,7 +3182,9 @@ public class PersistentTopicsBase extends AdminResource {
try {
PersistentOfflineTopicStats
estimateOfflineTopicStats =
offlineTopicBacklog.estimateUnloadedTopicBacklog(
-
pulsar().getManagedLedgerFactory(),
+ pulsar().getBrokerService()
+
.getManagedLedgerFactoryForTopic(topicName,
+
config.getStorageClassName()),
topicName);
pulsar().getBrokerService()
.cacheOfflineTopicStats(topicName,
estimateOfflineTopicStats);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c240c758dcd..ed0cdf18b47 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -64,6 +64,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
@@ -136,6 +137,8 @@ import
org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
import org.apache.pulsar.broker.validator.BindAddressValidator;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
@@ -216,7 +219,7 @@ public class BrokerService implements Closeable {
.register();
private final PulsarService pulsar;
- private final ManagedLedgerFactory managedLedgerFactory;
+ private final ManagedLedgerStorage managedLedgerStorage;
private final Map<String, CompletableFuture<Optional<Topic>>> topics = new
ConcurrentHashMap<>();
@@ -335,7 +338,7 @@ public class BrokerService implements Closeable {
this.brokerPublishRateLimiter = new
PublishRateLimiterImpl(pulsar.getMonotonicSnapshotClock());
this.preciseTopicPublishRateLimitingEnable =
pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
- this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
+ this.managedLedgerStorage = pulsar.getManagedLedgerStorage();
this.keepAliveIntervalSeconds =
pulsar.getConfiguration().getKeepAliveIntervalSeconds();
this.pendingTopicLoadingQueue = Queues.newConcurrentLinkedQueue();
this.pulsarStats = new PulsarStats(pulsar);
@@ -1241,23 +1244,51 @@ public class BrokerService implements Closeable {
return;
}
CompletableFuture<ManagedLedgerConfig> mlConfigFuture =
getManagedLedgerConfig(topicName);
- managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(),
- mlConfigFuture, new DeleteLedgerCallback() {
- @Override
- public void deleteLedgerComplete(Object ctx) {
- future.complete(null);
- }
+ mlConfigFuture.thenAccept(config -> {
+ getManagedLedgerFactoryForTopic(topicName,
config.getStorageClassName())
+ .asyncDelete(tn.getPersistenceNamingEncoding(),
+ mlConfigFuture, new DeleteLedgerCallback() {
+ @Override
+ public void deleteLedgerComplete(Object
ctx) {
+ future.complete(null);
+ }
- @Override
- public void deleteLedgerFailed(ManagedLedgerException
exception, Object ctx) {
- future.completeExceptionally(exception);
- }
- }, null);
+ @Override
+ public void
deleteLedgerFailed(ManagedLedgerException exception,
+ Object ctx)
{
+
future.completeExceptionally(exception);
+ }
+ }, null);
+ }).exceptionally(ex1 -> {
+ log.error("Failed to get managed ledger config for topic {}",
topic, ex1);
+ future.completeExceptionally(ex1);
+ return null;
+ });
});
return future;
}
+ public CompletableFuture<ManagedLedgerFactory>
getManagedLedgerFactoryForTopic(TopicName topicName) {
+ return getManagedLedgerConfig(topicName)
+ .thenApply(config -> {
+ String storageClassName = config.getStorageClassName();
+ return getManagedLedgerFactoryForTopic(topicName,
storageClassName);
+ });
+ }
+
+ public ManagedLedgerFactory getManagedLedgerFactoryForTopic(TopicName
topicName, String storageClassName) {
+ Optional<ManagedLedgerStorageClass> managedLedgerStorageClass =
+
managedLedgerStorage.getManagedLedgerStorageClass(storageClassName);
+ if (!managedLedgerStorageClass.isPresent()) {
+ throw new CompletionException(new ManagedLedgerException(
+ "ManagedLedgerStorageClass " + storageClassName + " not
found for topic " + topicName));
+ }
+ return managedLedgerStorageClass
+ .get()
+ .getManagedLedgerFactory();
+ }
+
public void deleteTopicAuthenticationWithRetry(String topic,
CompletableFuture<Void> future, int count) {
if (count == 0) {
log.error("The number of retries has exhausted for topic {}",
topic);
@@ -1624,14 +1655,17 @@ public class BrokerService implements Closeable {
@VisibleForTesting
protected CompletableFuture<Map<String, String>>
fetchTopicPropertiesAsync(TopicName topicName) {
if (!topicName.isPartitioned()) {
- return
managedLedgerFactory.getManagedLedgerPropertiesAsync(topicName.getPersistenceNamingEncoding());
+ return getManagedLedgerFactoryForTopic(topicName).thenCompose(
+ managedLedgerFactory ->
managedLedgerFactory.getManagedLedgerPropertiesAsync(
+ topicName.getPersistenceNamingEncoding()));
} else {
TopicName partitionedTopicName =
TopicName.get(topicName.getPartitionedTopicName());
return fetchPartitionedTopicMetadataAsync(partitionedTopicName)
.thenCompose(metadata -> {
if (metadata.partitions ==
PartitionedTopicMetadata.NON_PARTITIONED) {
- return
managedLedgerFactory.getManagedLedgerPropertiesAsync(
- topicName.getPersistenceNamingEncoding());
+ return
getManagedLedgerFactoryForTopic(topicName).thenCompose(
+ managedLedgerFactory ->
managedLedgerFactory.getManagedLedgerPropertiesAsync(
+
topicName.getPersistenceNamingEncoding()));
} else {
// Check if the partitioned topic is a ShadowTopic
if (MapUtils.getString(metadata.properties,
PROPERTY_SOURCE_TOPIC_KEY) != null) {
@@ -1756,6 +1790,8 @@ public class BrokerService implements Closeable {
topicEventsDispatcher.notifyOnCompletion(loadFuture, topic,
TopicEvent.LOAD);
// Once we have the configuration, we can proceed with the async
open operation
+ ManagedLedgerFactory managedLedgerFactory =
+ getManagedLedgerFactoryForTopic(topicName,
managedLedgerConfig.getStorageClassName());
managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(),
managedLedgerConfig,
new OpenLedgerCallback() {
@Override
@@ -1918,6 +1954,7 @@ public class BrokerService implements Closeable {
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+
managedLedgerConfig.setStorageClassName(persistencePolicies.getManagedLedgerStorageClassName());
if (serviceConfig.isStrictBookieAffinityEnabled()) {
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
@@ -2745,25 +2782,29 @@ public class BrokerService implements Closeable {
});
});
+
+ ManagedLedgerFactory defaultManagedLedgerFactory =
+
managedLedgerStorage.getDefaultStorageClass().getManagedLedgerFactory();
+
// add listener to notify broker managedLedgerCacheSizeMB dynamic
config
registerConfigurationListener("managedLedgerCacheSizeMB",
(managedLedgerCacheSizeMB) -> {
- managedLedgerFactory.getEntryCacheManager()
+ defaultManagedLedgerFactory.getEntryCacheManager()
.updateCacheSizeAndThreshold(((int)
managedLedgerCacheSizeMB) * 1024L * 1024L);
});
// add listener to notify broker managedLedgerCacheEvictionWatermark
dynamic config
registerConfigurationListener(
"managedLedgerCacheEvictionWatermark",
(cacheEvictionWatermark) -> {
- managedLedgerFactory.getEntryCacheManager()
- .updateCacheEvictionWatermark((double)
cacheEvictionWatermark);
- });
+ defaultManagedLedgerFactory.getEntryCacheManager()
+ .updateCacheEvictionWatermark((double)
cacheEvictionWatermark);
+ });
// add listener to notify broker
managedLedgerCacheEvictionTimeThresholdMillis dynamic config
registerConfigurationListener(
"managedLedgerCacheEvictionTimeThresholdMillis",
(cacheEvictionTimeThresholdMills) -> {
- managedLedgerFactory.updateCacheEvictionTimeThreshold(MILLISECONDS
- .toNanos((long) cacheEvictionTimeThresholdMills));
- });
+
defaultManagedLedgerFactory.updateCacheEvictionTimeThreshold(MILLISECONDS
+ .toNanos((long) cacheEvictionTimeThresholdMills));
+ });
// add listener to update message-dispatch-rate in msg for topic
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index aedd68d416f..37b431e8339 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2902,7 +2902,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.debug("[{}] handleEndTxnOnPartition txnId: [{}], txnAction:
[{}]", topic,
txnID, txnAction);
}
- CompletableFuture<Optional<Topic>> topicFuture =
service.getTopicIfExists(TopicName.get(topic).toString());
+ TopicName topicName = TopicName.get(topic);
+ CompletableFuture<Optional<Topic>> topicFuture =
service.getTopicIfExists(topicName.toString());
topicFuture.thenAcceptAsync(optionalTopic -> {
if (optionalTopic.isPresent()) {
// we only accept superuser because this endpoint is reserved
for tc to broker communication
@@ -2928,24 +2929,29 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
txnID.getLeastSigBits(),
txnID.getMostSigBits()));
});
} else {
- getBrokerService().getManagedLedgerFactory()
-
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
- .thenAccept((b) -> {
- if (b) {
- log.error("handleEndTxnOnPartition fail ! The
topic {} does not exist in broker, "
- + "txnId: [{}], txnAction:
[{}]", topic,
- txnID, TxnAction.valueOf(txnAction));
-
writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
- ServerError.ServiceNotReady,
- "The topic " + topic + " does not
exist in broker.",
- txnID.getLeastSigBits(),
txnID.getMostSigBits()));
- } else {
- log.warn("handleEndTxnOnPartition fail ! The
topic {} has not been created, "
- + "txnId: [{}], txnAction:
[{}]",
- topic, txnID,
TxnAction.valueOf(txnAction));
-
writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
- txnID.getLeastSigBits(),
txnID.getMostSigBits()));
- }
+ getBrokerService().getManagedLedgerFactoryForTopic(topicName)
+ .thenCompose(managedLedgerFactory -> {
+ return
managedLedgerFactory.asyncExists(topicName.getPersistenceNamingEncoding())
+ .thenAccept((b) -> {
+ if (b) {
+ log.error(
+ "handleEndTxnOnPartition
fail ! The topic {} does not exist in "
+ + "broker, "
+ + "txnId: [{}],
txnAction: [{}]", topic,
+ txnID,
TxnAction.valueOf(txnAction));
+
writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
+
ServerError.ServiceNotReady,
+ "The topic " + topic + "
does not exist in broker.",
+ txnID.getLeastSigBits(),
txnID.getMostSigBits()));
+ } else {
+ log.warn(
+ "handleEndTxnOnPartition
fail ! The topic {} has not been created, "
+ + "txnId: [{}],
txnAction: [{}]",
+ topic, txnID,
TxnAction.valueOf(txnAction));
+
writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
+ txnID.getLeastSigBits(),
txnID.getMostSigBits()));
+ }
+ });
}).exceptionally(e -> {
log.error("handleEndTxnOnPartition fail ! topic
{}, "
+ "txnId: [{}], txnAction: [{}]",
topic, txnID,
@@ -2954,7 +2960,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
requestId, ServerError.ServiceNotReady,
e.getMessage(), txnID.getLeastSigBits(),
txnID.getMostSigBits()));
return null;
- });
+
+ });
}
}, ctx.executor()).exceptionally(e -> {
log.error("handleEndTxnOnPartition fail ! topic {}, "
@@ -2984,7 +2991,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
new TxnID(txnidMostBits, txnidLeastBits), txnAction);
}
- CompletableFuture<Optional<Topic>> topicFuture =
service.getTopicIfExists(TopicName.get(topic).toString());
+ TopicName topicName = TopicName.get(topic);
+ CompletableFuture<Optional<Topic>> topicFuture =
service.getTopicIfExists(topicName.toString());
topicFuture.thenAcceptAsync(optionalTopic -> {
if (optionalTopic.isPresent()) {
Subscription subscription =
optionalTopic.get().getSubscription(subName);
@@ -3019,24 +3027,31 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits,
txnidMostBits));
});
} else {
- getBrokerService().getManagedLedgerFactory()
-
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
- .thenAccept((b) -> {
- if (b) {
- log.error("handleEndTxnOnSubscription fail!
The topic {} does not exist in broker, "
- + "subscription: {}, txnId:
[{}], txnAction: [{}]", topic, subName,
- txnID, TxnAction.valueOf(txnAction));
-
writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
- requestId, txnID.getLeastSigBits(),
txnID.getMostSigBits(),
- ServerError.ServiceNotReady,
- "The topic " + topic + " does not
exist in broker."));
- } else {
- log.warn("handleEndTxnOnSubscription fail !
The topic {} has not been created, "
- + "subscription: {} txnId:
[{}], txnAction: [{}]",
- topic, subName, txnID,
TxnAction.valueOf(txnAction));
-
writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId,
- txnID.getLeastSigBits(),
txnID.getMostSigBits()));
- }
+ getBrokerService().getManagedLedgerFactoryForTopic(topicName)
+ .thenCompose(managedLedgerFactory -> {
+ return
managedLedgerFactory.asyncExists(topicName.getPersistenceNamingEncoding())
+ .thenAccept((b) -> {
+ if (b) {
+ log.error(
+
"handleEndTxnOnSubscription fail! The topic {} does not exist in "
+ + "broker, "
+ + "subscription:
{}, txnId: [{}], txnAction: [{}]", topic,
+ subName,
+ txnID,
TxnAction.valueOf(txnAction));
+
writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+ requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits(),
+
ServerError.ServiceNotReady,
+ "The topic " + topic + "
does not exist in broker."));
+ } else {
+ log.warn(
+
"handleEndTxnOnSubscription fail ! The topic {} has not been "
+ + "created, "
+ + "subscription:
{} txnId: [{}], txnAction: [{}]",
+ topic, subName, txnID,
TxnAction.valueOf(txnAction));
+
writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId,
+ txnID.getLeastSigBits(),
txnID.getMostSigBits()));
+ }
+ });
}).exceptionally(e -> {
log.error("handleEndTxnOnSubscription fail ! topic
{}, subscription: {}"
+ "txnId: [{}], txnAction: [{}]",
topic, subName,
@@ -3045,7 +3060,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
requestId, txnID.getLeastSigBits(),
txnID.getMostSigBits(),
ServerError.ServiceNotReady,
e.getMessage()));
return null;
- });
+ });
}
}, ctx.executor()).exceptionally(e -> {
log.error("handleEndTxnOnSubscription fail ! topic: {},
subscription: {}"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f8581cfc799..3cce175660e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -74,12 +74,14 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionBound;
import org.apache.bookkeeper.mledger.PositionFactory;
@@ -1232,26 +1234,34 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
.getTransactionPendingAckStoreSuffix(topic,
Codec.encode(subscriptionName)));
if
(brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
-
getBrokerService().getManagedLedgerFactory().asyncDelete(tn.getPersistenceNamingEncoding(),
- getBrokerService().getManagedLedgerConfig(tn),
- new AsyncCallbacks.DeleteLedgerCallback() {
- @Override
- public void deleteLedgerComplete(Object ctx) {
-
asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture);
- }
-
- @Override
- public void deleteLedgerFailed(ManagedLedgerException
exception, Object ctx) {
- if (exception instanceof
MetadataNotFoundException) {
+ CompletableFuture<ManagedLedgerConfig> managedLedgerConfig =
getBrokerService().getManagedLedgerConfig(tn);
+ managedLedgerConfig.thenAccept(config -> {
+ ManagedLedgerFactory managedLedgerFactory =
+ getBrokerService().getManagedLedgerFactoryForTopic(tn,
config.getStorageClassName());
+
managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(),
+ managedLedgerConfig,
+ new AsyncCallbacks.DeleteLedgerCallback() {
+ @Override
+ public void deleteLedgerComplete(Object ctx) {
asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture);
- return;
}
- unsubscribeFuture.completeExceptionally(exception);
- log.error("[{}][{}] Error deleting subscription
pending ack store",
- topic, subscriptionName, exception);
- }
- }, null);
+ @Override
+ public void
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+ if (exception instanceof
MetadataNotFoundException) {
+
asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture);
+ return;
+ }
+
+
unsubscribeFuture.completeExceptionally(exception);
+ log.error("[{}][{}] Error deleting
subscription pending ack store",
+ topic, subscriptionName, exception);
+ }
+ }, null);
+ }).exceptionally(ex -> {
+ unsubscribeFuture.completeExceptionally(ex);
+ return null;
+ });
} else {
asyncDeleteCursorWithClearDelayedMessage(subscriptionName,
unsubscribeFuture);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
index 489d37dd0a3..114f962cb81 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
@@ -131,7 +131,7 @@ abstract class AbstractMetrics {
* @return
*/
protected ManagedLedgerFactoryMXBean getManagedLedgerCacheStats() {
- return pulsar.getManagedLedgerFactory().getCacheStats();
+ return pulsar.getDefaultManagedLedgerFactory().getCacheStats();
}
/**
@@ -140,7 +140,7 @@ abstract class AbstractMetrics {
* @return
*/
protected Map<String, ManagedLedger> getManagedLedgers() {
- return pulsar.getManagedLedgerFactory().getManagedLedgers();
+ return pulsar.getDefaultManagedLedgerFactory().getManagedLedgers();
}
protected String getLocalClusterName() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
index 52c69265c2f..925fcb28b7a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
@@ -52,8 +52,7 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
this.metricsCollection = new ArrayList<>();
this.ledgersByDimensionMap = new HashMap<>();
this.tempAggregatedMetricsMap = new HashMap<>();
- this.statsPeriodSeconds = pulsar.getManagedLedgerFactory()
- .getConfig().getStatsPeriodSeconds();
+ this.statsPeriodSeconds =
pulsar.getDefaultManagedLedgerFactory().getConfig().getStatsPeriodSeconds();
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 6b4d08c359d..8c3cb39c925 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -55,6 +55,8 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
+import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
@@ -485,12 +487,14 @@ public class PrometheusMetricsGenerator implements
AutoCloseable {
}
private static void generateManagedLedgerBookieClientMetrics(PulsarService
pulsar, SimpleTextOutputStream stream) {
- StatsProvider statsProvider =
pulsar.getManagedLedgerClientFactory().getStatsProvider();
- if (statsProvider instanceof NullStatsProvider) {
- return;
- }
+ ManagedLedgerStorageClass defaultStorageClass =
pulsar.getManagedLedgerStorage().getDefaultStorageClass();
+ if (defaultStorageClass instanceof BookkeeperManagedLedgerStorageClass
bkStorageClass) {
+ StatsProvider statsProvider = bkStorageClass.getStatsProvider();
+ if (statsProvider instanceof NullStatsProvider) {
+ return;
+ }
- try (Writer writer = new OutputStreamWriter(new
BufferedOutputStream(new OutputStream() {
+ try (Writer writer = new OutputStreamWriter(new
BufferedOutputStream(new OutputStream() {
@Override
public void write(int b) throws IOException {
stream.writeByte(b);
@@ -501,9 +505,10 @@ public class PrometheusMetricsGenerator implements
AutoCloseable {
stream.write(b, off, len);
}
}), StandardCharsets.UTF_8)) {
- statsProvider.writeAllMetrics(writer);
- } catch (IOException e) {
- log.error("Failed to write managed ledger bookie client metrics",
e);
+ statsProvider.writeAllMetrics(writer);
+ } catch (IOException e) {
+ log.error("Failed to write managed ledger bookie client
metrics", e);
+ }
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/BookkeeperManagedLedgerStorageClass.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/BookkeeperManagedLedgerStorageClass.java
new file mode 100644
index 00000000000..1f05cde72a5
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/BookkeeperManagedLedgerStorageClass.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.storage;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.stats.StatsProvider;
+
+/**
+ * ManagedLedgerStorageClass represents a configured instance of
ManagedLedgerFactory for managed ledgers.
+ * This instance is backed by a bookkeeper storage.
+ */
+public interface BookkeeperManagedLedgerStorageClass extends
ManagedLedgerStorageClass {
+ /**
+ * Return the bookkeeper client instance used by this instance.
+ *
+ * @return the bookkeeper client.
+ */
+ BookKeeper getBookKeeperClient();
+
+ /**
+ * Return the stats provider to expose the stats of the storage
implementation.
+ *
+ * @return the stats provider.
+ */
+ StatsProvider getStatsProvider();
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
index 944d2badf75..720798123e7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
@@ -21,9 +21,8 @@ package org.apache.pulsar.broker.storage;
import io.netty.channel.EventLoopGroup;
import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
-import org.apache.bookkeeper.stats.StatsProvider;
+import java.util.Collection;
+import java.util.Optional;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.classification.InterfaceAudience.Private;
@@ -33,6 +32,12 @@ import
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
/**
* Storage to access {@link org.apache.bookkeeper.mledger.ManagedLedger}s.
+ * <p>
+ * The interface provides the abstraction to access the storage layer for
managed ledgers.
+ * The interface supports multiple storage classes, each with its own
configuration. The default
+ * implementation supports a single instance of {@link
BookkeeperManagedLedgerStorageClass}.
+ * Implementations can provide multiple storage classes. The default storage
class is used
+ * for topics unless it is overridden by the persistency policy at topic or
namespace level.
*/
@Private
@Unstable
@@ -52,25 +57,25 @@ public interface ManagedLedgerStorage extends AutoCloseable
{
OpenTelemetry openTelemetry) throws Exception;
/**
- * Return the factory to create {@link ManagedLedgerFactory}.
- *
- * @return the factory to create {@link ManagedLedgerFactory}.
+ * Get all configured storage class instances.
+ * @return all configured storage class instances
*/
- ManagedLedgerFactory getManagedLedgerFactory();
+ Collection<ManagedLedgerStorageClass> getStorageClasses();
/**
- * Return the stats provider to expose the stats of the storage
implementation.
- *
- * @return the stats provider.
+ * Get the default storage class.
+ * @return default storage class
*/
- StatsProvider getStatsProvider();
+ default ManagedLedgerStorageClass getDefaultStorageClass() {
+ return getStorageClasses().stream().findFirst().get();
+ }
/**
- * Return the default bookkeeper client.
- *
- * @return the default bookkeeper client.
+ * Lookup a storage class by name.
+ * @param name storage class name
+ * @return storage class instance, or empty if not found
*/
- BookKeeper getBookKeeperClient();
+ Optional<ManagedLedgerStorageClass> getManagedLedgerStorageClass(String
name);
/**
* Close the storage.
@@ -97,5 +102,4 @@ public interface ManagedLedgerStorage extends AutoCloseable {
storage.initialize(conf, metadataStore, bkProvider, eventLoopGroup,
openTelemetry);
return storage;
}
-
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorageClass.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorageClass.java
new file mode 100644
index 00000000000..8cbe5c3b411
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorageClass.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.storage;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * ManagedLedgerStorageClass represents a configured instance of
ManagedLedgerFactory for managed ledgers.
+ * The {@link ManagedLedgerStorage} can hold multiple storage classes, and
each storage class can have its own
+ * configuration.
+ */
[email protected]
[email protected]
+public interface ManagedLedgerStorageClass {
+ /**
+ * Return the name of the storage class.
+ *
+ * @return the name of the storage class.
+ */
+ String getName();
+ /**
+ * Return the factory to create {@link ManagedLedgerFactory}.
+ *
+ * @return the factory to create {@link ManagedLedgerFactory}.
+ */
+ ManagedLedgerFactory getManagedLedgerFactory();
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index 88a3968b7b4..f2ff5d519d8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -311,8 +311,13 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName());
}
};
-
topic.getBrokerService().getPulsar().getManagedLedgerFactory().asyncOpenReadOnlyManagedLedger(
- topicName.getPersistenceNamingEncoding(), callback,
topic.getManagedLedger().getConfig(), null);
+
topic.getBrokerService().getManagedLedgerFactoryForTopic(topicName).thenAccept(managedLedgerFactory
->
+
managedLedgerFactory.asyncOpenReadOnlyManagedLedger(topicName.getPersistenceNamingEncoding(),
+ callback,
topic.getManagedLedger().getConfig(), null))
+ .exceptionally(e -> {
+ future.completeExceptionally(e);
+ return null;
+ });
return wait(future, "open read only ml for " + topicName);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
index 6fc61d423ce..12f761bb4df 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
@@ -25,9 +25,11 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import
org.apache.pulsar.broker.transaction.exception.pendingack.TransactionPendingAckException;
@@ -84,7 +86,8 @@ public class MLPendingAckStoreProvider implements
TransactionPendingAckStoreProv
}
PersistentTopic originPersistentTopic = (PersistentTopic)
subscription.getTopic();
- PulsarService pulsarService =
originPersistentTopic.getBrokerService().getPulsar();
+ BrokerService brokerService = originPersistentTopic.getBrokerService();
+ PulsarService pulsarService = brokerService.getPulsar();
final Timer brokerClientSharedTimer =
pulsarService.getBrokerClientSharedTimer();
@@ -103,93 +106,127 @@ public class MLPendingAckStoreProvider implements
TransactionPendingAckStoreProv
String pendingAckTopicName = MLPendingAckStore
.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(),
subscription.getName());
- originPersistentTopic.getBrokerService().getManagedLedgerFactory()
- .asyncExists(TopicName.get(pendingAckTopicName)
- .getPersistenceNamingEncoding()).thenAccept(exist -> {
- TopicName topicName;
- if (exist) {
- topicName = TopicName.get(pendingAckTopicName);
- } else {
- topicName = TopicName.get(originPersistentTopic.getName());
- }
- originPersistentTopic.getBrokerService()
- .getManagedLedgerConfig(topicName).thenAccept(config -> {
- config.setCreateIfMissing(true);
-
originPersistentTopic.getBrokerService().getManagedLedgerFactory()
-
.asyncOpen(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding(),
- config, new
AsyncCallbacks.OpenLedgerCallback() {
- @Override
- public void
openLedgerComplete(ManagedLedger ledger, Object ctx) {
- ledger.asyncOpenCursor(
-
MLPendingAckStore.getTransactionPendingAckStoreCursorName(),
- InitialPosition.Earliest, new
AsyncCallbacks.OpenCursorCallback() {
- @Override
- public void
openCursorComplete(ManagedCursor cursor, Object ctx) {
-
pendingAckStoreFuture.complete(new MLPendingAckStore(ledger,
- cursor,
-
subscription.getCursor(),
-
originPersistentTopic
-
.getBrokerService()
-
.getPulsar()
-
.getConfiguration()
-
.getTransactionPendingAckLogIndexMinLag(),
-
txnLogBufferedWriterConfig,
-
brokerClientSharedTimer, bufferedWriterMetrics,
-
originPersistentTopic
-
.getBrokerService()
-
.getPulsar()
-
.getOrderedExecutor()
-
.chooseThread()));
- if
(log.isDebugEnabled()) {
- log.debug("{},{}
open MLPendingAckStore cursor success",
-
originPersistentTopic.getName(),
-
subscription.getName());
- }
- }
-
- @Override
- public void
openCursorFailed(ManagedLedgerException exception,
-
Object ctx) {
- log.error("{},{} open
MLPendingAckStore cursor failed."
- ,
originPersistentTopic.getName(),
-
subscription.getName(), exception);
-
pendingAckStoreFuture.completeExceptionally(exception);
- }
- }, null);
- }
-
- @Override
- public void
openLedgerFailed(ManagedLedgerException exception, Object ctx) {
- log.error("{}, {} open
MLPendingAckStore managedLedger failed."
- ,
originPersistentTopic.getName(), subscription.getName(), exception);
-
pendingAckStoreFuture.completeExceptionally(exception);
- }
- }, () ->
CompletableFuture.completedFuture(true), null);
+ TopicName pendingAckTopicNameObject =
TopicName.get(pendingAckTopicName);
+
brokerService.getManagedLedgerFactoryForTopic(pendingAckTopicNameObject)
+ .thenAccept(managedLedgerFactory -> {
+ managedLedgerFactory.asyncExists(pendingAckTopicNameObject
+ .getPersistenceNamingEncoding()).thenAccept(exist
-> {
+ TopicName topicName;
+ if (exist) {
+ topicName = pendingAckTopicNameObject;
+ } else {
+ topicName =
TopicName.get(originPersistentTopic.getName());
+ }
+
brokerService.getManagedLedgerConfig(topicName).thenAccept(config -> {
+ internalNewPendingAckStore(subscription, config,
brokerService, topicName,
+ pendingAckTopicNameObject,
pendingAckStoreFuture, txnLogBufferedWriterConfig,
+ brokerClientSharedTimer,
originPersistentTopic);
+ }).exceptionally(e -> {
+ Throwable t =
FutureUtil.unwrapCompletionException(e);
+ log.error("[{}] [{}] Failed to get managedLedger
config when init pending ack "
+ + "store!",
+ originPersistentTopic, subscription, t);
+ pendingAckStoreFuture.completeExceptionally(t);
+ return null;
+
+ });
}).exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
- log.error("[{}] [{}] Failed to get managedLedger
config when init pending ack store!",
+ log.error("[{}] [{}] Failed to check the pending ack
topic exist when init pending ack store!",
originPersistentTopic, subscription, t);
pendingAckStoreFuture.completeExceptionally(t);
return null;
-
});
}).exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
- log.error("[{}] [{}] Failed to check the pending ack topic
exist when init pending ack store!",
- originPersistentTopic, subscription, t);
+ log.error("[{}] [{}] Failed to get managedLedger config
when init pending ack store!",
+ pendingAckTopicNameObject, subscription, t);
pendingAckStoreFuture.completeExceptionally(t);
return null;
});
return pendingAckStoreFuture;
}
+ private static void internalNewPendingAckStore(PersistentSubscription
subscription, ManagedLedgerConfig config,
+ BrokerService
brokerService, TopicName topicName,
+ TopicName
pendingAckTopicNameObject,
+
CompletableFuture<PendingAckStore> pendingAckStoreFuture,
+ TxnLogBufferedWriterConfig
txnLogBufferedWriterConfig,
+ Timer
brokerClientSharedTimer,
+ PersistentTopic
originPersistentTopic) {
+ config.setCreateIfMissing(true);
+ brokerService
+ .getManagedLedgerFactoryForTopic(topicName,
config.getStorageClassName())
+
.asyncOpen(pendingAckTopicNameObject.getPersistenceNamingEncoding(),
+ config, new AsyncCallbacks.OpenLedgerCallback() {
+ @Override
+ public void openLedgerComplete(ManagedLedger
ledger, Object ctx) {
+ ledger.asyncOpenCursor(
+
MLPendingAckStore.getTransactionPendingAckStoreCursorName(),
+ InitialPosition.Earliest,
+ new
AsyncCallbacks.OpenCursorCallback() {
+ @Override
+ public void
openCursorComplete(ManagedCursor cursor,
+
Object ctx) {
+ pendingAckStoreFuture.complete(
+ new
MLPendingAckStore(ledger,
+ cursor,
+
subscription.getCursor(),
+ brokerService
+
.getPulsar()
+
.getConfiguration()
+
.getTransactionPendingAckLogIndexMinLag(),
+
txnLogBufferedWriterConfig,
+
brokerClientSharedTimer,
+
bufferedWriterMetrics,
+ brokerService
+
.getPulsar()
+
.getOrderedExecutor()
+
.chooseThread()));
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "{},{} open
MLPendingAckStore cursor "
+ +
"success",
+
originPersistentTopic.getName(),
+
subscription.getName());
+ }
+ }
+
+ @Override
+ public void openCursorFailed(
+ ManagedLedgerException
exception,
+ Object ctx) {
+ log.error(
+ "{},{} open
MLPendingAckStore cursor "
+ + "failed."
+ ,
originPersistentTopic.getName(),
+
subscription.getName(), exception);
+
pendingAckStoreFuture.completeExceptionally(
+ exception);
+ }
+ }, null);
+ }
+
+ @Override
+ public void
openLedgerFailed(ManagedLedgerException exception,
+ Object ctx) {
+ log.error("{}, {} open MLPendingAckStore
managedLedger failed."
+ , originPersistentTopic.getName(),
subscription.getName(),
+ exception);
+
pendingAckStoreFuture.completeExceptionally(exception);
+ }
+ }, () -> CompletableFuture.completedFuture(true),
null);
+ }
+
@Override
public CompletableFuture<Boolean>
checkInitializedBefore(PersistentSubscription subscription) {
PersistentTopic originPersistentTopic = (PersistentTopic)
subscription.getTopic();
String pendingAckTopicName = MLPendingAckStore
.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(),
subscription.getName());
- return
originPersistentTopic.getBrokerService().getManagedLedgerFactory()
-
.asyncExists(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding());
+ TopicName topicName = TopicName.get(pendingAckTopicName);
+ return
originPersistentTopic.getBrokerService().getManagedLedgerFactoryForTopic(topicName)
+ .thenCompose(managedLedgerFactory ->
managedLedgerFactory.asyncExists(
+ topicName.getPersistenceNamingEncoding()));
}
private static class MLTxnPendingAckLogBufferedWriterMetrics extends
TxnLogBufferedWriterMetricsStats{
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 1ea29c9d431..9aa2dcc700c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -135,7 +135,7 @@ public class AdminApiOffloadTest extends
MockedPulsarServiceBaseTest {
}
}
- ManagedLedgerInfo info =
pulsar.getManagedLedgerFactory().getManagedLedgerInfo(mlName);
+ ManagedLedgerInfo info =
pulsar.getDefaultManagedLedgerFactory().getManagedLedgerInfo(mlName);
assertEquals(info.ledgers.size(), 2);
assertEquals(admin.topics().offloadStatus(topicName).getStatus(),
LongRunningProcessStatus.Status.NOT_RUN);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 4a1dbface2c..26da4116d09 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -588,24 +588,24 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
// wait config to be updated
Awaitility.await().until(() -> {
- return
pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize() == 1 *
1024L * 1024L
- &&
pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark()
== 0.8
- &&
pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold() ==
TimeUnit.MILLISECONDS
+ return
pulsar.getDefaultManagedLedgerFactory().getEntryCacheManager().getMaxSize() ==
1 * 1024L * 1024L
+ &&
pulsar.getDefaultManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark()
== 0.8
+ &&
pulsar.getDefaultManagedLedgerFactory().getCacheEvictionTimeThreshold() ==
TimeUnit.MILLISECONDS
.toNanos(2000);
});
// verify value is updated
-
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize(),
1 * 1024L * 1024L);
-
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(),
0.8);
-
assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(),
TimeUnit.MILLISECONDS
+
assertEquals(pulsar.getDefaultManagedLedgerFactory().getEntryCacheManager().getMaxSize(),
1 * 1024L * 1024L);
+
assertEquals(pulsar.getDefaultManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(),
0.8);
+
assertEquals(pulsar.getDefaultManagedLedgerFactory().getCacheEvictionTimeThreshold(),
TimeUnit.MILLISECONDS
.toNanos(2000));
restartBroker();
// verify value again
-
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize(),
1 * 1024L * 1024L);
-
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(),
0.8);
-
assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(),
TimeUnit.MILLISECONDS
+
assertEquals(pulsar.getDefaultManagedLedgerFactory().getEntryCacheManager().getMaxSize(),
1 * 1024L * 1024L);
+
assertEquals(pulsar.getDefaultManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(),
0.8);
+
assertEquals(pulsar.getDefaultManagedLedgerFactory().getCacheEvictionTimeThreshold(),
TimeUnit.MILLISECONDS
.toNanos(2000));
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 18fd3dd1c8b..aae2f7b8830 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -708,7 +708,7 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
// partitioned topic to more than 10.
final String nonPartitionTopicName2 = "special-topic-partition-10";
final String partitionedTopicName = "special-topic";
- pulsar.getBrokerService().getManagedLedgerFactory()
+ pulsar.getDefaultManagedLedgerFactory()
.open(TopicName.get(nonPartitionTopicName2).getPersistenceNamingEncoding());
doAnswer(invocation -> {
persistentTopics.namespaceName = NamespaceName.get("tenant",
"namespace");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 82892ad353a..68a52c4b4c3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -127,7 +127,7 @@ public class BrokerBkEnsemblesTests extends
BkEnsemblesTestBase {
consumer.close();
producer.close();
pulsar.getBrokerService().removeTopicFromCache(topic);
- ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl)
pulsar.getManagedLedgerFactory();
+ ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl)
pulsar.getDefaultManagedLedgerFactory();
Field field =
ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);
@SuppressWarnings("unchecked")
@@ -250,7 +250,7 @@ public class BrokerBkEnsemblesTests extends
BkEnsemblesTestBase {
// clean managed-ledger and recreate topic to clean any data from the
cache
producer.close();
pulsar.getBrokerService().removeTopicFromCache(topic);
- ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl)
pulsar.getManagedLedgerFactory();
+ ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl)
pulsar.getDefaultManagedLedgerFactory();
Field field =
ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);
@SuppressWarnings("unchecked")
@@ -399,7 +399,7 @@ public class BrokerBkEnsemblesTests extends
BkEnsemblesTestBase {
@Test
public void testDeleteLedgerFactoryCorruptLedger() throws Exception {
- ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl)
pulsar.getManagedLedgerFactory();
+ ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl)
pulsar.getDefaultManagedLedgerFactory();
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("test");
// bookkeeper client
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index d7272fcffa9..be6f7c91437 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -274,7 +274,7 @@ public class BrokerBookieIsolationTest {
assertAffinityBookies(ledgerManager, ml4.getLedgersInfoAsList(),
isolatedBookies);
ManagedLedgerClientFactory mlFactory =
- (ManagedLedgerClientFactory)
pulsarService.getManagedLedgerClientFactory();
+ (ManagedLedgerClientFactory)
pulsarService.getManagedLedgerStorage();
Map<EnsemblePlacementPolicyConfig, BookKeeper>
bkPlacementPolicyToBkClientMap = mlFactory
.getBkEnsemblePolicyToBookKeeperMap();
@@ -588,7 +588,7 @@ public class BrokerBookieIsolationTest {
assertAffinityBookies(ledgerManager, ml4.getLedgersInfoAsList(),
isolatedBookies);
ManagedLedgerClientFactory mlFactory =
- (ManagedLedgerClientFactory)
pulsarService.getManagedLedgerClientFactory();
+ (ManagedLedgerClientFactory)
pulsarService.getManagedLedgerStorage();
Map<EnsemblePlacementPolicyConfig, BookKeeper>
bkPlacementPolicyToBkClientMap = mlFactory
.getBkEnsemblePolicyToBookKeeperMap();
@@ -751,7 +751,7 @@ public class BrokerBookieIsolationTest {
assertAffinityBookies(ledgerManager, ml3.getLedgersInfoAsList(),
isolatedBookies);
ManagedLedgerClientFactory mlFactory =
- (ManagedLedgerClientFactory)
pulsarService.getManagedLedgerClientFactory();
+ (ManagedLedgerClientFactory)
pulsarService.getManagedLedgerStorage();
Map<EnsemblePlacementPolicyConfig, BookKeeper>
bkPlacementPolicyToBkClientMap = mlFactory
.getBkEnsemblePolicyToBookKeeperMap();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 17209c83c13..e05bb836a3c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -76,8 +76,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader;
+import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -1451,7 +1451,7 @@ public class BrokerServiceTest extends BrokerTestBase {
ledgerField.setAccessible(true);
@SuppressWarnings("unchecked")
ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>
ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>)
ledgerField
- .get(pulsar.getManagedLedgerFactory());
+ .get(pulsar.getDefaultManagedLedgerFactory());
CompletableFuture<ManagedLedgerImpl> future = new
CompletableFuture<>();
future.completeExceptionally(new ManagedLedgerException("ledger
opening failed"));
ledgers.put(namespace + "/persistent/deadLockTestTopic", future);
@@ -1517,8 +1517,7 @@ public class BrokerServiceTest extends BrokerTestBase {
PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
- ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl)
pulsar.getManagedLedgerClientFactory()
- .getManagedLedgerFactory();
+ ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl)
pulsar.getDefaultManagedLedgerFactory();
Field ledgersField =
ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
ledgersField.setAccessible(true);
ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>
ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>)
ledgersField
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 000ea7af915..69f3e2e4d39 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -61,6 +61,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -184,10 +185,11 @@ public class PersistentDispatcherFailoverConsumerTest {
doReturn("mockCursor").when(cursorMock).getName();
// call openLedgerComplete with ledgerMock on ML factory asyncOpen
+ ManagedLedgerFactory managedLedgerFactory =
pulsarTestContext.getDefaultManagedLedgerFactory();
doAnswer(invocationOnMock -> {
((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
- }).when(pulsarTestContext.getManagedLedgerFactory())
+ }).when(managedLedgerFactory)
.asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -196,7 +198,7 @@ public class PersistentDispatcherFailoverConsumerTest {
((OpenLedgerCallback) invocationOnMock.getArguments()[2])
.openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null);
return null;
- }).when(pulsarTestContext.getManagedLedgerFactory())
+ }).when(managedLedgerFactory)
.asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 85e0887465d..f75a3256747 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -92,7 +92,7 @@ public class PersistentTopicConcurrentTest extends
MockedBookKeeperTestCase {
cursorMock = ledger.openCursor("c1");
ledgerMock = ledger;
mlFactoryMock = factory;
- doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+ doReturn(mlFactoryMock).when(pulsar).getDefaultManagedLedgerFactory();
brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
doReturn(brokerService).when(pulsar).getBrokerService();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 36e741f8fa9..2896c13af00 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -596,7 +596,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
pulsarClient.close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topic));
- assertTrue(((ManagedLedgerFactoryImpl)
pulsar.getManagedLedgerFactory()).getManagedLedgers()
+ assertTrue(((ManagedLedgerFactoryImpl)
pulsar.getDefaultManagedLedgerFactory()).getManagedLedgers()
.containsKey(topicName.getPersistenceNamingEncoding()));
admin.namespaces().unload("prop/ns-abc");
@@ -613,7 +613,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
}
// ML should have been closed as well
- assertFalse(((ManagedLedgerFactoryImpl)
pulsar.getManagedLedgerFactory()).getManagedLedgers()
+ assertFalse(((ManagedLedgerFactoryImpl)
pulsar.getDefaultManagedLedgerFactory()).getManagedLedgers()
.containsKey(topicName.getPersistenceNamingEncoding()));
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 81c12df4f39..1e96da737dd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -90,6 +90,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
@@ -169,6 +170,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
private BrokerService brokerService;
private EventLoopGroup eventLoopGroup;
+ private ManagedLedgerFactory managedLedgerFactory;
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
@@ -190,13 +192,14 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
.build();
brokerService = pulsarTestContext.getBrokerService();
+ managedLedgerFactory =
pulsarTestContext.getDefaultManagedLedgerFactory();
doAnswer(invocationOnMock -> CompletableFuture.completedFuture(null))
-
.when(pulsarTestContext.getManagedLedgerFactory()).getManagedLedgerPropertiesAsync(any());
+
.when(managedLedgerFactory).getManagedLedgerPropertiesAsync(any());
doAnswer(invocation -> {
DeleteLedgerCallback deleteLedgerCallback =
invocation.getArgument(1);
deleteLedgerCallback.deleteLedgerComplete(null);
return null;
-
}).when(pulsarTestContext.getManagedLedgerFactory()).asyncDelete(any(), any(),
any());
+ }).when(managedLedgerFactory).asyncDelete(any(), any(), any());
// Mock serviceCnx.
serverCnx =
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class,
pulsarTestContext.getPulsarService());
@@ -247,7 +250,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
doAnswer(invocationOnMock -> {
((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
- }).when(pulsarTestContext.getManagedLedgerFactory())
+ }).when(managedLedgerFactory)
.asyncOpen(anyString(), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class),
any(Supplier.class), any());
@@ -273,7 +276,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
.openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null)).start();
return null;
- }).when(pulsarTestContext.getManagedLedgerFactory())
+ }).when(managedLedgerFactory)
.asyncOpen(anyString(), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class),
any(Supplier.class), any());
@@ -1395,7 +1398,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
doAnswer(invocationOnMock -> {
((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
- }).when(pulsarTestContext.getManagedLedgerFactory())
+ }).when(managedLedgerFactory)
.asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -1404,7 +1407,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
((OpenLedgerCallback) invocationOnMock.getArguments()[2])
.openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null);
return null;
- }).when(pulsarTestContext.getManagedLedgerFactory())
+ }).when(managedLedgerFactory)
.asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
index 3caf4a1f239..bd4a0889c73 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java
@@ -187,14 +187,14 @@ public class ReplicationTxnTest extends
OneWayReplicatorTestBase {
for (int i = 0; i < txnLogPartitions; i++) {
TopicName txnLog = TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX +
i);
- assertNotNull(pulsar1.getManagedLedgerFactory()
+ assertNotNull(pulsar1.getDefaultManagedLedgerFactory()
.getManagedLedgerInfo(txnLog.getPersistenceNamingEncoding()));
assertFalse(broker1.getTopics().containsKey(txnLog.toString()));
}
// __transaction_pending_ack: it only uses ML, will not create topic.
TopicName pendingAck = TopicName.get(
MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic,
subscription));
- assertNotNull(pulsar1.getManagedLedgerFactory()
+ assertNotNull(pulsar1.getDefaultManagedLedgerFactory()
.getManagedLedgerInfo(pendingAck.getPersistenceNamingEncoding()));
assertFalse(broker1.getTopics().containsKey(pendingAck.toString()));
// __transaction_buffer_snapshot.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index aac7a85f477..2420ed58bed 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -1419,8 +1419,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
config1.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
config2.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
- ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl)
pulsar1.getManagedLedgerClientFactory()
- .getManagedLedgerFactory();
+ ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl)
pulsar1.getDefaultManagedLedgerFactory();
Field ledgersField =
ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
ledgersField.setAccessible(true);
ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>
ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>)
ledgersField
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 42b52d901e3..9a85995ab77 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -83,6 +83,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.commons.lang3.mutable.MutableInt;
@@ -202,6 +203,7 @@ public class ServerCnxTest {
private ManagedLedger ledgerMock;
private ManagedCursor cursorMock;
private ConcurrentHashSet<EmbeddedChannel>
channelsStoppedAnswerHealthCheck = new ConcurrentHashSet<>();
+ private ManagedLedgerFactory managedLedgerFactory;
@BeforeMethod(alwaysRun = true)
@@ -218,7 +220,7 @@ public class ServerCnxTest {
.spyByDefault()
.build();
pulsar = pulsarTestContext.getPulsarService();
-
+ managedLedgerFactory =
pulsarTestContext.getDefaultManagedLedgerFactory();
brokerService = pulsarTestContext.getBrokerService();
namespaceService = pulsar.getNamespaceService();
@@ -2043,7 +2045,7 @@ public class ServerCnxTest {
() -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
null));
return null;
- }).when(pulsarTestContext.getManagedLedgerFactory())
+ }).when(managedLedgerFactory)
.asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -2098,7 +2100,7 @@ public class ServerCnxTest {
() -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
null));
return null;
- }).when(pulsarTestContext.getManagedLedgerFactory())
+ }).when(managedLedgerFactory)
.asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -2165,7 +2167,7 @@ public class ServerCnxTest {
((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
- }).when(pulsarTestContext.getManagedLedgerFactory())
+ }).when(managedLedgerFactory)
.asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -2244,7 +2246,7 @@ public class ServerCnxTest {
() -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
null));
return null;
- }).when(pulsarTestContext.getManagedLedgerFactory())
+ }).when(managedLedgerFactory)
.asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -2316,7 +2318,7 @@ public class ServerCnxTest {
null));
return null;
- }).when(pulsarTestContext.getManagedLedgerFactory())
+ }).when(managedLedgerFactory)
.asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -2391,7 +2393,7 @@ public class ServerCnxTest {
() -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
null));
return null;
- }).when(pulsarTestContext.getManagedLedgerFactory())
+ }).when(managedLedgerFactory)
.asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -2400,7 +2402,7 @@ public class ServerCnxTest {
openTopicFail.complete(() -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2])
.openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null));
return null;
- }).when(pulsarTestContext.getManagedLedgerFactory())
+ }).when(managedLedgerFactory)
.asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -2926,7 +2928,7 @@ public class ServerCnxTest {
Thread.sleep(300);
((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
- }).when(pulsarTestContext.getManagedLedgerFactory())
+ }).when(managedLedgerFactory)
.asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -2937,7 +2939,7 @@ public class ServerCnxTest {
.openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null)).start();
return null;
- }).when(pulsarTestContext.getManagedLedgerFactory())
+ }).when(managedLedgerFactory)
.asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
index 7e8454f6c7e..fc10d315cb1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
@@ -70,7 +70,7 @@ public class TransactionMarkerDeleteTest extends
TransactionTestBase {
@Test
public void testMarkerDeleteTimes() throws Exception {
ManagedLedgerImpl managedLedger =
- spy((ManagedLedgerImpl)
getPulsarServiceList().get(0).getManagedLedgerFactory().open("test"));
+ spy((ManagedLedgerImpl)
getPulsarServiceList().get(0).getDefaultManagedLedgerFactory().open("test"));
PersistentTopic topic = mock(PersistentTopic.class);
BrokerService brokerService = mock(BrokerService.class);
PulsarService pulsarService = mock(PulsarService.class);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
index d0fd384ba78..d72e8f75427 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
@@ -105,7 +105,7 @@ public class ManagedLedgerMetricsTest extends
BrokerTestBase {
producer.send(message.getBytes());
}
- var managedLedgerFactory = (ManagedLedgerFactoryImpl)
pulsar.getManagedLedgerFactory();
+ var managedLedgerFactory = (ManagedLedgerFactoryImpl)
pulsar.getDefaultManagedLedgerFactory();
for (Entry<String, ManagedLedger> ledger :
managedLedgerFactory.getManagedLedgers().entrySet()) {
ManagedLedgerMBeanImpl stats = (ManagedLedgerMBeanImpl)
ledger.getValue().getStats();
stats.refreshStats(1, TimeUnit.SECONDS);
@@ -205,7 +205,7 @@ public class ManagedLedgerMetricsTest extends
BrokerTestBase {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(2);
MLTransactionLogImpl mlTransactionLog = new
MLTransactionLogImpl(TransactionCoordinatorID.get(0),
- pulsar.getManagedLedgerFactory(), managedLedgerConfig,
txnLogBufferedWriterConfig,
+ pulsar.getDefaultManagedLedgerFactory(), managedLedgerConfig,
txnLogBufferedWriterConfig,
transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
index 7860b0708e3..70e386c68aa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
@@ -71,7 +71,7 @@ class NonStartableTestPulsarService extends
AbstractTestPulsarService {
super(spyConfig, config, localMetadataStore,
configurationMetadataStore, compactionServiceFactory,
brokerInterceptor, bookKeeperClientFactory, null);
setPulsarResources(pulsarResources);
- setManagedLedgerClientFactory(managedLedgerClientFactory);
+ setManagedLedgerStorage(managedLedgerClientFactory);
try {
setBrokerService(brokerServiceCustomizer.apply(
spyConfig.getBrokerService().spy(TestBrokerService.class,
this, getIoEventLoopGroup())));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index 3d79a17a90f..cdb047079bf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
@@ -56,7 +57,9 @@ import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil;
+import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.compaction.CompactionServiceFactory;
@@ -136,7 +139,7 @@ public class PulsarTestContext implements AutoCloseable {
private final OrderedExecutor executor;
- private final ManagedLedgerStorage managedLedgerClientFactory;
+ private final ManagedLedgerStorage managedLedgerStorage;
private final PulsarService pulsarService;
@@ -167,8 +170,12 @@ public class PulsarTestContext implements AutoCloseable {
private final boolean enableOpenTelemetry;
private final InMemoryMetricReader openTelemetryMetricReader;
- public ManagedLedgerFactory getManagedLedgerFactory() {
- return managedLedgerClientFactory.getManagedLedgerFactory();
+ public ManagedLedgerStorage getManagedLedgerStorage() {
+ return managedLedgerStorage;
+ }
+
+ public ManagedLedgerFactory getDefaultManagedLedgerFactory() {
+ return
getManagedLedgerStorage().getDefaultStorageClass().getManagedLedgerFactory();
}
public PulsarMockBookKeeper getMockBookKeeper() {
@@ -524,8 +531,8 @@ public class PulsarTestContext implements AutoCloseable {
*/
public Builder managedLedgerClients(BookKeeper bookKeeperClient,
ManagedLedgerFactory
managedLedgerFactory) {
- return managedLedgerClientFactory(
-
PulsarTestContext.createManagedLedgerClientFactory(bookKeeperClient,
managedLedgerFactory));
+ return managedLedgerStorage(
+
PulsarTestContext.createManagedLedgerStorage(bookKeeperClient,
managedLedgerFactory));
}
/**
@@ -569,6 +576,9 @@ public class PulsarTestContext implements AutoCloseable {
if (configOverrideCustomizer != null) {
configOverrideCustomizer.accept(super.config);
}
+ if (super.managedLedgerStorage != null &&
!MockUtil.isMock(super.managedLedgerStorage)) {
+ super.managedLedgerStorage =
spyConfig.getManagedLedgerStorage().spy(super.managedLedgerStorage);
+ }
initializeCommonPulsarServices(spyConfig);
initializePulsarServices(spyConfig, this);
if (pulsarServiceCustomizer != null) {
@@ -622,7 +632,7 @@ public class PulsarTestContext implements AutoCloseable {
}
private void initializeCommonPulsarServices(SpyConfig spyConfig) {
- if (super.bookKeeperClient == null &&
super.managedLedgerClientFactory == null) {
+ if (super.bookKeeperClient == null && super.managedLedgerStorage
== null) {
if (super.executor == null) {
OrderedExecutor createdExecutor =
OrderedExecutor.newBuilder().numThreads(1)
.name(PulsarTestContext.class.getSimpleName() +
"-executor").build();
@@ -645,8 +655,11 @@ public class PulsarTestContext implements AutoCloseable {
});
bookKeeperClient(mockBookKeeper);
}
- if (super.bookKeeperClient == null &&
super.managedLedgerClientFactory != null) {
-
bookKeeperClient(super.managedLedgerClientFactory.getBookKeeperClient());
+ if (super.bookKeeperClient == null && super.managedLedgerStorage
!= null) {
+
bookKeeperClient(super.managedLedgerStorage.getStorageClasses().stream()
+
.filter(BookkeeperManagedLedgerStorageClass.class::isInstance)
+ .map(BookkeeperManagedLedgerStorageClass.class::cast)
+
.map(BookkeeperManagedLedgerStorageClass::getBookKeeperClient).findFirst().get());
}
if (super.localMetadataStore == null ||
super.configurationMetadataStore == null) {
if (super.mockZooKeeper != null) {
@@ -725,8 +738,8 @@ public class PulsarTestContext implements AutoCloseable {
}
@Override
- public Builder managedLedgerClientFactory(ManagedLedgerStorage
managedLedgerClientFactory) {
- throw new IllegalStateException("Cannot set
managedLedgerClientFactory when startable.");
+ public Builder managedLedgerStorage(ManagedLedgerStorage
managedLedgerStorage) {
+ throw new IllegalStateException("Cannot set managedLedgerStorage
when startable.");
}
@Override
@@ -788,10 +801,12 @@ public class PulsarTestContext implements AutoCloseable {
@Override
protected void initializePulsarServices(SpyConfig spyConfig, Builder
builder) {
- if (builder.managedLedgerClientFactory == null) {
+ if (builder.managedLedgerStorage == null) {
ManagedLedgerFactory mlFactoryMock =
Mockito.mock(ManagedLedgerFactory.class);
- managedLedgerClientFactory(
-
PulsarTestContext.createManagedLedgerClientFactory(builder.bookKeeperClient,
mlFactoryMock));
+ managedLedgerStorage(
+ spyConfig.getManagedLedgerStorage()
+
.spy(PulsarTestContext.createManagedLedgerStorage(builder.bookKeeperClient,
+ mlFactoryMock)));
}
if (builder.pulsarResources == null) {
SpyConfig.SpyType spyConfigPulsarResources =
spyConfig.getPulsarResources();
@@ -825,7 +840,7 @@ public class PulsarTestContext implements AutoCloseable {
builder.configurationMetadataStore,
compactionServiceFactory,
builder.brokerInterceptor,
bookKeeperClientFactory, builder.pulsarResources,
- builder.managedLedgerClientFactory,
builder.brokerServiceCustomizer);
+ builder.managedLedgerStorage,
builder.brokerServiceCustomizer);
if (compactionServiceFactory != null) {
compactionServiceFactory.initialize(pulsarService);
}
@@ -838,10 +853,31 @@ public class PulsarTestContext implements AutoCloseable {
}
@NotNull
- private static ManagedLedgerStorage
createManagedLedgerClientFactory(BookKeeper bookKeeperClient,
-
ManagedLedgerFactory managedLedgerFactory) {
- return new ManagedLedgerStorage() {
+ private static ManagedLedgerStorage createManagedLedgerStorage(BookKeeper
bookKeeperClient,
+
ManagedLedgerFactory managedLedgerFactory) {
+ BookkeeperManagedLedgerStorageClass managedLedgerStorageClass =
+ new BookkeeperManagedLedgerStorageClass() {
+ @Override
+ public String getName() {
+ return "bookkeeper";
+ }
+
+ @Override
+ public ManagedLedgerFactory getManagedLedgerFactory() {
+ return managedLedgerFactory;
+ }
+
+ @Override
+ public StatsProvider getStatsProvider() {
+ return new NullStatsProvider();
+ }
+ @Override
+ public BookKeeper getBookKeeperClient() {
+ return bookKeeperClient;
+ }
+ };
+ return new ManagedLedgerStorage() {
@Override
public void initialize(ServiceConfiguration conf,
MetadataStoreExtended metadataStore,
BookKeeperClientFactory bookkeeperProvider,
EventLoopGroup eventLoopGroup,
@@ -849,18 +885,17 @@ public class PulsarTestContext implements AutoCloseable {
}
@Override
- public ManagedLedgerFactory getManagedLedgerFactory() {
- return managedLedgerFactory;
- }
-
- @Override
- public StatsProvider getStatsProvider() {
- return new NullStatsProvider();
+ public Collection<ManagedLedgerStorageClass> getStorageClasses() {
+ return List.of(managedLedgerStorageClass);
}
@Override
- public BookKeeper getBookKeeperClient() {
- return bookKeeperClient;
+ public Optional<ManagedLedgerStorageClass>
getManagedLedgerStorageClass(String name) {
+ if (name == null || name.equals("bookkeeper")) {
+ return Optional.of(managedLedgerStorageClass);
+ } else {
+ return Optional.empty();
+ }
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
index 64789d1f0d4..285eb1bba6d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
@@ -112,6 +112,8 @@ public class SpyConfig {
*/
private final SpyType namespaceService;
+ private final SpyType managedLedgerStorage;
+
/**
* Create a builder for SpyConfig with no spies by default.
*
@@ -141,5 +143,6 @@ public class SpyConfig {
spyConfigBuilder.compactor(defaultSpyType);
spyConfigBuilder.compactedServiceFactory(defaultSpyType);
spyConfigBuilder.namespaceService(defaultSpyType);
+ spyConfigBuilder.managedLedgerStorage(defaultSpyType);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
index a0774414492..d82cd69c83d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
@@ -23,6 +23,7 @@ import
io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
+import lombok.SneakyThrows;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -30,6 +31,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -62,4 +64,23 @@ class StartableTestPulsarService extends
AbstractTestPulsarService {
public Supplier<NamespaceService> getNamespaceServiceProvider() throws
PulsarServerException {
return () ->
spyConfig.getNamespaceService().spy(NamespaceService.class, this);
}
+
+ @SneakyThrows
+ @Override
+ public ManagedLedgerStorage getManagedLedgerStorage() {
+ // support adding spy to managedLedgerStorage in beforePulsarStart
method
+ if (super.getManagedLedgerStorage() == null) {
+ setManagedLedgerStorage(createManagedLedgerStorageSpy());
+ }
+ return super.getManagedLedgerStorage();
+ }
+
+ @Override
+ protected ManagedLedgerStorage newManagedLedgerStorage() throws Exception {
+ return getManagedLedgerStorage();
+ }
+
+ private ManagedLedgerStorage createManagedLedgerStorageSpy() throws
Exception {
+ return
spyConfig.getManagedLedgerStorage().spy(super.newManagedLedgerStorage());
+ }
}
\ No newline at end of file
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index f21e11b9802..14cc813a17d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -806,7 +806,7 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
//
}
};
- pulsarService.getManagedLedgerFactory()
+ pulsarService.getDefaultManagedLedgerFactory()
.asyncOpenReadOnlyManagedLedger(snapshotSegmentTopicName.getPersistenceNamingEncoding(),
callback,
brokerService.getManagedLedgerConfig(snapshotSegmentTopicName).get(),null);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 14b1d563c11..3d7ab902bf4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -247,7 +247,7 @@ public class TransactionProduceTest extends
TransactionTestBase {
if (partition >= 0) {
topic = TopicName.get(topic).toString() +
TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
}
- return
getPulsarServiceList().get(0).getManagedLedgerFactory().openReadOnlyCursor(
+ return
getPulsarServiceList().get(0).getDefaultManagedLedgerFactory().openReadOnlyCursor(
TopicName.get(topic).getPersistenceNamingEncoding(),
PositionFactory.EARLIEST, new ManagedLedgerConfig());
} catch (Exception e) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 5480b1a21d5..35c9048ebb5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -510,7 +510,7 @@ public class TransactionTest extends TransactionTestBase {
admin.topics().createNonPartitionedTopic(topic);
PulsarService pulsarService = super.getPulsarServiceList().get(0);
pulsarService.getBrokerService().getTopics().clear();
- ManagedLedgerFactory managedLedgerFactory =
pulsarService.getBrokerService().getManagedLedgerFactory();
+ ManagedLedgerFactory managedLedgerFactory =
pulsarService.getDefaultManagedLedgerFactory();
Field field =
ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);
ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>
ledgers =
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 72aa078d5da..fc6a10e385a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -23,7 +23,7 @@ import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertM
import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -45,6 +45,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.BKException;
@@ -952,8 +953,14 @@ public class PendingAckPersistentTest extends
TransactionTestBase {
assertNotNull(persistentTopic);
BrokerService brokerService = spy(persistentTopic.getBrokerService());
- doReturn(FutureUtil.failedFuture(new
BrokerServiceException.ServiceUnitNotReadyException("test")))
- .when(brokerService).getManagedLedgerConfig(any());
+ AtomicBoolean isGetManagedLedgerConfigFail = new AtomicBoolean(false);
+ doAnswer(invocation -> {
+ if (isGetManagedLedgerConfigFail.get()) {
+ return FutureUtil.failedFuture(new
BrokerServiceException.ServiceUnitNotReadyException("test"));
+ } else {
+ return invocation.callRealMethod();
+ }
+ }).when(brokerService).getManagedLedgerConfig(any());
Field field = AbstractTopic.class.getDeclaredField("brokerService");
field.setAccessible(true);
field.set(persistentTopic, brokerService);
@@ -968,11 +975,13 @@ public class PendingAckPersistentTest extends
TransactionTestBase {
producer.send("test");
Transaction transaction = pulsarClient.newTransaction()
- .withTransactionTimeout(30, TimeUnit.SECONDS).build().get();
+ .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+ isGetManagedLedgerConfigFail.set(true);
// pending ack init fail, so the ack will throw exception
try {
consumer.acknowledgeAsync(consumer.receive().getMessageId(),
transaction).get();
+ fail("ack should fail");
} catch (Exception e) {
assertTrue(e.getCause() instanceof
PulsarClientException.LookupException);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
index 9396a80cf25..6f79c573ed3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
@@ -291,7 +291,7 @@ public class OrphanPersistentTopicTest extends
ProducerConsumerBase {
Thread.sleep(10 * 1000);
}
log.info("Race condition occurs {} times",
mockRaceConditionCounter.get());
-
pulsar.getManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding());
+
pulsar.getDefaultManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding());
}
return invocation.callRealMethod();
}).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 2e71e8cc28c..e76c3d8fb84 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -91,8 +91,13 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.broker.testcontext.SpyConfig;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -150,6 +155,14 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
super.producerBaseSetup();
}
+ @Override
+ protected PulsarTestContext.Builder
createPulsarTestContextBuilder(ServiceConfiguration conf) {
+ return super.createPulsarTestContextBuilder(conf)
+ .spyConfig(SpyConfig.builder()
+
.managedLedgerStorage(SpyConfig.SpyType.SPY_ALSO_INVOCATIONS)
+ .build());
+ }
+
@AfterMethod(alwaysRun = true)
public void cleanupAfterMethod() throws Exception {
try {
@@ -1097,18 +1110,25 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
}
+
@Override
protected void beforePulsarStart(PulsarService pulsar) throws Exception {
super.beforePulsarStart(pulsar);
- doAnswer(i0 -> {
- ManagedLedgerFactory factory = (ManagedLedgerFactory)
spy(i0.callRealMethod());
- doAnswer(i1 -> {
- EntryCacheManager manager = (EntryCacheManager)
spy(i1.callRealMethod());
- doAnswer(i2 ->
spy(i2.callRealMethod())).when(manager).getEntryCache(any());
- return manager;
- }).when(factory).getEntryCacheManager();
- return factory;
- }).when(pulsar).getManagedLedgerFactory();
+ ManagedLedgerStorage managedLedgerStorage =
pulsar.getManagedLedgerStorage();
+ doAnswer(invocation -> {
+ ManagedLedgerStorageClass managedLedgerStorageClass =
+ (ManagedLedgerStorageClass)
spy(invocation.callRealMethod());
+ doAnswer(i0 -> {
+ ManagedLedgerFactory factory = (ManagedLedgerFactory)
spy(i0.callRealMethod());
+ doAnswer(i1 -> {
+ EntryCacheManager manager = (EntryCacheManager)
spy(i1.callRealMethod());
+ doAnswer(i2 ->
spy(i2.callRealMethod())).when(manager).getEntryCache(any());
+ return manager;
+ }).when(factory).getEntryCacheManager();
+ return factory;
+ }).when(managedLedgerStorageClass).getManagedLedgerFactory();
+ return managedLedgerStorageClass;
+ }).when(managedLedgerStorage).getDefaultStorageClass();
}
/**
@@ -1126,6 +1146,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
public void testActiveAndInActiveConsumerEntryCacheBehavior() throws
Exception {
log.info("-- Starting {} test --", methodName);
+
final long batchMessageDelayMs = 100;
final int receiverSize = 10;
final String topicName = "cache-topic";
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
index 36c36735c06..390e81ad664 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java
@@ -563,7 +563,7 @@ public class SubscriptionPauseOnAckStatPersistTest extends
ProducerConsumerBase
final String subscription = "s1";
final int msgSendCount = 100;
// Inject a injection to record the counter of calling
"cursor.isCursorDataFullyPersistable".
- final ManagedLedgerImpl ml = (ManagedLedgerImpl)
pulsar.getBrokerService().getManagedLedgerFactory().open(mlName);
+ final ManagedLedgerImpl ml = (ManagedLedgerImpl)
pulsar.getDefaultManagedLedgerFactory().open(mlName);
final ManagedCursorImpl cursor = (ManagedCursorImpl)
ml.openCursor(subscription);
final ManagedCursorImpl spyCursor = Mockito.spy(cursor);
AtomicInteger callingIsCursorDataFullyPersistableCounter = new
AtomicInteger();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index d3cb1d60d37..0b3ff345acf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -624,7 +624,7 @@ public class V1_ProducerConsumerTest extends
V1_ProducerConsumerBase {
return manager;
}).when(factory).getEntryCacheManager();
return factory;
- }).when(pulsar).getManagedLedgerFactory();
+ }).when(pulsar).getDefaultManagedLedgerFactory();
}
/**
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
index 1395424b141..2b1b409b71c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
@@ -62,7 +62,7 @@ public class SequenceIdWithErrorTest extends
BkEnsemblesTestBase {
ManagedLedgerClientFactory clientFactory = new
ManagedLedgerClientFactory();
clientFactory.initialize(pulsar.getConfiguration(),
pulsar.getLocalMetadataStore(),
pulsar.getBookKeeperClientFactory(), eventLoopGroup,
OpenTelemetry.noop());
- ManagedLedgerFactory mlFactory =
clientFactory.getManagedLedgerFactory();
+ ManagedLedgerFactory mlFactory =
clientFactory.getDefaultStorageClass().getManagedLedgerFactory();
ManagedLedger ml =
mlFactory.open(TopicName.get(topicName).getPersistenceNamingEncoding());
ml.close();
clientFactory.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 19f42a7e057..d75ccce7ff3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -927,7 +927,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
// verify second ledger created
String managedLedgerName =
((PersistentTopic)pulsar.getBrokerService().getTopicReference(topic).get())
.getManagedLedger().getName();
- ManagedLedgerInfo info =
pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
+ ManagedLedgerInfo info =
pulsar.getDefaultManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
Assert.assertEquals(info.ledgers.size(), 2);
Assert.assertTrue(ledgersOpened.isEmpty()); // no ledgers should have
been opened
@@ -950,7 +950,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
.send();
}
- info =
pulsar.getManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
+ info =
pulsar.getDefaultManagedLedgerFactory().getManagedLedgerInfo(managedLedgerName);
Assert.assertEquals(info.ledgers.size(), 3);
// should only have opened the penultimate ledger to get stat
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
index df4e086748f..3fbc91e7d2e 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java
@@ -30,17 +30,24 @@ public class PersistencePolicies {
private int bookkeeperWriteQuorum;
private int bookkeeperAckQuorum;
private double managedLedgerMaxMarkDeleteRate;
+ private String managedLedgerStorageClassName;
public PersistencePolicies() {
- this(2, 2, 2, 0.0);
+ this(2, 2, 2, 0.0, null);
}
public PersistencePolicies(int bookkeeperEnsemble, int
bookkeeperWriteQuorum, int bookkeeperAckQuorum,
- double managedLedgerMaxMarkDeleteRate) {
+ double managedLedgerMaxMarkDeleteRate) {
+ this(bookkeeperEnsemble, bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate, null);
+ }
+
+ public PersistencePolicies(int bookkeeperEnsemble, int
bookkeeperWriteQuorum, int bookkeeperAckQuorum,
+ double managedLedgerMaxMarkDeleteRate, String
managedLedgerStorageClassName) {
this.bookkeeperEnsemble = bookkeeperEnsemble;
this.bookkeeperWriteQuorum = bookkeeperWriteQuorum;
this.bookkeeperAckQuorum = bookkeeperAckQuorum;
this.managedLedgerMaxMarkDeleteRate = managedLedgerMaxMarkDeleteRate;
+ this.managedLedgerStorageClassName = managedLedgerStorageClassName;
}
public int getBookkeeperEnsemble() {
@@ -59,10 +66,14 @@ public class PersistencePolicies {
return managedLedgerMaxMarkDeleteRate;
}
+ public String getManagedLedgerStorageClassName() {
+ return managedLedgerStorageClassName;
+ }
+
@Override
public int hashCode() {
return Objects.hash(bookkeeperEnsemble, bookkeeperWriteQuorum,
- bookkeeperAckQuorum, managedLedgerMaxMarkDeleteRate);
+ bookkeeperAckQuorum, managedLedgerMaxMarkDeleteRate,
managedLedgerStorageClassName);
}
@Override
public boolean equals(Object obj) {
@@ -71,7 +82,8 @@ public class PersistencePolicies {
return bookkeeperEnsemble == other.bookkeeperEnsemble
&& bookkeeperWriteQuorum == other.bookkeeperWriteQuorum
&& bookkeeperAckQuorum == other.bookkeeperAckQuorum
- && managedLedgerMaxMarkDeleteRate ==
other.managedLedgerMaxMarkDeleteRate;
+ && managedLedgerMaxMarkDeleteRate ==
other.managedLedgerMaxMarkDeleteRate
+ && managedLedgerStorageClassName ==
other.managedLedgerStorageClassName;
}
return false;
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index e8e644b6880..8adedcd14ac 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1379,6 +1379,11 @@ public class CmdNamespaces extends CmdBase {
description = "Throttling rate of mark-delete operation (0
means no throttle)")
private double managedLedgerMaxMarkDeleteRate = 0;
+ @Option(names = { "-c",
+ "--ml-storage-class" },
+ description = "Managed ledger storage class name")
+ private String managedLedgerStorageClassName;
+
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(namespaceName);
@@ -1390,7 +1395,8 @@ public class CmdNamespaces extends CmdBase {
throw new ParameterException("[--ml-mark-delete-max-rate]
cannot less than 0.");
}
getAdmin().namespaces().setPersistence(namespace, new
PersistencePolicies(bookkeeperEnsemble,
- bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate));
+ bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate,
+ managedLedgerStorageClassName));
}
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 3cc72db2e95..10850d107ed 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -1197,6 +1197,11 @@ public class CmdTopicPolicies extends CmdBase {
+ "If set to true, the policy will be replicate to other
clusters asynchronously", arity = "0")
private boolean isGlobal = false;
+ @Option(names = { "-c",
+ "--ml-storage-class" },
+ description = "Managed ledger storage class name")
+ private String managedLedgerStorageClassName;
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(topicName);
@@ -1208,7 +1213,8 @@ public class CmdTopicPolicies extends CmdBase {
throw new ParameterException("[--ml-mark-delete-max-rate]
cannot less than 0.");
}
getTopicPolicies(isGlobal).setPersistence(persistentTopic, new
PersistencePolicies(bookkeeperEnsemble,
- bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate));
+ bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate,
+ managedLedgerStorageClassName));
}
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 261bd81a5b7..955d6e13e1d 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -2148,6 +2148,11 @@ public class CmdTopics extends CmdBase {
+ "(0 means no throttle)")
private double managedLedgerMaxMarkDeleteRate = 0;
+ @Option(names = { "-c",
+ "--ml-storage-class" },
+ description = "Managed ledger storage class name")
+ private String managedLedgerStorageClassName;
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(topicName);
@@ -2159,7 +2164,8 @@ public class CmdTopics extends CmdBase {
throw new ParameterException("[--ml-mark-delete-max-rate]
cannot less than 0.");
}
getTopics().setPersistence(persistentTopic, new
PersistencePolicies(bookkeeperEnsemble,
- bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate));
+ bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate,
+ managedLedgerStorageClassName));
}
}