This is an automated email from the ASF dual-hosted git repository. coderzc pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 22637effb895aec8722f5b30227519671c7b1d5a Author: Zixuan Liu <[email protected]> AuthorDate: Mon Apr 27 20:50:37 2026 +0800 [fix][broker] Decrement unacked counter when removeAllUpTo removes pending acks (#25581) --- .../org/apache/pulsar/broker/service/Consumer.java | 29 ++++ .../pulsar/broker/service/PendingAcksMap.java | 22 ++- .../PersistentDispatcherMultipleConsumers.java | 4 +- .../pulsar/broker/service/PendingAcksMapTest.java | 31 +++- .../pulsar/broker/stats/ConsumerStatsTest.java | 166 +++++++++++++++++++++ .../pulsar/client/api/ProducerConsumerBase.java | 3 +- 6 files changed, 243 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 1038baf6ba2..c0b27b22382 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -1102,6 +1102,35 @@ public class Consumer { return pendingAcks; } + /** + * Remove all pending acks up to the given mark-delete position and decrement the consumer's unacked message + * counter by the remaining unacked count for each removed entry. + * + * <p>This is used when the cursor's mark-delete position advances past entries that are still in the consumer's + * pending acks. The remaining unacked count accounts for batch index level acknowledgments — only the truly + * unacked batch indexes are decremented. + * + * @param markDeleteLedgerId the ledger ID up to which to remove pending acks + * @param markDeleteEntryId the entry ID up to which to remove pending acks + */ + public void removePendingAcksUpToPositionAndDecrementUnacked(long markDeleteLedgerId, long markDeleteEntryId) { + if (pendingAcks == null) { + return; + } + + MutableInt mutableTotalUnacked = new MutableInt(0); + pendingAcks.removeAllUpTo(markDeleteLedgerId, markDeleteEntryId, + (ledgerId, entryId, batchSize, stickyKeyHash) -> { + mutableTotalUnacked.add((int) getUnAckedCountForBatchIndexLevelEnabled( + PositionFactory.create(ledgerId, entryId), batchSize)); + }); + int totalUnacked = mutableTotalUnacked.intValue(); + if (totalUnacked > 0) { + addAndGetUnAckedMsgs(this, -totalUnacked); + updateBlockedConsumerOnUnackedMsgs(this); + } + } + public int getPriorityLevel() { return priorityLevel; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java index 7a728a037dc..0f7802d8294 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java @@ -326,13 +326,16 @@ public class PendingAcksMap { } /** - * Remove all pending acks up to the given ledger ID and entry ID. + * Remove all pending acks up to the given ledger ID and entry ID, invoking a callback for each removed entry. * * @param markDeleteLedgerId the ledger ID up to which to remove pending acks * @param markDeleteEntryId the entry ID up to which to remove pending acks + * @param removedEntryCallback optional callback invoked for each removed entry (within the write lock), + * receiving ledgerId, entryId, batchSize, and stickyKeyHash */ - public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) { - internalRemoveAllUpTo(markDeleteLedgerId, markDeleteEntryId, false); + public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId, + PendingAcksConsumer removedEntryCallback) { + internalRemoveAllUpTo(markDeleteLedgerId, markDeleteEntryId, false, removedEntryCallback); } /** @@ -345,8 +348,10 @@ public class PendingAcksMap { * @param markDeleteLedgerId the ledger ID up to which to remove pending acks * @param markDeleteEntryId the entry ID up to which to remove pending acks * @param useWriteLock true if the method should use a write lock, false otherwise + * @param removedEntryCallback optional callback invoked for each removed entry (within the write lock) */ - private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntryId, boolean useWriteLock) { + private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntryId, boolean useWriteLock, + PendingAcksConsumer removedEntryCallback) { PendingAcksRemoveHandler pendingAcksRemoveHandler = pendingAcksRemoveHandlerSupplier.get(); // track if the write lock was acquired boolean acquiredWriteLock = false; @@ -382,14 +387,19 @@ public class PendingAcksMap { retryWithWriteLock = true; return; } + IntIntPair value = intIntPairEntry.getValue(); + int batchSize = value.leftInt(); + int stickyKeyHash = value.rightInt(); if (pendingAcksRemoveHandler != null) { if (!batchStarted) { pendingAcksRemoveHandler.startBatch(); batchStarted = true; } - int stickyKeyHash = intIntPairEntry.getValue().rightInt(); pendingAcksRemoveHandler.handleRemoving(consumer, ledgerId, entryId, stickyKeyHash, closed); } + if (removedEntryCallback != null) { + removedEntryCallback.accept(ledgerId, entryId, batchSize, stickyKeyHash); + } entryMapIterator.remove(); } if (ledgerMap.isEmpty()) { @@ -409,7 +419,7 @@ public class PendingAcksMap { } else { readLock.unlock(); if (retryWithWriteLock) { - internalRemoveAllUpTo(markDeleteLedgerId, markDeleteEntryId, true); + internalRemoveAllUpTo(markDeleteLedgerId, markDeleteEntryId, true, removedEntryCallback); } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 7d578fbe3be..59fd48a8079 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -363,8 +363,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis if (lastMarkDeletePositionBeforeReadMoreEntries != markDeletePosition) { redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); for (Consumer consumer : consumerList) { - consumer.getPendingAcks() - .removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); + consumer.removePendingAcksUpToPositionAndDecrementUnacked( + markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); } lastMarkDeletePositionBeforeReadMoreEntries = markDeletePosition; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java index 42f5935ca88..8db0e3a0f73 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java @@ -117,7 +117,8 @@ public class PendingAcksMapTest { pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124); pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 1, 125); - pendingAcksMap.removeAllUpTo(1L, 2L); + pendingAcksMap.removeAllUpTo(1L, 2L, (ledgerId, entryId, batchSize, stickyKeyHash) -> { + }); assertFalse(pendingAcksMap.contains(1L, 1L)); assertFalse(pendingAcksMap.contains(1L, 2L)); @@ -134,7 +135,8 @@ public class PendingAcksMapTest { pendingAcksMap.addPendingAckIfAllowed(2L, 2L, 1, 126); pendingAcksMap.addPendingAckIfAllowed(3L, 1L, 1, 127); - pendingAcksMap.removeAllUpTo(2L, 1L); + pendingAcksMap.removeAllUpTo(2L, 1L, (ledgerId, entryId, batchSize, stickyKeyHash) -> { + }); assertFalse(pendingAcksMap.contains(1L, 1L)); assertFalse(pendingAcksMap.contains(1L, 2L)); @@ -176,13 +178,36 @@ public class PendingAcksMapTest { pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124); pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 1, 125); - pendingAcksMap.removeAllUpTo(1L, 2L); + pendingAcksMap.removeAllUpTo(1L, 2L, (ledgerId, entryId, batchSize, stickyKeyHash) -> { + }); verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false); verify(removeHandler).handleRemoving(consumer, 1L, 2L, 124, false); verify(removeHandler, never()).handleRemoving(consumer, 2L, 1L, 125, false); } + @Test + public void removeAllUpToWithCallback_InvokesCallbackForEachRemovedEntry() { + Consumer consumer = createMockConsumer("consumer1"); + PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> null, () -> null); + pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 3, 123); + pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 5, 124); + pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 7, 125); + + List<int[]> callbackInvocations = new ArrayList<>(); + pendingAcksMap.removeAllUpTo(1L, 2L, + (ledgerId, entryId, batchSize, stickyKeyHash) -> { + callbackInvocations.add(new int[]{(int) ledgerId, (int) entryId, batchSize, stickyKeyHash}); + }); + + assertEquals(callbackInvocations.size(), 2); + assertEquals(callbackInvocations.get(0), new int[]{1, 1, 3, 123}); + assertEquals(callbackInvocations.get(1), new int[]{1, 2, 5, 124}); + assertFalse(pendingAcksMap.contains(1L, 1L)); + assertFalse(pendingAcksMap.contains(1L, 2L)); + assertTrue(pendingAcksMap.contains(2L, 1L)); + } + @Test public void size_ReturnsCorrectSize() { Consumer consumer = createMockConsumer("consumer1"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index ae2da5b6598..6e6b6d31f6e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -59,6 +59,7 @@ import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; import org.apache.pulsar.broker.service.StickyKeyDispatcher; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.broker.service.plugin.EntryFilterProducerTest; @@ -106,6 +107,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase { @Override protected ServiceConfiguration getDefaultConf() { ServiceConfiguration conf = super.getDefaultConf(); + conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); conf.setMaxUnackedMessagesPerConsumer(0); // wait for shutdown of the broker, this prevents flakiness which could be caused by metrics being // unregistered asynchronously. This impacts the execution of the next test method if this would be happening. @@ -727,6 +729,170 @@ public class ConsumerStatsTest extends ProducerConsumerBase { } + @DataProvider(name = "subscriptionTypes") + public Object[][] subscriptionTypes() { + return new Object[][]{ + {SubscriptionType.Shared}, + {SubscriptionType.Key_Shared} + }; + } + + /** + * Verify unacked count is correctly decremented when removeAllUpTo removes non-batch + * entries from pendingAcks after mark-delete advances via message expiry. + */ + @Test(dataProvider = "subscriptionTypes") + public void testUnackedCountNonBatchAfterExpire(SubscriptionType subType) throws Exception { + String topic = newTopicName(); + String sub = "sub"; + int numMessages = 10; + + @Cleanup Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic).enableBatching(false).create(); + @Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topic).subscriptionName(sub) + .subscriptionType(subType) + .subscribe(); + + for (int i = 0; i < numMessages; i++) { + producer.send(("msg-" + i).getBytes()); + } + + org.apache.pulsar.broker.service.Consumer svcConsumer = + getTheUniqueServiceConsumer(topic, sub); + for (int i = 0; i < numMessages; i++) { + Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS); + Assert.assertNotNull(msg, "Expected to receive message " + i); + } + + Awaitility.await().untilAsserted(() -> + assertEquals(numMessages, svcConsumer.getUnackedMessages())); + + expireAndVerifyUnackedDrained(topic, sub, producer, consumer, svcConsumer); + } + + /** + * Verify unacked count is correctly decremented when removeAllUpTo removes batch + * entries from pendingAcks after mark-delete advances via message expiry. + */ + @Test(dataProvider = "subscriptionTypes") + public void testUnackedCountBatchAfterExpire(SubscriptionType subType) throws Exception { + String topic = newTopicName(); + String sub = "sub"; + int numMessages = 10; + + @Cleanup Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .batchingMaxMessages(20) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .enableBatching(true) + .create(); + @Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topic).subscriptionName(sub) + .subscriptionType(subType) + .subscribe(); + + for (int i = 0; i < numMessages; i++) { + producer.newMessage().value(("batch-" + i).getBytes()).sendAsync(); + } + producer.flush(); + + for (int i = 0; i < numMessages; i++) { + Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS); + Assert.assertNotNull(msg, "Expected to receive message " + i); + } + + org.apache.pulsar.broker.service.Consumer svcConsumer = + getTheUniqueServiceConsumer(topic, sub); + + Awaitility.await().untilAsserted(() -> + assertEquals(numMessages, svcConsumer.getUnackedMessages())); + + expireAndVerifyUnackedDrained(topic, sub, producer, consumer, svcConsumer); + } + + /** + * Verify unacked count is correctly decremented when removeAllUpTo removes a partially-acked + * batch entry from pendingAcks after mark-delete advances via message expiry. + * + * <p>Flow: produce batch(batchSize=10) → consume all → ack 5 of 10 → expire → unacked should be 0. + */ + @Test(dataProvider = "subscriptionTypes") + public void testUnackedCountBatchPartialAckAfterExpire(SubscriptionType subType) throws Exception { + String topic = newTopicName(); + String sub = "sub"; + int numMessages = 10; + int ackCount = 5; + + @Cleanup Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .batchingMaxMessages(20) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .enableBatching(true) + .create(); + @Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(sub) + .enableBatchIndexAcknowledgment(true) + .subscriptionType(subType) + .subscribe(); + + for (int i = 0; i < numMessages; i++) { + producer.newMessage().value(("batch-" + i).getBytes()).sendAsync(); + } + producer.flush(); + + List<Message<byte[]>> messages = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS); + Assert.assertNotNull(msg, "Expected to receive message " + i); + messages.add(msg); + } + + org.apache.pulsar.broker.service.Consumer svcConsumer = + getTheUniqueServiceConsumer(topic, sub); + + Awaitility.await().untilAsserted(() -> + assertEquals(numMessages, svcConsumer.getUnackedMessages())); + + // Partially ack — ack 5 of 10 batch indexes + for (int i = 0; i < ackCount; i++) { + consumer.acknowledge(messages.get(i)); + } + Awaitility.await().untilAsserted(() -> + assertEquals(numMessages - ackCount, svcConsumer.getUnackedMessages())); + + expireAndVerifyUnackedDrained(topic, sub, producer, consumer, svcConsumer); + } + + private void expireAndVerifyUnackedDrained(String topic, String sub, + Producer<byte[]> producer, Consumer<byte[]> consumer, + org.apache.pulsar.broker.service.Consumer svcConsumer) + throws Exception { + PersistentTopic pTopic = (PersistentTopic) pulsar.getBrokerService() + .getTopicReference(topic).get(); + + Thread.sleep(1100); + pTopic.getSubscription(sub).expireMessagesAsync(1).get(); + + // Trigger readMoreEntries to invoke removeAllUpTo + producer.send("trigger".getBytes()); + Message<byte[]> triggerMsg = consumer.receive(2, TimeUnit.SECONDS); + Assert.assertNotNull(triggerMsg); + consumer.acknowledge(triggerMsg); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> + assertEquals(0, svcConsumer.getUnackedMessages())); + } + + private org.apache.pulsar.broker.service.Consumer getTheUniqueServiceConsumer(String topic, String sub) { + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); + AbstractPersistentDispatcherMultipleConsumers dispatcher = + (AbstractPersistentDispatcherMultipleConsumers) persistentTopic.getSubscription(sub).getDispatcher(); + return dispatcher.getConsumers().iterator().next(); + } + private String findConsumerNameForHash(SubscriptionStats subscriptionStats, int hash) { return findConsumerForHash(subscriptionStats, hash).map(ConsumerStats::getConsumerName).orElse(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java index 01d2f107dff..1a963b523c2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.function.BiFunction; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.testng.Assert; @@ -71,7 +72,7 @@ public abstract class ProducerConsumerBase extends MockedPulsarServiceBaseTest { private static final Random random = new Random(); protected String newTopicName() { - return "my-property/my-ns/topic-" + Long.toHexString(random.nextLong()); + return TopicName.get("my-property/my-ns/topic-" + Long.toHexString(random.nextLong())).toString(); } protected <T> ReceivedMessages<T> receiveAndAckMessages(
