This is an automated email from the ASF dual-hosted git repository.
thetumbled pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ee33c99606e [improve][broker] PIP-444: Rate limit for deleting ledger
to alleviate the zk pressure. (#24760)
ee33c99606e is described below
commit ee33c99606e8f573dc55f47529e7c209bd8194e3
Author: Wenzhi Feng <[email protected]>
AuthorDate: Sat Oct 11 15:28:31 2025 +0800
[improve][broker] PIP-444: Rate limit for deleting ledger to alleviate the
zk pressure. (#24760)
Co-authored-by: fengwenzhi <[email protected]>
---
.../bookkeeper/mledger/ManagedLedgerConfig.java | 28 +++++++++++++
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 47 +++++++++++++++++++---
.../apache/pulsar/broker/ServiceConfiguration.java | 5 +++
.../pulsar/broker/service/BrokerService.java | 25 ++++++++++++
4 files changed, 100 insertions(+), 5 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index bd4c1a5014e..8fbf1cfda0d 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -23,6 +23,8 @@ import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.Arrays;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.Setter;
@@ -61,6 +63,8 @@ public class ManagedLedgerConfig {
private int metadataMaxEntriesPerLedger = 50000;
private int ledgerRolloverTimeout = 4 * 3600;
private double throttleMarkDelete = 0;
+ private Semaphore ledgerDeletionSemaphore;
+ private ExecutorService ledgerDeleteExecutor;
private long retentionTimeMs = 0;
private long retentionSizeInMB = 0;
private boolean autoSkipNonRecoverableData;
@@ -410,6 +414,30 @@ public class ManagedLedgerConfig {
return this;
}
+ /**
+ * Returns the semaphore used to limit concurrent ledger deletions.
+ *
+ * @return the semaphore used to limit concurrent ledger deletions, or
+ * {@code null} when none has been set (the field has no default value)
+ */
+ public Semaphore getLedgerDeletionSemaphore() {
+ return ledgerDeletionSemaphore;
+ }
+
+ /**
+ * Sets the semaphore used to limit concurrent ledger deletions.
+ * <p>
+ * ManagedLedgerImpl only applies the limit when this value is non-null, and in
+ * that case it also uses the executor from {@link #getLedgerDeleteExecutor()}.
+ *
+ * @param semaphore the semaphore to use; may be {@code null}
+ * @return this config instance, so that calls can be chained
+ */
+ public ManagedLedgerConfig setLedgerDeletionSemaphore(Semaphore semaphore)
{
+ this.ledgerDeletionSemaphore = semaphore;
+ return this;
+ }
+
+ /**
+ * Returns the executor service used for deleting ledgers.
+ *
+ * @return the executor service to be used for deleting ledgers, or
+ * {@code null} when none has been set (the field has no default value)
+ */
+ public ExecutorService getLedgerDeleteExecutor() {
+ return ledgerDeleteExecutor;
+ }
+
+ /**
+ * Sets the executor service used for deleting ledgers.
+ * <p>
+ * ManagedLedgerImpl submits deletions to this executor whenever a ledger
+ * deletion semaphore is configured, so it must be non-null in that case.
+ *
+ * @param executor the executor to use; may be {@code null} when no ledger
+ * deletion semaphore is configured
+ * @return this config instance, so that calls can be chained
+ */
+ public ManagedLedgerConfig setLedgerDeleteExecutor(ExecutorService
executor) {
+ this.ledgerDeleteExecutor = executor;
+ return this;
+ }
+
/**
* Set the retention time for the ManagedLedger.
* <p>
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index d157677d210..346ddb2f6c9 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -57,6 +57,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -354,6 +355,11 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
private long lastEvictOffloadedLedgers;
private static final int MINIMUM_EVICTION_INTERVAL_DIVIDER = 10;
+ // Semaphore to limit concurrent ledger deletion
+ private Semaphore deleteLedgerSemaphore = null;
+ // Executor service for executing ledger deletion tasks
+ private ExecutorService deleteLedgerExecutor = null;
+
public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper
bookKeeper, MetaStore store,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
final String name) {
@@ -402,6 +408,10 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
this.minBacklogEntriesForCaching =
config.getMinimumBacklogEntriesForCaching();
this.maxBacklogBetweenCursorsForCaching =
config.getMaxBacklogBetweenCursorsForCaching();
this.managedLedgerAttributes = new ManagedLedgerAttributes(this);
+ if (config.getLedgerDeletionSemaphore() != null) {
+ this.deleteLedgerSemaphore = config.getLedgerDeletionSemaphore();
+ this.deleteLedgerExecutor = config.getLedgerDeleteExecutor();
+ }
}
synchronized void initialize(final ManagedLedgerInitializeLedgerCallback
callback, final Object ctx) {
@@ -569,7 +579,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
public void operationComplete(Void v, Stat stat) {
ledgersStat = stat;
emptyLedgersToBeDeleted.forEach(ledgerId -> {
- bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
+ asyncDeleteLedgerWithConcurrencyLimit(ledgerId, (rc, ctx)
-> {
log.info("[{}] Deleted empty ledger ledgerId={}
rc={}", name, ledgerId, rc);
}, null);
});
@@ -1763,7 +1773,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
log.warn("[{}] Error updating meta data with the new list
of ledgers: {}", name, e.getMessage());
handleBadVersion(e);
mbean.startDataLedgerDeleteOp();
- bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> {
+ asyncDeleteLedgerWithConcurrencyLimit(lh.getId(), (rc1,
ctx1) -> {
mbean.endDataLedgerDeleteOp();
if (rc1 != BKException.Code.OK) {
log.warn("[{}] Failed to delete ledger {}: {}",
name, lh.getId(),
@@ -1846,7 +1856,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
STATE_UPDATER.set(this, State.LedgerOpened);
// Delete original "currentLedger" if it has been removed from
"ledgers".
if (originalCurrentLedger != null &&
!ledgers.containsKey(originalCurrentLedger.getId())){
- bookKeeper.asyncDeleteLedger(originalCurrentLedger.getId(), (rc,
ctx) -> {
+
asyncDeleteLedgerWithConcurrencyLimit(originalCurrentLedger.getId(), (rc, ctx)
-> {
mbean.endDataLedgerDeleteOp();
log.info("[{}] Delete complete for empty ledger {}. rc={}",
name, originalCurrentLedger.getId(), rc);
}, null);
@@ -3385,7 +3395,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
private void asyncDeleteLedgerWithRetry(CompletableFuture<Void> future,
long ledgerId, long retry) {
- bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
+ asyncDeleteLedgerWithConcurrencyLimit(ledgerId, (rc, ctx) -> {
if (isNoSuchLedgerExistsException(rc)) {
log.warn("[{}] Ledger was already deleted {}", name, ledgerId);
future.complete(null);
@@ -3408,6 +3418,33 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}, null);
}
+ /**
+ * Delete a ledger asynchronously, applying a concurrency limit if configured.
+ * <p>
+ * Without a deletion semaphore the request goes straight to BookKeeper. With one,
+ * the request is handed to the deletion executor, which waits for a permit before
+ * issuing the delete; the permit is released when BookKeeper reports completion.
+ * <p>
+ * If the executor thread is interrupted while waiting for a permit (for example
+ * when the executor is shut down), the interrupt status is restored and the
+ * callback is completed with {@code BKException.Code.InterruptedException}, so
+ * that callers waiting on the callback are not left pending.
+ *
+ * @param ledgerId id of the ledger to delete
+ * @param cb callback invoked with the result code of the deletion
+ * @param ctx opaque context object passed back to the callback
+ */
+ private void asyncDeleteLedgerWithConcurrencyLimit(long ledgerId,
+ org.apache.bookkeeper.client.AsyncCallback.DeleteCallback cb,
+ Object ctx) {
+ if (deleteLedgerSemaphore == null) {
+ // No limit configured: delete directly.
+ bookKeeper.asyncDeleteLedger(ledgerId, cb, ctx);
+ return;
+ }
+ // Give the permit back before notifying the caller.
+ AsyncCallback.DeleteCallback cbWrapper = (rc, ctx1) -> {
+ deleteLedgerSemaphore.release();
+ cb.deleteComplete(rc, ctx1);
+ };
+ deleteLedgerExecutor.execute(() -> {
+ try {
+ deleteLedgerSemaphore.acquire();
+ } catch (InterruptedException e) {
+ // No permit was acquired, so there is nothing to release. Restore the
+ // interrupt status and still complete the callback with an error code.
+ Thread.currentThread().interrupt();
+ log.error("[{}] Interrupted while waiting to delete ledger {}", name, ledgerId);
+ cb.deleteComplete(BKException.Code.InterruptedException, ctx);
+ return;
+ }
+ bookKeeper.asyncDeleteLedger(ledgerId, cbWrapper, ctx);
+ });
+ }
+
@SuppressWarnings("checkstyle:fallthrough")
private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) {
List<LedgerInfo> ledgers =
Lists.newArrayList(ManagedLedgerImpl.this.ledgers.values());
@@ -3422,7 +3459,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
if (log.isDebugEnabled()) {
log.debug("[{}] Deleting ledger {}", name, ls);
}
- bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx1) -> {
+ asyncDeleteLedgerWithConcurrencyLimit(ls.getLedgerId(), (rc, ctx1)
-> {
switch (rc) {
case Code.NoSuchLedgerExistsException:
case Code.NoSuchLedgerExistsOnMetadataServerException:
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5ca0db944a4..30fef55ece3 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2262,6 +2262,11 @@ public class ServiceConfiguration implements
PulsarConfiguration {
doc = "Rate limit the amount of writes per second generated by
consumer acking the messages"
)
private double managedLedgerDefaultMarkDeleteRateLimit = 1.0;
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max number of concurrent requests for deleting ledgers at
broker level"
+ )
+ private int managedLedgerDeleteMaxConcurrentRequests = 1000;
@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 5c19de44341..79dffdf7aad 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -329,6 +329,12 @@ public class BrokerService implements Closeable {
private final TopicEventsDispatcher topicEventsDispatcher = new
TopicEventsDispatcher();
private volatile boolean unloaded = false;
+ // Semaphore limiting the concurrency of ledger deletion at broker level;
+ // all managed ledgers share the same semaphore.
+ private final Semaphore ledgerDeletionSemaphore;
+
+ private final ExecutorProvider ledgerDeletionExecutorProvider;
+
public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup)
throws Exception {
this.pulsar = pulsar;
this.clock = pulsar.getClock();
@@ -451,6 +457,16 @@ public class BrokerService implements Closeable {
.getBrokerEntryPayloadProcessors(),
BrokerService.class.getClassLoader());
this.bundlesQuotas = new BundlesQuotas(pulsar);
+ if
(pulsar.getConfiguration().getManagedLedgerDeleteMaxConcurrentRequests() > 0) {
+ log.info("Setting managed ledger deletion max concurrent requests
to {}",
+
pulsar.getConfiguration().getManagedLedgerDeleteMaxConcurrentRequests());
+ this.ledgerDeletionSemaphore = new Semaphore(
+
pulsar.getConfiguration().getManagedLedgerDeleteMaxConcurrentRequests());
+ this.ledgerDeletionExecutorProvider = new ExecutorProvider(1,
"pulsar-ledger-deletion");
+ } else {
+ this.ledgerDeletionSemaphore = null;
+ this.ledgerDeletionExecutorProvider = null;
+ }
}
protected DispatchRateLimiterFactory
createDispatchRateLimiterFactory(ServiceConfiguration config)
@@ -802,6 +818,12 @@ public class BrokerService implements Closeable {
try {
log.info("Shutting down Pulsar Broker service");
+ // shutdown executor for ledger deletion
+ if (ledgerDeletionExecutorProvider != null) {
+ log.info("Shutting down executor for ledger deletion...");
+ ledgerDeletionExecutorProvider.shutdownNow();
+ }
+
// unregister non-static metrics collectors
pendingTopicLoadRequests.unregister();
pendingLookupRequests.unregister();
@@ -2057,6 +2079,9 @@ public class BrokerService implements Closeable {
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate()
>= 0
? persistencePolicies.getManagedLedgerMaxMarkDeleteRate()
:
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit());
+
managedLedgerConfig.setLedgerDeletionSemaphore(this.ledgerDeletionSemaphore);
+
managedLedgerConfig.setLedgerDeleteExecutor(this.ledgerDeletionExecutorProvider
!= null
+ ? this.ledgerDeletionExecutorProvider.getExecutor() :
null);
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());