This is an automated email from the ASF dual-hosted git repository.

coderzc pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 22637effb895aec8722f5b30227519671c7b1d5a
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon Apr 27 20:50:37 2026 +0800

    [fix][broker] Decrement unacked counter when removeAllUpTo removes pending acks (#25581)
---
 .../org/apache/pulsar/broker/service/Consumer.java |  29 ++++
 .../pulsar/broker/service/PendingAcksMap.java      |  22 ++-
 .../PersistentDispatcherMultipleConsumers.java     |   4 +-
 .../pulsar/broker/service/PendingAcksMapTest.java  |  31 +++-
 .../pulsar/broker/stats/ConsumerStatsTest.java     | 166 +++++++++++++++++++++
 .../pulsar/client/api/ProducerConsumerBase.java    |   3 +-
 6 files changed, 243 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 1038baf6ba2..c0b27b22382 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -1102,6 +1102,35 @@ public class Consumer {
         return pendingAcks;
     }
 
+    /**
+     * Remove all pending acks up to the given mark-delete position and decrement the consumer's unacked message
+     * counter by the remaining unacked count for each removed entry.
+     *
+     * <p>This is used when the cursor's mark-delete position advances past entries that are still in the consumer's
+     * pending acks. The remaining unacked count accounts for batch index level acknowledgments — only the truly
+     * unacked batch indexes are decremented.
+     *
+     * @param markDeleteLedgerId the ledger ID up to which to remove pending acks
+     * @param markDeleteEntryId the entry ID up to which to remove pending acks
+     */
+    public void removePendingAcksUpToPositionAndDecrementUnacked(long 
markDeleteLedgerId, long markDeleteEntryId) {
+        if (pendingAcks == null) {
+            return;
+        }
+
+        MutableInt mutableTotalUnacked = new MutableInt(0);
+        pendingAcks.removeAllUpTo(markDeleteLedgerId, markDeleteEntryId,
+                (ledgerId, entryId, batchSize, stickyKeyHash) -> {
+                    mutableTotalUnacked.add((int) 
getUnAckedCountForBatchIndexLevelEnabled(
+                            PositionFactory.create(ledgerId, entryId), 
batchSize));
+                });
+        int totalUnacked = mutableTotalUnacked.intValue();
+        if (totalUnacked > 0) {
+            addAndGetUnAckedMsgs(this, -totalUnacked);
+            updateBlockedConsumerOnUnackedMsgs(this);
+        }
+    }
+
     public int getPriorityLevel() {
         return priorityLevel;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
index 7a728a037dc..0f7802d8294 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
@@ -326,13 +326,16 @@ public class PendingAcksMap {
     }
 
     /**
-     * Remove all pending acks up to the given ledger ID and entry ID.
+     * Remove all pending acks up to the given ledger ID and entry ID, invoking a callback for each removed entry.
      *
      * @param markDeleteLedgerId the ledger ID up to which to remove pending acks
      * @param markDeleteEntryId the entry ID up to which to remove pending acks
+     * @param removedEntryCallback optional callback invoked for each removed entry (within the write lock),
+     *                             receiving ledgerId, entryId, batchSize, and stickyKeyHash
      */
-    public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) 
{
-        internalRemoveAllUpTo(markDeleteLedgerId, markDeleteEntryId, false);
+    public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId,
+                             PendingAcksConsumer removedEntryCallback) {
+        internalRemoveAllUpTo(markDeleteLedgerId, markDeleteEntryId, false, 
removedEntryCallback);
     }
 
     /**
@@ -345,8 +348,10 @@ public class PendingAcksMap {
      * @param markDeleteLedgerId the ledger ID up to which to remove pending acks
      * @param markDeleteEntryId the entry ID up to which to remove pending acks
      * @param useWriteLock true if the method should use a write lock, false otherwise
+     * @param removedEntryCallback optional callback invoked for each removed entry (within the write lock)
      */
-    private void internalRemoveAllUpTo(long markDeleteLedgerId, long 
markDeleteEntryId, boolean useWriteLock) {
+    private void internalRemoveAllUpTo(long markDeleteLedgerId, long 
markDeleteEntryId, boolean useWriteLock,
+                                      PendingAcksConsumer 
removedEntryCallback) {
         PendingAcksRemoveHandler pendingAcksRemoveHandler = 
pendingAcksRemoveHandlerSupplier.get();
         // track if the write lock was acquired
         boolean acquiredWriteLock = false;
@@ -382,14 +387,19 @@ public class PendingAcksMap {
                         retryWithWriteLock = true;
                         return;
                     }
+                    IntIntPair value = intIntPairEntry.getValue();
+                    int batchSize = value.leftInt();
+                    int stickyKeyHash = value.rightInt();
                     if (pendingAcksRemoveHandler != null) {
                         if (!batchStarted) {
                             pendingAcksRemoveHandler.startBatch();
                             batchStarted = true;
                         }
-                        int stickyKeyHash = 
intIntPairEntry.getValue().rightInt();
                         pendingAcksRemoveHandler.handleRemoving(consumer, 
ledgerId, entryId, stickyKeyHash, closed);
                     }
+                    if (removedEntryCallback != null) {
+                        removedEntryCallback.accept(ledgerId, entryId, 
batchSize, stickyKeyHash);
+                    }
                     entryMapIterator.remove();
                 }
                 if (ledgerMap.isEmpty()) {
@@ -409,7 +419,7 @@ public class PendingAcksMap {
             } else {
                 readLock.unlock();
                 if (retryWithWriteLock) {
-                    internalRemoveAllUpTo(markDeleteLedgerId, 
markDeleteEntryId, true);
+                    internalRemoveAllUpTo(markDeleteLedgerId, 
markDeleteEntryId, true, removedEntryCallback);
                 }
             }
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 7d578fbe3be..59fd48a8079 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -363,8 +363,8 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractPersistentDis
         if (lastMarkDeletePositionBeforeReadMoreEntries != markDeletePosition) 
{
             redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), 
markDeletePosition.getEntryId());
             for (Consumer consumer : consumerList) {
-                consumer.getPendingAcks()
-                        .removeAllUpTo(markDeletePosition.getLedgerId(), 
markDeletePosition.getEntryId());
+                consumer.removePendingAcksUpToPositionAndDecrementUnacked(
+                        markDeletePosition.getLedgerId(), 
markDeletePosition.getEntryId());
             }
             lastMarkDeletePositionBeforeReadMoreEntries = markDeletePosition;
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
index 42f5935ca88..8db0e3a0f73 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
@@ -117,7 +117,8 @@ public class PendingAcksMapTest {
         pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
         pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 1, 125);
 
-        pendingAcksMap.removeAllUpTo(1L, 2L);
+        pendingAcksMap.removeAllUpTo(1L, 2L, (ledgerId, entryId, batchSize, 
stickyKeyHash) -> {
+        });
 
         assertFalse(pendingAcksMap.contains(1L, 1L));
         assertFalse(pendingAcksMap.contains(1L, 2L));
@@ -134,7 +135,8 @@ public class PendingAcksMapTest {
         pendingAcksMap.addPendingAckIfAllowed(2L, 2L, 1, 126);
         pendingAcksMap.addPendingAckIfAllowed(3L, 1L, 1, 127);
 
-        pendingAcksMap.removeAllUpTo(2L, 1L);
+        pendingAcksMap.removeAllUpTo(2L, 1L, (ledgerId, entryId, batchSize, 
stickyKeyHash) -> {
+        });
 
         assertFalse(pendingAcksMap.contains(1L, 1L));
         assertFalse(pendingAcksMap.contains(1L, 2L));
@@ -176,13 +178,36 @@ public class PendingAcksMapTest {
         pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
         pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 1, 125);
 
-        pendingAcksMap.removeAllUpTo(1L, 2L);
+        pendingAcksMap.removeAllUpTo(1L, 2L, (ledgerId, entryId, batchSize, 
stickyKeyHash) -> {
+        });
 
         verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false);
         verify(removeHandler).handleRemoving(consumer, 1L, 2L, 124, false);
         verify(removeHandler, never()).handleRemoving(consumer, 2L, 1L, 125, 
false);
     }
 
+    @Test
+    public void removeAllUpToWithCallback_InvokesCallbackForEachRemovedEntry() 
{
+        Consumer consumer = createMockConsumer("consumer1");
+        PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> 
null, () -> null);
+        pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 3, 123);
+        pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 5, 124);
+        pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 7, 125);
+
+        List<int[]> callbackInvocations = new ArrayList<>();
+        pendingAcksMap.removeAllUpTo(1L, 2L,
+                (ledgerId, entryId, batchSize, stickyKeyHash) -> {
+                    callbackInvocations.add(new int[]{(int) ledgerId, (int) 
entryId, batchSize, stickyKeyHash});
+                });
+
+        assertEquals(callbackInvocations.size(), 2);
+        assertEquals(callbackInvocations.get(0), new int[]{1, 1, 3, 123});
+        assertEquals(callbackInvocations.get(1), new int[]{1, 2, 5, 124});
+        assertFalse(pendingAcksMap.contains(1L, 1L));
+        assertFalse(pendingAcksMap.contains(1L, 2L));
+        assertTrue(pendingAcksMap.contains(2L, 1L));
+    }
+
     @Test
     public void size_ReturnsCorrectSize() {
         Consumer consumer = createMockConsumer("consumer1");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index ae2da5b6598..6e6b6d31f6e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -59,6 +59,7 @@ import 
org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.StickyKeyDispatcher;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
+import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
 import org.apache.pulsar.broker.service.plugin.EntryFilterProducerTest;
@@ -106,6 +107,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase 
{
     @Override
     protected ServiceConfiguration getDefaultConf() {
         ServiceConfiguration conf = super.getDefaultConf();
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
         conf.setMaxUnackedMessagesPerConsumer(0);
         // wait for shutdown of the broker, this prevents flakiness which could be caused by metrics being
         // unregistered asynchronously. This impacts the execution of the next test method if this would be happening.
@@ -727,6 +729,170 @@ public class ConsumerStatsTest extends 
ProducerConsumerBase {
 
     }
 
+    @DataProvider(name = "subscriptionTypes")
+    public Object[][] subscriptionTypes() {
+        return new Object[][]{
+                {SubscriptionType.Shared},
+                {SubscriptionType.Key_Shared}
+        };
+    }
+
+    /**
+     * Verify unacked count is correctly decremented when removeAllUpTo removes non-batch
+     * entries from pendingAcks after mark-delete advances via message expiry.
+     */
+    @Test(dataProvider = "subscriptionTypes")
+    public void testUnackedCountNonBatchAfterExpire(SubscriptionType subType) 
throws Exception {
+        String topic = newTopicName();
+        String sub = "sub";
+        int numMessages = 10;
+
+        @Cleanup Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic).enableBatching(false).create();
+        @Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic).subscriptionName(sub)
+                .subscriptionType(subType)
+                .subscribe();
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.send(("msg-" + i).getBytes());
+        }
+
+        org.apache.pulsar.broker.service.Consumer svcConsumer =
+                getTheUniqueServiceConsumer(topic, sub);
+        for (int i = 0; i < numMessages; i++) {
+            Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
+            Assert.assertNotNull(msg, "Expected to receive message " + i);
+        }
+
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(numMessages, svcConsumer.getUnackedMessages()));
+
+        expireAndVerifyUnackedDrained(topic, sub, producer, consumer, 
svcConsumer);
+    }
+
+    /**
+     * Verify unacked count is correctly decremented when removeAllUpTo removes batch
+     * entries from pendingAcks after mark-delete advances via message expiry.
+     */
+    @Test(dataProvider = "subscriptionTypes")
+    public void testUnackedCountBatchAfterExpire(SubscriptionType subType) 
throws Exception {
+        String topic = newTopicName();
+        String sub = "sub";
+        int numMessages = 10;
+
+        @Cleanup Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .batchingMaxMessages(20)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .enableBatching(true)
+                .create();
+        @Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic).subscriptionName(sub)
+                .subscriptionType(subType)
+                .subscribe();
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.newMessage().value(("batch-" + i).getBytes()).sendAsync();
+        }
+        producer.flush();
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
+            Assert.assertNotNull(msg, "Expected to receive message " + i);
+        }
+
+        org.apache.pulsar.broker.service.Consumer svcConsumer =
+                getTheUniqueServiceConsumer(topic, sub);
+
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(numMessages, svcConsumer.getUnackedMessages()));
+
+        expireAndVerifyUnackedDrained(topic, sub, producer, consumer, 
svcConsumer);
+    }
+
+    /**
+     * Verify unacked count is correctly decremented when removeAllUpTo removes a partially-acked
+     * batch entry from pendingAcks after mark-delete advances via message expiry.
+     *
+     * <p>Flow: produce batch(batchSize=10) → consume all → ack 5 of 10 → expire → unacked should be 0.
+     */
+    @Test(dataProvider = "subscriptionTypes")
+    public void testUnackedCountBatchPartialAckAfterExpire(SubscriptionType 
subType) throws Exception {
+        String topic = newTopicName();
+        String sub = "sub";
+        int numMessages = 10;
+        int ackCount = 5;
+
+        @Cleanup Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .batchingMaxMessages(20)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .enableBatching(true)
+                .create();
+        @Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(sub)
+                .enableBatchIndexAcknowledgment(true)
+                .subscriptionType(subType)
+                .subscribe();
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.newMessage().value(("batch-" + i).getBytes()).sendAsync();
+        }
+        producer.flush();
+
+        List<Message<byte[]>> messages = new ArrayList<>();
+        for (int i = 0; i < numMessages; i++) {
+            Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
+            Assert.assertNotNull(msg, "Expected to receive message " + i);
+            messages.add(msg);
+        }
+
+        org.apache.pulsar.broker.service.Consumer svcConsumer =
+                getTheUniqueServiceConsumer(topic, sub);
+
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(numMessages, svcConsumer.getUnackedMessages()));
+
+        // Partially ack — ack 5 of 10 batch indexes
+        for (int i = 0; i < ackCount; i++) {
+            consumer.acknowledge(messages.get(i));
+        }
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(numMessages - ackCount, 
svcConsumer.getUnackedMessages()));
+
+        expireAndVerifyUnackedDrained(topic, sub, producer, consumer, 
svcConsumer);
+    }
+
+    private void expireAndVerifyUnackedDrained(String topic, String sub,
+                                               Producer<byte[]> producer, 
Consumer<byte[]> consumer,
+                                               
org.apache.pulsar.broker.service.Consumer svcConsumer)
+            throws Exception {
+        PersistentTopic pTopic = (PersistentTopic) pulsar.getBrokerService()
+                .getTopicReference(topic).get();
+
+        Thread.sleep(1100);
+        pTopic.getSubscription(sub).expireMessagesAsync(1).get();
+
+        // Trigger readMoreEntries to invoke removeAllUpTo
+        producer.send("trigger".getBytes());
+        Message<byte[]> triggerMsg = consumer.receive(2, TimeUnit.SECONDS);
+        Assert.assertNotNull(triggerMsg);
+        consumer.acknowledge(triggerMsg);
+
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
+                assertEquals(0, svcConsumer.getUnackedMessages()));
+    }
+
+    private org.apache.pulsar.broker.service.Consumer 
getTheUniqueServiceConsumer(String topic, String sub) {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
+        AbstractPersistentDispatcherMultipleConsumers dispatcher =
+                (AbstractPersistentDispatcherMultipleConsumers) 
persistentTopic.getSubscription(sub).getDispatcher();
+        return dispatcher.getConsumers().iterator().next();
+    }
+
     private String findConsumerNameForHash(SubscriptionStats 
subscriptionStats, int hash) {
         return findConsumerForHash(subscriptionStats, 
hash).map(ConsumerStats::getConsumerName).orElse(null);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
index 01d2f107dff..1a963b523c2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.function.BiFunction;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.testng.Assert;
@@ -71,7 +72,7 @@ public abstract class ProducerConsumerBase extends 
MockedPulsarServiceBaseTest {
     private static final Random random = new Random();
 
     protected String newTopicName() {
-        return "my-property/my-ns/topic-" + 
Long.toHexString(random.nextLong());
+        return TopicName.get("my-property/my-ns/topic-" + 
Long.toHexString(random.nextLong())).toString();
     }
 
     protected <T> ReceivedMessages<T> receiveAndAckMessages(

Reply via email to