This is an automated email from the ASF dual-hosted git repository.
Technoboy- pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 2537ffc8744 [fix][broker] Close pending acks cleanup gap in
BacklogQuotaManager (#25624)
2537ffc8744 is described below
commit 2537ffc87440d344092d167f65d2c4602224dfcc
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue May 12 19:05:41 2026 +0800
[fix][broker] Close pending acks cleanup gap in BacklogQuotaManager (#25624)
---
.../pulsar/broker/service/BacklogQuotaManager.java | 68 +++++++++--
.../broker/service/BacklogQuotaManagerTest.java | 129 +++++++++++++++++++++
2 files changed, 188 insertions(+), 9 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 47ff3588ab2..b5a0d5877fe 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;
import static java.util.concurrent.TimeUnit.SECONDS;
+import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -33,12 +34,14 @@ import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import
org.apache.pulsar.broker.service.persistent.PersistentTopicMetrics.BacklogQuotaMetrics;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
+import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -137,9 +140,7 @@ public class BacklogQuotaManager {
* Backlog quota set for the topic
*/
private void dropBacklogForSizeLimit(PersistentTopic persistentTopic,
BacklogQuota quota) {
- // Set the reduction factor to 90%. The aim is to drop down the
backlog to 90% of the quota limit.
- double reductionFactor = 0.9;
- double targetSize = reductionFactor * quota.getLimitSize();
+ long targetSize = computeEvictionTarget(quota.getLimitSize());
// Get estimated unconsumed size for the managed ledger associated
with this topic. Estimated size is more
// useful than the actual storage size. Actual storage size gets
updated only when managed ledger is trimmed.
@@ -148,7 +149,7 @@ public class BacklogQuotaManager {
if (log.isDebugEnabled()) {
log.debug("[{}] target size is [{}] for quota limit [{}], backlog
size is [{}]", persistentTopic.getName(),
- targetSize, targetSize / reductionFactor, backlogSize);
+ targetSize, quota.getLimitSize(), backlogSize);
}
ManagedCursor previousSlowestConsumer = null;
while (backlogSize > targetSize) {
@@ -165,13 +166,17 @@ public class BacklogQuotaManager {
if (slowestConsumer == previousSlowestConsumer) {
log.info("[{}] Cursors not progressing, target size is [{}]
for quota limit [{}], backlog size is [{}]",
- persistentTopic.getName(), targetSize, targetSize /
reductionFactor, backlogSize);
+ persistentTopic.getName(), targetSize,
quota.getLimitSize(), backlogSize);
break;
}
// Calculate number of messages to be skipped using the current
backlog and the skip factor.
long entriesInBacklog =
slowestConsumer.getNumberOfEntriesInBacklog(false);
- int messagesToSkip = (int) (messageSkipFactor * entriesInBacklog);
+
+ int messagesToSkip = computeEntriesToEvict(
+ backlogSize,
+ quota.getLimitSize(),
+ entriesInBacklog);
try {
// If there are no messages to skip, break out of the loop
if (messagesToSkip == 0) {
@@ -186,6 +191,7 @@ public class BacklogQuotaManager {
persistentTopic.getName(), messagesToSkip,
slowestConsumer.getName(), entriesInBacklog);
}
slowestConsumer.skipEntries(messagesToSkip,
IndividualDeletedEntries.Include);
+ markDeletePositionMoveForward(persistentTopic,
slowestConsumer);
} catch (Exception e) {
log.error("[{}] Error skipping [{}] messages from slowest
consumer [{}]", persistentTopic.getName(),
messagesToSkip, slowestConsumer.getName(), e);
@@ -213,9 +219,7 @@ public class BacklogQuotaManager {
boolean
preciseTimeBasedBacklogQuotaCheck) {
// If enabled precise time based backlog quota check, will expire
message based on the timeBaseQuota
if (preciseTimeBasedBacklogQuotaCheck) {
- // Set the reduction factor to 90%. The aim is to drop down the
backlog to 90% of the quota limit.
- double reductionFactor = 0.9;
- int target = (int) (reductionFactor * quota.getLimitTime());
+ int target = (int) computeEvictionTarget(quota.getLimitTime());
if (log.isDebugEnabled()) {
log.debug("[{}] target backlog expire time is [{}]",
persistentTopic.getName(), target);
}
@@ -244,6 +248,7 @@ public class BacklogQuotaManager {
long ledgerId =
mLedger.getLedgersInfo().ceilingKey(oldestPosition.getLedgerId() + 1);
Position nextPosition =
PositionFactory.create(ledgerId, -1);
slowestConsumer.markDelete(nextPosition);
+ markDeletePositionMoveForward(persistentTopic,
slowestConsumer);
continue;
}
// Timestamp only > 0 if ledger has been closed
@@ -254,6 +259,7 @@ public class BacklogQuotaManager {
Position nextPosition =
PositionFactory.create(ledgerId, -1);
if (!nextPosition.equals(oldestPosition)) {
slowestConsumer.markDelete(nextPosition);
+ markDeletePositionMoveForward(persistentTopic,
slowestConsumer);
continue;
}
}
@@ -314,4 +320,48 @@ public class BacklogQuotaManager {
// We may need to check other system cursors here : replicator,
compaction
return false;
}
+
+ /**
+  * Invoke {@link Dispatcher#markDeletePositionMoveForward()} for the
+  * subscription that owns the given cursor.
+  *
+  * <p>Backlog quota eviction advances the cursor directly (via
+  * {@code skipEntries} / {@code markDelete}), bypassing the subscription-level
+  * wrappers that normally fire this hook, so it is triggered here explicitly
+  * to let the dispatcher clean up pending acks and redelivery state.
+  *
+  * <p>Nothing happens when the topic has no subscription registered under the
+  * decoded cursor name, or when that subscription has no dispatcher.
+  *
+  * @param persistentTopic the topic whose subscriptions are searched
+  * @param cursor the cursor that was advanced
+  */
+ private void markDeletePositionMoveForward(PersistentTopic persistentTopic,
+                                            ManagedCursor cursor) {
+     // Subscriptions are looked up by the decoded form of the cursor name.
+     PersistentSubscription subscription =
+             persistentTopic.getSubscriptions().get(Codec.decode(cursor.getName()));
+     if (subscription == null) {
+         return;
+     }
+     // Read the dispatcher once so the null check and the call below act on
+     // the same reference instead of two separate getDispatcher() results.
+     Dispatcher dispatcher = subscription.getDispatcher();
+     if (dispatcher != null) {
+         dispatcher.markDeletePositionMoveForward();
+     }
+ }
+
+
+ /**
+  * Compute the target value after backlog eviction, which is 90% of the
+  * configured quota limit (truncated towards zero).
+  *
+  * @param quotaLimit configured quota limit
+  * @return target value after eviction
+  */
+ private static long computeEvictionTarget(long quotaLimit) {
+     double factor = 0.9;
+     return (long) (factor * quotaLimit);
+ }
+
+ /**
+  * Compute the number of entries to evict in a single eviction iteration.
+  *
+  * <p>The result is the share of {@code totalEntries} proportional to how far
+  * {@code currentValue} exceeds the eviction target:
+  * {@code (currentValue - target) * totalEntries / currentValue}, truncated
+  * towards zero.
+  *
+  * <p>Returns {@code 0} when there is nothing to evict: no entries, a
+  * non-positive backlog value, or a backlog already at or below the target.
+  * Without these guards the division could yield a negative count, or
+  * {@code Integer.MIN_VALUE} for a zero {@code currentValue}. The result is
+  * also never larger than {@code totalEntries}.
+  *
+  * @param currentValue current backlog value
+  * @param quotaLimit configured quota limit
+  * @param totalEntries total entries in backlog
+  * @return entries to evict, in the range {@code [0, totalEntries]}
+  */
+ @VisibleForTesting
+ static int computeEntriesToEvict(
+         long currentValue, long quotaLimit, long totalEntries) {
+     long evictionTarget = computeEvictionTarget(quotaLimit);
+     if (totalEntries <= 0 || currentValue <= 0 || currentValue <= evictionTarget) {
+         return 0;
+     }
+     double entries = (currentValue - evictionTarget)
+             * (double) totalEntries / currentValue;
+     // Cap at the number of entries that actually exist (and at int range).
+     double upperBound = (double) Math.min(totalEntries, Integer.MAX_VALUE);
+     return (int) Math.min(entries, upperBound);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 13562fafdf8..a693364ec8f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
import static java.util.Map.entry;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.pulsar.broker.service.BacklogQuotaManager.computeEntriesToEvict;
import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongGaugeValue;
import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static
org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType.destination_storage;
@@ -55,6 +56,7 @@ import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil;
import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
@@ -71,6 +73,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -2174,4 +2177,130 @@ public class BacklogQuotaManagerTest {
assertTrue(stats.getBacklogSize() < 10 * 1024, "Storage size is [" +
stats.getStorageSize() + "]");
}
private static final Logger LOG =
LoggerFactory.getLogger(BacklogQuotaManagerTest.class);
+
+ /**
+  * Asserts that the broker-side consumer tracks exactly {@code expected}
+  * pending acks and reports the same number of unacked messages.
+  *
+  * @param consumer broker-side consumer to inspect
+  * @param expected expected pending-ack and unacked-message count
+  */
+ private void assertPendingAcks(org.apache.pulsar.broker.service.Consumer consumer,
+                                int expected) {
+     final PendingAcksMap tracked = consumer.getPendingAcks();
+     assertThat(tracked).isNotNull();
+     assertThat(tracked.size()).isEqualTo(expected);
+     assertThat(consumer.getUnackedMessages()).isEqualTo(expected);
+ }
+
+ /**
+  * Verifies that size-based {@code consumer_backlog_eviction} also shrinks the
+  * consumer's pending acks: all messages are received without acking, and the
+  * pending-ack count is then expected to drop by the number of entries that
+  * {@code computeEntriesToEvict} reports for the produced backlog.
+  */
+ @Test
+ public void testConsumerBacklogEvictionSizeQuotaCleansPendingAcks() throws Exception {
+     final int msgSize = 1024;
+     final int quotaSizeLimit = 10 * 1024;
+     final int numMsgs = 20;
+
+     admin.namespaces().setBacklogQuota("prop/ns-quota",
+             BacklogQuota.builder()
+                     .limitSize(quotaSizeLimit)
+                     .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                     .build());
+
+     @Cleanup
+     PulsarClient client = PulsarClient.builder()
+             .serviceUrl(adminUrl.toString())
+             .build();
+
+     final String topic = BrokerTestUtil.newUniqueName(
+             "persistent://prop/ns-quota/topic-pending-acks-size");
+     final String subName = "key-shared-sub";
+
+     @Cleanup
+     Consumer<byte[]> consumer = client.newConsumer()
+             .topic(topic)
+             .subscriptionName(subName)
+             .subscriptionType(SubscriptionType.Key_Shared)
+             .subscribe();
+
+     @Cleanup
+     Producer<byte[]> producer = createProducer(client, topic);
+
+     byte[] content = new byte[msgSize];
+     for (int i = 0; i < numMsgs; i++) {
+         producer.send(content);
+     }
+
+     // Receive all messages but don't ack — pending acks accumulate.
+     for (int i = 0; i < numMsgs; i++) {
+         consumer.receive();
+     }
+
+     PersistentTopic topicRef =
+             (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+     PersistentSubscription sub = topicRef.getSubscription(subName);
+     // Check the subscription exists before dereferencing it, so a missing
+     // subscription fails with a clear assertion rather than an NPE.
+     assertThat(sub).isNotNull();
+
+     org.apache.pulsar.broker.service.Consumer brokerConsumer =
+             sub.getDispatcher().getConsumers().get(0);
+     assertPendingAcks(brokerConsumer, numMsgs);
+
+     int expectedRemaining = numMsgs - computeEntriesToEvict(
+             (long) numMsgs * msgSize,
+             quotaSizeLimit,
+             numMsgs);
+
+     Awaitility.await()
+             .pollDelay(TIME_TO_CHECK_BACKLOG_QUOTA + 1, SECONDS)
+             .pollInterval(1, SECONDS)
+             .untilAsserted(() -> assertPendingAcks(brokerConsumer, expectedRemaining));
+ }
+
+ /**
+  * Verifies that time-based {@code consumer_backlog_eviction} (with a
+  * {@code message_age} quota) also shrinks the consumer's pending acks: all
+  * messages are received without acking, and only the entries of the last
+  * ledger are expected to remain pending after eviction.
+  */
+ @Test
+ public void testConsumerBacklogEvictionTimeQuotaNotPreciseCleansPendingAcks()
+         throws Exception {
+     admin.namespaces().setBacklogQuota("prop/ns-quota",
+             BacklogQuota.builder()
+                     .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
+                     .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                     .build(), message_age);
+
+     @Cleanup
+     PulsarClient client = PulsarClient.builder()
+             .serviceUrl(adminUrl.toString())
+             .build();
+
+     final String topic = BrokerTestUtil.newUniqueName(
+             "persistent://prop/ns-quota/topic-pending-acks-time");
+     final String subName = "key-shared-sub-time";
+     final int numMsgs = 14;
+
+     @Cleanup
+     Consumer<byte[]> consumer = client.newConsumer()
+             .topic(topic)
+             .subscriptionName(subName)
+             .subscriptionType(SubscriptionType.Key_Shared)
+             .subscribe();
+
+     @Cleanup
+     Producer<byte[]> producer = createProducer(client, topic);
+
+     byte[] content = new byte[1024];
+     for (int i = 0; i < numMsgs; i++) {
+         producer.send(content);
+     }
+
+     // Receive all messages but don't ack — pending acks accumulate.
+     for (int i = 0; i < numMsgs; i++) {
+         consumer.receive();
+     }
+
+     PersistentTopic topicRef =
+             (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+     PersistentSubscription sub = topicRef.getSubscription(subName);
+     // Check the subscription exists before dereferencing it, so a missing
+     // subscription fails with a clear assertion rather than an NPE.
+     assertThat(sub).isNotNull();
+
+     org.apache.pulsar.broker.service.Consumer brokerConsumer =
+             sub.getDispatcher().getConsumers().get(0);
+
+     assertPendingAcks(brokerConsumer, numMsgs);
+
+     // Non-precise eviction removes whole closed ledgers only.
+     // With MAX_ENTRIES_PER_LEDGER=5 and 14 entries:
+     // ledgers are [5, 5, 4]. The last ledger remains open and is not evicted.
+     int expectedRemaining = numMsgs % MAX_ENTRIES_PER_LEDGER;
+
+     Awaitility.await()
+             .pollDelay(TIME_TO_CHECK_BACKLOG_QUOTA * 2, SECONDS)
+             .pollInterval(1, SECONDS)
+             .untilAsserted(() -> assertPendingAcks(brokerConsumer, expectedRemaining));
+ }
}