This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e3419ce67b369f203c388a041cc57ca3317c0812
Author: Zixuan Liu <[email protected]>
AuthorDate: Thu Apr 30 15:19:53 2026 +0800

    [fix][broker] Close pending acks cleanup gap in BacklogQuotaManager (#25624)
    
    (cherry-pick from commit d861e15a1ffd1ed0f622d5659bfe8bd9dffd2300)
---
 .../pulsar/broker/service/BacklogQuotaManager.java |  68 +++++++++--
 .../broker/service/BacklogQuotaManagerTest.java    | 129 +++++++++++++++++++++
 2 files changed, 188 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 15f218ce216..6dd312bb478 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -33,12 +34,14 @@ import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.broker.service.persistent.PersistentTopicMetrics.BacklogQuotaMetrics;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
+import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 
@@ -137,9 +140,7 @@ public class BacklogQuotaManager {
      *            Backlog quota set for the topic
      */
     private void dropBacklogForSizeLimit(PersistentTopic persistentTopic, 
BacklogQuota quota) {
-        // Set the reduction factor to 90%. The aim is to drop down the 
backlog to 90% of the quota limit.
-        double reductionFactor = 0.9;
-        double targetSize = reductionFactor * quota.getLimitSize();
+        long targetSize = computeEvictionTarget(quota.getLimitSize());
 
         // Get estimated unconsumed size for the managed ledger associated 
with this topic. Estimated size is more
         // useful than the actual storage size. Actual storage size gets 
updated only when managed ledger is trimmed.
@@ -148,7 +149,7 @@ public class BacklogQuotaManager {
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] target size is [{}] for quota limit [{}], backlog 
size is [{}]", persistentTopic.getName(),
-                    targetSize, targetSize / reductionFactor, backlogSize);
+                    targetSize, quota.getLimitSize(), backlogSize);
         }
         ManagedCursor previousSlowestConsumer = null;
         while (backlogSize > targetSize) {
@@ -165,13 +166,17 @@ public class BacklogQuotaManager {
 
             if (slowestConsumer == previousSlowestConsumer) {
                 log.info("[{}] Cursors not progressing, target size is [{}] 
for quota limit [{}], backlog size is [{}]",
-                        persistentTopic.getName(), targetSize, targetSize / 
reductionFactor, backlogSize);
+                        persistentTopic.getName(), targetSize, 
quota.getLimitSize(), backlogSize);
                 break;
             }
 
             // Calculate number of messages to be skipped using the current 
backlog and the skip factor.
             long entriesInBacklog = 
slowestConsumer.getNumberOfEntriesInBacklog(false);
-            int messagesToSkip = (int) (messageSkipFactor * entriesInBacklog);
+
+            int messagesToSkip = computeEntriesToEvict(
+                    backlogSize,
+                    quota.getLimitSize(),
+                    entriesInBacklog);
             try {
                 // If there are no messages to skip, break out of the loop
                 if (messagesToSkip == 0) {
@@ -186,6 +191,7 @@ public class BacklogQuotaManager {
                             persistentTopic.getName(), messagesToSkip, 
slowestConsumer.getName(), entriesInBacklog);
                 }
                 slowestConsumer.skipEntries(messagesToSkip, 
IndividualDeletedEntries.Include);
+                markDeletePositionMoveForward(persistentTopic, 
slowestConsumer);
             } catch (Exception e) {
                 log.error("[{}] Error skipping [{}] messages from slowest 
consumer [{}]", persistentTopic.getName(),
                         messagesToSkip, slowestConsumer.getName(), e);
@@ -213,9 +219,7 @@ public class BacklogQuotaManager {
                                          boolean 
preciseTimeBasedBacklogQuotaCheck) {
         // If enabled precise time based backlog quota check, will expire 
message based on the timeBaseQuota
         if (preciseTimeBasedBacklogQuotaCheck) {
-            // Set the reduction factor to 90%. The aim is to drop down the 
backlog to 90% of the quota limit.
-            double reductionFactor = 0.9;
-            int target = (int) (reductionFactor * quota.getLimitTime());
+            int target = (int) computeEvictionTarget(quota.getLimitTime());
             if (log.isDebugEnabled()) {
                 log.debug("[{}] target backlog expire time is [{}]", 
persistentTopic.getName(), target);
             }
@@ -244,6 +248,7 @@ public class BacklogQuotaManager {
                         long ledgerId = 
mLedger.getLedgersInfo().ceilingKey(oldestPosition.getLedgerId() + 1);
                         Position nextPosition = 
PositionFactory.create(ledgerId, -1);
                         slowestConsumer.markDelete(nextPosition);
+                        markDeletePositionMoveForward(persistentTopic, 
slowestConsumer);
                         continue;
                     }
                     // Timestamp only > 0 if ledger has been closed
@@ -254,6 +259,7 @@ public class BacklogQuotaManager {
                         Position nextPosition = 
PositionFactory.create(ledgerId, -1);
                         if (!nextPosition.equals(oldestPosition)) {
                             slowestConsumer.markDelete(nextPosition);
+                            markDeletePositionMoveForward(persistentTopic, 
slowestConsumer);
                             continue;
                         }
                     }
@@ -314,4 +320,48 @@ public class BacklogQuotaManager {
         // We may need to check other system cursors here : replicator, 
compaction
         return false;
     }
+
+    /**
+     * Notify the dispatcher of the subscription that owns {@code cursor} that its mark-delete position moved
+     * forward, by invoking {@link Dispatcher#markDeletePositionMoveForward()}.
+     *
+     * <p>Backlog quota eviction advances the cursor directly (via {@code skipEntries} / {@code markDelete}),
+     * bypassing the subscription-level wrappers that normally fire this hook. Calling it here ensures pending
+     * acks and redelivery state are cleaned up for the evicted range.
+     *
+     * <p>The owning subscription is looked up by the decoded cursor name. If there is no such subscription, or
+     * it has no dispatcher, this method does nothing.
+     *
+     * @param persistentTopic the topic the cursor belongs to
+     * @param cursor the cursor that was advanced
+     */
+    private void markDeletePositionMoveForward(PersistentTopic persistentTopic, ManagedCursor cursor) {
+        PersistentSubscription subscription =
+                persistentTopic.getSubscriptions().get(Codec.decode(cursor.getName()));
+        if (subscription == null) {
+            return;
+        }
+        // Read the dispatcher once so the null check and the invocation act on the same instance.
+        Dispatcher dispatcher = subscription.getDispatcher();
+        if (dispatcher != null) {
+            dispatcher.markDeletePositionMoveForward();
+        }
+    }
+
+
+    /**
+     * Compute the target value after backlog eviction.
+     *
+     * @param quotaLimit configured quota limit
+     * @return target value after eviction
+     */
+    private static long computeEvictionTarget(long quotaLimit) {
+        double factor = 0.9;
+        return (long) (factor * quotaLimit);
+    }
+
+    /**
+     * Compute the number of entries to evict in a single eviction iteration.
+     *
+     * <p>The backlog is treated as evenly spread over its entries. The result is the share of
+     * {@code totalEntries} proportional to how far {@code currentValue} exceeds the eviction target
+     * (90% of {@code quotaLimit}), truncated toward zero.
+     *
+     * <p>The method returns {@code 0} when there is nothing to evict:
+     * <ul>
+     *   <li>the backlog value is not positive,</li>
+     *   <li>the backlog is already at or below the target, or</li>
+     *   <li>{@code totalEntries} is not positive.</li>
+     * </ul>
+     * The result never exceeds {@code totalEntries}.
+     *
+     * @param currentValue current backlog value
+     * @param quotaLimit configured quota limit
+     * @param totalEntries total entries in backlog
+     * @return entries to evict, in the range {@code [0, min(totalEntries, Integer.MAX_VALUE)]}
+     */
+    @VisibleForTesting
+    static int computeEntriesToEvict(
+            long currentValue, long quotaLimit, long totalEntries) {
+        long evictionTarget = computeEvictionTarget(quotaLimit);
+        // Guard the proportional formula:
+        // - a non-positive currentValue would divide by zero (NaN or an infinity);
+        // - a backlog at or below the target would produce a negative count.
+        if (currentValue <= 0 || totalEntries <= 0 || currentValue <= evictionTarget) {
+            return 0;
+        }
+        double entries = (currentValue - evictionTarget) * (double) totalEntries / currentValue;
+        // Never evict more entries than exist (possible with a negative quota limit).
+        // The double-to-int cast saturates at Integer.MAX_VALUE.
+        return (int) Math.min(entries, (double) totalEntries);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 19383d50f2e..ccecf37486d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static java.util.Map.entry;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.pulsar.broker.service.BacklogQuotaManager.computeEntriesToEvict;
 import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongGaugeValue;
 import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static 
org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType.destination_storage;
@@ -55,6 +56,7 @@ import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil;
 import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
@@ -71,6 +73,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -2174,4 +2177,130 @@ public class BacklogQuotaManagerTest {
         assertTrue(stats.getBacklogSize() < 10 * 1024, "Storage size is [" + 
stats.getStorageSize() + "]");
     }
     private static final Logger LOG = 
LoggerFactory.getLogger(BacklogQuotaManagerTest.class);
+
+    /**
+     * Assert that the broker-side consumer tracks exactly {@code expected} pending acks and reports the same
+     * number of unacked messages.
+     */
+    private void assertPendingAcks(org.apache.pulsar.broker.service.Consumer consumer, int expected) {
+        PendingAcksMap acks = consumer.getPendingAcks();
+        assertThat(acks).isNotNull();
+        assertThat(acks.size()).isEqualTo(expected);
+        assertThat(consumer.getUnackedMessages()).isEqualTo(expected);
+    }
+
+    @Test
+    public void testConsumerBacklogEvictionSizeQuotaCleansPendingAcks() throws 
Exception {
+        final int msgSize = 1024;
+        final int quotaSizeLimit = 10 * 1024;
+        final int numMsgs = 20;
+
+        admin.namespaces().setBacklogQuota("prop/ns-quota",
+                BacklogQuota.builder()
+                        .limitSize(quotaSizeLimit)
+                        
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                        .build());
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(adminUrl.toString())
+                .build();
+
+        final String topic =
+                
BrokerTestUtil.newUniqueName("persistent://prop/ns-quota/topic-pending-acks-size");
+        final String subName = "key-shared-sub";
+
+        @Cleanup
+        Consumer<byte[]> consumer = client.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = createProducer(client, topic);
+
+        byte[] content = new byte[msgSize];
+        for (int i = 0; i < numMsgs; i++) {
+            producer.send(content);
+        }
+
+        // Receive all messages but don't ack — pending acks accumulate.
+        for (int i = 0; i < numMsgs; i++) {
+            consumer.receive();
+        }
+
+        PersistentTopic topicRef =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic).get();
+        PersistentSubscription sub = topicRef.getSubscription(subName);
+
+        org.apache.pulsar.broker.service.Consumer brokerConsumer = 
sub.getDispatcher().getConsumers().get(0);
+        assertThat(sub).isNotNull();
+        assertPendingAcks(brokerConsumer, numMsgs);
+
+        int expectedRemaining = numMsgs - computeEntriesToEvict(
+                (long) numMsgs * msgSize,
+                quotaSizeLimit,
+                numMsgs);
+
+        Awaitility.await()
+                .pollDelay(TIME_TO_CHECK_BACKLOG_QUOTA + 1, SECONDS)
+                .pollInterval(1, SECONDS)
+                .untilAsserted(() -> assertPendingAcks(brokerConsumer, 
expectedRemaining));
+    }
+
+    @Test
+    public void 
testConsumerBacklogEvictionTimeQuotaNotPreciseCleansPendingAcks()
+            throws Exception {
+        admin.namespaces().setBacklogQuota("prop/ns-quota",
+                BacklogQuota.builder()
+                        .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
+                        
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                        .build(), message_age);
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(adminUrl.toString())
+                .build();
+
+        final String topic =
+                
BrokerTestUtil.newUniqueName("persistent://prop/ns-quota/topic-pending-acks-time");
+        final String subName = "key-shared-sub-time";
+        final int numMsgs = 14;
+
+        @Cleanup
+        Consumer<byte[]> consumer = client.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = createProducer(client, topic);
+
+        byte[] content = new byte[1024];
+        for (int i = 0; i < numMsgs; i++) {
+            producer.send(content);
+        }
+
+        // Receive all messages but don't ack — pending acks accumulate.
+        for (int i = 0; i < numMsgs; i++) {
+            consumer.receive();
+        }
+
+        PersistentTopic topicRef =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic).get();
+        PersistentSubscription sub = topicRef.getSubscription(subName);
+
+        org.apache.pulsar.broker.service.Consumer brokerConsumer = 
sub.getDispatcher().getConsumers().get(0);
+        assertThat(sub).isNotNull();
+
+        assertPendingAcks(brokerConsumer, numMsgs);
+
+        // Non-precise eviction removes whole closed ledgers only.
+        // With MAX_ENTRIES_PER_LEDGER=5 and 14 entries:
+        // ledgers are [5, 5, 4]. The last ledger remains open and is not 
evicted.
+        int expectedRemaining = numMsgs % MAX_ENTRIES_PER_LEDGER;
+
+        Awaitility.await()
+                .pollDelay(TIME_TO_CHECK_BACKLOG_QUOTA * 2, SECONDS)
+                .pollInterval(1, SECONDS)
+                .untilAsserted(() -> assertPendingAcks(brokerConsumer, 
expectedRemaining));
+    }
 }

Reply via email to