This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e3419ce67b369f203c388a041cc57ca3317c0812 Author: Zixuan Liu <[email protected]> AuthorDate: Thu Apr 30 15:19:53 2026 +0800 [fix][broker] Close pending acks cleanup gap in BacklogQuotaManager (#25624) (cherry-pick from commit d861e15a1ffd1ed0f622d5659bfe8bd9dffd2300) --- .../pulsar/broker/service/BacklogQuotaManager.java | 68 +++++++++-- .../broker/service/BacklogQuotaManagerTest.java | 129 +++++++++++++++++++++ 2 files changed, 188 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index 15f218ce216..6dd312bb478 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static java.util.concurrent.TimeUnit.SECONDS; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -33,12 +34,14 @@ import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.resources.NamespaceResources; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopicMetrics.BacklogQuotaMetrics; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl; +import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -137,9 +140,7 @@ public class BacklogQuotaManager { * Backlog quota set for the topic */ private void dropBacklogForSizeLimit(PersistentTopic persistentTopic, BacklogQuota quota) { - // Set the reduction factor to 90%. The aim is to drop down the backlog to 90% of the quota limit. - double reductionFactor = 0.9; - double targetSize = reductionFactor * quota.getLimitSize(); + long targetSize = computeEvictionTarget(quota.getLimitSize()); // Get estimated unconsumed size for the managed ledger associated with this topic. Estimated size is more // useful than the actual storage size. Actual storage size gets updated only when managed ledger is trimmed. @@ -148,7 +149,7 @@ public class BacklogQuotaManager { if (log.isDebugEnabled()) { log.debug("[{}] target size is [{}] for quota limit [{}], backlog size is [{}]", persistentTopic.getName(), - targetSize, targetSize / reductionFactor, backlogSize); + targetSize, quota.getLimitSize(), backlogSize); } ManagedCursor previousSlowestConsumer = null; while (backlogSize > targetSize) { @@ -165,13 +166,17 @@ public class BacklogQuotaManager { if (slowestConsumer == previousSlowestConsumer) { log.info("[{}] Cursors not progressing, target size is [{}] for quota limit [{}], backlog size is [{}]", - persistentTopic.getName(), targetSize, targetSize / reductionFactor, backlogSize); + persistentTopic.getName(), targetSize, quota.getLimitSize(), backlogSize); break; } // Calculate number of messages to be skipped using the current backlog and the skip factor. long entriesInBacklog = slowestConsumer.getNumberOfEntriesInBacklog(false); - int messagesToSkip = (int) (messageSkipFactor * entriesInBacklog); + + int messagesToSkip = computeEntriesToEvict( + backlogSize, + quota.getLimitSize(), + entriesInBacklog); try { // If there are no messages to skip, break out of the loop if (messagesToSkip == 0) { @@ -186,6 +191,7 @@ public class BacklogQuotaManager { persistentTopic.getName(), messagesToSkip, slowestConsumer.getName(), entriesInBacklog); } slowestConsumer.skipEntries(messagesToSkip, IndividualDeletedEntries.Include); + markDeletePositionMoveForward(persistentTopic, slowestConsumer); } catch (Exception e) { log.error("[{}] Error skipping [{}] messages from slowest consumer [{}]", persistentTopic.getName(), messagesToSkip, slowestConsumer.getName(), e); @@ -213,9 +219,7 @@ public class BacklogQuotaManager { boolean preciseTimeBasedBacklogQuotaCheck) { // If enabled precise time based backlog quota check, will expire message based on the timeBaseQuota if (preciseTimeBasedBacklogQuotaCheck) { - // Set the reduction factor to 90%. The aim is to drop down the backlog to 90% of the quota limit. - double reductionFactor = 0.9; - int target = (int) (reductionFactor * quota.getLimitTime()); + int target = (int) computeEvictionTarget(quota.getLimitTime()); if (log.isDebugEnabled()) { log.debug("[{}] target backlog expire time is [{}]", persistentTopic.getName(), target); } @@ -244,6 +248,7 @@ public class BacklogQuotaManager { long ledgerId = mLedger.getLedgersInfo().ceilingKey(oldestPosition.getLedgerId() + 1); Position nextPosition = PositionFactory.create(ledgerId, -1); slowestConsumer.markDelete(nextPosition); + markDeletePositionMoveForward(persistentTopic, slowestConsumer); continue; } // Timestamp only > 0 if ledger has been closed @@ -254,6 +259,7 @@ public class BacklogQuotaManager { Position nextPosition = PositionFactory.create(ledgerId, -1); if (!nextPosition.equals(oldestPosition)) { slowestConsumer.markDelete(nextPosition); + markDeletePositionMoveForward(persistentTopic, slowestConsumer); continue; } } @@ -314,4 +320,48 @@ public class BacklogQuotaManager { // We may need to check other system cursors here : replicator, compaction return false; } + + /** + * Invoke {@link Dispatcher#markDeletePositionMoveForward()} for the subscription that owns the given cursor. + * This ensures pending acks and redelivery state are cleaned up when the cursor is advanced by + * backlog quota eviction (bypassing the subscription-level wrappers that normally fire this hook). + * + * @param persistentTopic the topic + * @param cursor the cursor that was advanced + */ + private void markDeletePositionMoveForward(PersistentTopic persistentTopic, ManagedCursor cursor) { + PersistentSubscription subscription = + persistentTopic.getSubscriptions().get(Codec.decode(cursor.getName())); + if (subscription != null && subscription.getDispatcher() != null) { + subscription.getDispatcher().markDeletePositionMoveForward(); + } + } + + + /** + * Compute the target value after backlog eviction. + * + * @param quotaLimit configured quota limit + * @return target value after eviction + */ + private static long computeEvictionTarget(long quotaLimit) { + double factor = 0.9; + return (long) (factor * quotaLimit); + } + + /** + * Compute the number of entries to evict in a single eviction iteration. + * + * @param currentValue current backlog value + * @param quotaLimit configured quota limit + * @param totalEntries total entries in backlog + * @return entries to evict + */ + @VisibleForTesting + static int computeEntriesToEvict( + long currentValue, long quotaLimit, long totalEntries) { + long evictionTarget = computeEvictionTarget(quotaLimit); + return (int) ((currentValue - evictionTarget) + * (double) totalEntries / currentValue); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 19383d50f2e..ccecf37486d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service; import static java.util.Map.entry; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.pulsar.broker.service.BacklogQuotaManager.computeEntriesToEvict; import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongGaugeValue; import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType.destination_storage; @@ -55,6 +56,7 @@ import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil; import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; @@ -71,6 +73,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData; @@ -2174,4 +2177,130 @@ public class BacklogQuotaManagerTest { assertTrue(stats.getBacklogSize() < 10 * 1024, "Storage size is [" + stats.getStorageSize() + "]"); } private static final Logger LOG = LoggerFactory.getLogger(BacklogQuotaManagerTest.class); + + private void assertPendingAcks(org.apache.pulsar.broker.service.Consumer consumer, int expected) { + PendingAcksMap pendingAcks = consumer.getPendingAcks(); + assertThat(pendingAcks).isNotNull(); + assertThat(pendingAcks.size()).isEqualTo(expected); + assertThat(consumer.getUnackedMessages()).isEqualTo(expected); + } + + @Test + public void testConsumerBacklogEvictionSizeQuotaCleansPendingAcks() throws Exception { + final int msgSize = 1024; + final int quotaSizeLimit = 10 * 1024; + final int numMsgs = 20; + + admin.namespaces().setBacklogQuota("prop/ns-quota", + BacklogQuota.builder() + .limitSize(quotaSizeLimit) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build()); + + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(adminUrl.toString()) + .build(); + + final String topic = + BrokerTestUtil.newUniqueName("persistent://prop/ns-quota/topic-pending-acks-size"); + final String subName = "key-shared-sub"; + + @Cleanup + Consumer<byte[]> consumer = client.newConsumer() + .topic(topic) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Key_Shared) + .subscribe(); + + @Cleanup + Producer<byte[]> producer = createProducer(client, topic); + + byte[] content = new byte[msgSize]; + for (int i = 0; i < numMsgs; i++) { + producer.send(content); + } + + // Receive all messages but don't ack — pending acks accumulate. + for (int i = 0; i < numMsgs; i++) { + consumer.receive(); + } + + PersistentTopic topicRef = + (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); + PersistentSubscription sub = topicRef.getSubscription(subName); + + org.apache.pulsar.broker.service.Consumer brokerConsumer = sub.getDispatcher().getConsumers().get(0); + assertThat(sub).isNotNull(); + assertPendingAcks(brokerConsumer, numMsgs); + + int expectedRemaining = numMsgs - computeEntriesToEvict( + (long) numMsgs * msgSize, + quotaSizeLimit, + numMsgs); + + Awaitility.await() + .pollDelay(TIME_TO_CHECK_BACKLOG_QUOTA + 1, SECONDS) + .pollInterval(1, SECONDS) + .untilAsserted(() -> assertPendingAcks(brokerConsumer, expectedRemaining)); + } + + @Test + public void testConsumerBacklogEvictionTimeQuotaNotPreciseCleansPendingAcks() + throws Exception { + admin.namespaces().setBacklogQuota("prop/ns-quota", + BacklogQuota.builder() + .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(), message_age); + + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(adminUrl.toString()) + .build(); + + final String topic = + BrokerTestUtil.newUniqueName("persistent://prop/ns-quota/topic-pending-acks-time"); + final String subName = "key-shared-sub-time"; + final int numMsgs = 14; + + @Cleanup + Consumer<byte[]> consumer = client.newConsumer() + .topic(topic) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Key_Shared) + .subscribe(); + + @Cleanup + Producer<byte[]> producer = createProducer(client, topic); + + byte[] content = new byte[1024]; + for (int i = 0; i < numMsgs; i++) { + producer.send(content); + } + + // Receive all messages but don't ack — pending acks accumulate. + for (int i = 0; i < numMsgs; i++) { + consumer.receive(); + } + + PersistentTopic topicRef = + (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); + PersistentSubscription sub = topicRef.getSubscription(subName); + + org.apache.pulsar.broker.service.Consumer brokerConsumer = sub.getDispatcher().getConsumers().get(0); + assertThat(sub).isNotNull(); + + assertPendingAcks(brokerConsumer, numMsgs); + + // Non-precise eviction removes whole closed ledgers only. + // With MAX_ENTRIES_PER_LEDGER=5 and 14 entries: + // ledgers are [5, 5, 4]. The last ledger remains open and is not evicted. + int expectedRemaining = numMsgs % MAX_ENTRIES_PER_LEDGER; + + Awaitility.await() + .pollDelay(TIME_TO_CHECK_BACKLOG_QUOTA * 2, SECONDS) + .pollInterval(1, SECONDS) + .untilAsserted(() -> assertPendingAcks(brokerConsumer, expectedRemaining)); + } }
