This is an automated email from the ASF dual-hosted git repository.
Technoboy- pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new ef327a84916 [fix][broker] Decrement unacked counter when removeAllUpTo
removes pending acks (#25581)
ef327a84916 is described below
commit ef327a849169469b2b2c4d8b63fab4a60d87e208
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon Apr 27 20:50:37 2026 +0800
[fix][broker] Decrement unacked counter when removeAllUpTo removes pending
acks (#25581)
---
.../org/apache/pulsar/broker/service/Consumer.java | 29 ++++
.../pulsar/broker/service/PendingAcksMap.java | 22 ++-
.../PersistentDispatcherMultipleConsumers.java | 4 +-
.../pulsar/broker/service/PendingAcksMapTest.java | 31 +++-
.../pulsar/broker/stats/ConsumerStatsTest.java | 166 +++++++++++++++++++++
.../pulsar/client/api/ProducerConsumerBase.java | 3 +-
6 files changed, 243 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 3edf6e2d681..a7ca3f8339f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -1113,6 +1113,35 @@ public class Consumer {
return pendingAcks;
}
+ /**
+ * Remove all pending acks up to the given mark-delete position and
decrement the consumer's unacked message
+ * counter by the remaining unacked count for each removed entry.
+ *
+ * <p>This is used when the cursor's mark-delete position advances past
entries that are still in the consumer's
+ * pending acks. The remaining unacked count accounts for batch index
level acknowledgments — only the truly
+ * unacked batch indexes are decremented.
+ *
+ * @param markDeleteLedgerId the ledger ID up to which to remove pending
acks
+ * @param markDeleteEntryId the entry ID up to which to remove pending acks
+ */
+ public void removePendingAcksUpToPositionAndDecrementUnacked(long
markDeleteLedgerId, long markDeleteEntryId) {
+ if (pendingAcks == null) {
+ return;
+ }
+
+ MutableInt mutableTotalUnacked = new MutableInt(0);
+ pendingAcks.removeAllUpTo(markDeleteLedgerId, markDeleteEntryId,
+ (ledgerId, entryId, batchSize, stickyKeyHash) -> {
+ mutableTotalUnacked.add((int)
getUnAckedCountForBatchIndexLevelEnabled(
+ PositionFactory.create(ledgerId, entryId),
batchSize));
+ });
+ int totalUnacked = mutableTotalUnacked.intValue();
+ if (totalUnacked > 0) {
+ addAndGetUnAckedMsgs(this, -totalUnacked);
+ updateBlockedConsumerOnUnackedMsgs(this);
+ }
+ }
+
public int getPriorityLevel() {
return priorityLevel;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
index 7a728a037dc..0f7802d8294 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
@@ -326,13 +326,16 @@ public class PendingAcksMap {
}
/**
- * Remove all pending acks up to the given ledger ID and entry ID.
+ * Remove all pending acks up to the given ledger ID and entry ID,
invoking a callback for each removed entry.
*
* @param markDeleteLedgerId the ledger ID up to which to remove pending
acks
* @param markDeleteEntryId the entry ID up to which to remove pending acks
+ * @param removedEntryCallback optional callback invoked for each removed
entry (within the write lock),
+ * receiving ledgerId, entryId, batchSize, and
stickyKeyHash
*/
- public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId)
{
- internalRemoveAllUpTo(markDeleteLedgerId, markDeleteEntryId, false);
+ public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId,
+ PendingAcksConsumer removedEntryCallback) {
+ internalRemoveAllUpTo(markDeleteLedgerId, markDeleteEntryId, false,
removedEntryCallback);
}
/**
@@ -345,8 +348,10 @@ public class PendingAcksMap {
* @param markDeleteLedgerId the ledger ID up to which to remove pending
acks
* @param markDeleteEntryId the entry ID up to which to remove pending acks
* @param useWriteLock true if the method should use a write lock, false
otherwise
+ * @param removedEntryCallback optional callback invoked for each removed
entry (within the write lock)
*/
- private void internalRemoveAllUpTo(long markDeleteLedgerId, long
markDeleteEntryId, boolean useWriteLock) {
+ private void internalRemoveAllUpTo(long markDeleteLedgerId, long
markDeleteEntryId, boolean useWriteLock,
+ PendingAcksConsumer
removedEntryCallback) {
PendingAcksRemoveHandler pendingAcksRemoveHandler =
pendingAcksRemoveHandlerSupplier.get();
// track if the write lock was acquired
boolean acquiredWriteLock = false;
@@ -382,14 +387,19 @@ public class PendingAcksMap {
retryWithWriteLock = true;
return;
}
+ IntIntPair value = intIntPairEntry.getValue();
+ int batchSize = value.leftInt();
+ int stickyKeyHash = value.rightInt();
if (pendingAcksRemoveHandler != null) {
if (!batchStarted) {
pendingAcksRemoveHandler.startBatch();
batchStarted = true;
}
- int stickyKeyHash =
intIntPairEntry.getValue().rightInt();
pendingAcksRemoveHandler.handleRemoving(consumer,
ledgerId, entryId, stickyKeyHash, closed);
}
+ if (removedEntryCallback != null) {
+ removedEntryCallback.accept(ledgerId, entryId,
batchSize, stickyKeyHash);
+ }
entryMapIterator.remove();
}
if (ledgerMap.isEmpty()) {
@@ -409,7 +419,7 @@ public class PendingAcksMap {
} else {
readLock.unlock();
if (retryWithWriteLock) {
- internalRemoveAllUpTo(markDeleteLedgerId,
markDeleteEntryId, true);
+ internalRemoveAllUpTo(markDeleteLedgerId,
markDeleteEntryId, true, removedEntryCallback);
}
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a88fbf863eb..220b9e23a3e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -363,8 +363,8 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractPersistentDis
if (lastMarkDeletePositionBeforeReadMoreEntries != markDeletePosition)
{
redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(),
markDeletePosition.getEntryId());
for (Consumer consumer : consumerList) {
- consumer.getPendingAcks()
- .removeAllUpTo(markDeletePosition.getLedgerId(),
markDeletePosition.getEntryId());
+ consumer.removePendingAcksUpToPositionAndDecrementUnacked(
+ markDeletePosition.getLedgerId(),
markDeletePosition.getEntryId());
}
lastMarkDeletePositionBeforeReadMoreEntries = markDeletePosition;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
index 42f5935ca88..8db0e3a0f73 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
@@ -117,7 +117,8 @@ public class PendingAcksMapTest {
pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 1, 125);
- pendingAcksMap.removeAllUpTo(1L, 2L);
+ pendingAcksMap.removeAllUpTo(1L, 2L, (ledgerId, entryId, batchSize,
stickyKeyHash) -> {
+ });
assertFalse(pendingAcksMap.contains(1L, 1L));
assertFalse(pendingAcksMap.contains(1L, 2L));
@@ -134,7 +135,8 @@ public class PendingAcksMapTest {
pendingAcksMap.addPendingAckIfAllowed(2L, 2L, 1, 126);
pendingAcksMap.addPendingAckIfAllowed(3L, 1L, 1, 127);
- pendingAcksMap.removeAllUpTo(2L, 1L);
+ pendingAcksMap.removeAllUpTo(2L, 1L, (ledgerId, entryId, batchSize,
stickyKeyHash) -> {
+ });
assertFalse(pendingAcksMap.contains(1L, 1L));
assertFalse(pendingAcksMap.contains(1L, 2L));
@@ -176,13 +178,36 @@ public class PendingAcksMapTest {
pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 1, 125);
- pendingAcksMap.removeAllUpTo(1L, 2L);
+ pendingAcksMap.removeAllUpTo(1L, 2L, (ledgerId, entryId, batchSize,
stickyKeyHash) -> {
+ });
verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false);
verify(removeHandler).handleRemoving(consumer, 1L, 2L, 124, false);
verify(removeHandler, never()).handleRemoving(consumer, 2L, 1L, 125,
false);
}
+ @Test
+ public void removeAllUpToWithCallback_InvokesCallbackForEachRemovedEntry()
{
+ Consumer consumer = createMockConsumer("consumer1");
+ PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () ->
null, () -> null);
+ pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 3, 123);
+ pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 5, 124);
+ pendingAcksMap.addPendingAckIfAllowed(2L, 1L, 7, 125);
+
+ List<int[]> callbackInvocations = new ArrayList<>();
+ pendingAcksMap.removeAllUpTo(1L, 2L,
+ (ledgerId, entryId, batchSize, stickyKeyHash) -> {
+ callbackInvocations.add(new int[]{(int) ledgerId, (int)
entryId, batchSize, stickyKeyHash});
+ });
+
+ assertEquals(callbackInvocations.size(), 2);
+ assertEquals(callbackInvocations.get(0), new int[]{1, 1, 3, 123});
+ assertEquals(callbackInvocations.get(1), new int[]{1, 2, 5, 124});
+ assertFalse(pendingAcksMap.contains(1L, 1L));
+ assertFalse(pendingAcksMap.contains(1L, 2L));
+ assertTrue(pendingAcksMap.contains(2L, 1L));
+ }
+
@Test
public void size_ReturnsCorrectSize() {
Consumer consumer = createMockConsumer("consumer1");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 568a3cb0850..30b8310b28d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -59,6 +59,7 @@ import
org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.StickyKeyDispatcher;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
+import
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.EntryFilterProducerTest;
@@ -106,6 +107,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase
{
@Override
protected ServiceConfiguration getDefaultConf() {
ServiceConfiguration conf = super.getDefaultConf();
+ conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
conf.setMaxUnackedMessagesPerConsumer(0);
// wait for shutdown of the broker, this prevents flakiness which
could be caused by metrics being
// unregistered asynchronously. This impacts the execution of the next
test method if this would be happening.
@@ -729,6 +731,170 @@ public class ConsumerStatsTest extends
ProducerConsumerBase {
}
+ @DataProvider(name = "subscriptionTypes")
+ public Object[][] subscriptionTypes() {
+ return new Object[][]{
+ {SubscriptionType.Shared},
+ {SubscriptionType.Key_Shared}
+ };
+ }
+
+ /**
+ * Verify unacked count is correctly decremented when removeAllUpTo
removes non-batch
+ * entries from pendingAcks after mark-delete advances via message expiry.
+ */
+ @Test(dataProvider = "subscriptionTypes")
+ public void testUnackedCountNonBatchAfterExpire(SubscriptionType subType)
throws Exception {
+ String topic = newTopicName();
+ String sub = "sub";
+ int numMessages = 10;
+
+ @Cleanup Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic).enableBatching(false).create();
+ @Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic).subscriptionName(sub)
+ .subscriptionType(subType)
+ .subscribe();
+
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(("msg-" + i).getBytes());
+ }
+
+ org.apache.pulsar.broker.service.Consumer svcConsumer =
+ getTheUniqueServiceConsumer(topic, sub);
+ for (int i = 0; i < numMessages; i++) {
+ Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg, "Expected to receive message " + i);
+ }
+
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(numMessages, svcConsumer.getUnackedMessages()));
+
+ expireAndVerifyUnackedDrained(topic, sub, producer, consumer,
svcConsumer);
+ }
+
+ /**
+ * Verify unacked count is correctly decremented when removeAllUpTo
removes batch
+ * entries from pendingAcks after mark-delete advances via message expiry.
+ */
+ @Test(dataProvider = "subscriptionTypes")
+ public void testUnackedCountBatchAfterExpire(SubscriptionType subType)
throws Exception {
+ String topic = newTopicName();
+ String sub = "sub";
+ int numMessages = 10;
+
+ @Cleanup Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .batchingMaxMessages(20)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+ .enableBatching(true)
+ .create();
+ @Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic).subscriptionName(sub)
+ .subscriptionType(subType)
+ .subscribe();
+
+ for (int i = 0; i < numMessages; i++) {
+ producer.newMessage().value(("batch-" + i).getBytes()).sendAsync();
+ }
+ producer.flush();
+
+ for (int i = 0; i < numMessages; i++) {
+ Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg, "Expected to receive message " + i);
+ }
+
+ org.apache.pulsar.broker.service.Consumer svcConsumer =
+ getTheUniqueServiceConsumer(topic, sub);
+
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(numMessages, svcConsumer.getUnackedMessages()));
+
+ expireAndVerifyUnackedDrained(topic, sub, producer, consumer,
svcConsumer);
+ }
+
+ /**
+ * Verify unacked count is correctly decremented when removeAllUpTo
removes a partially-acked
+ * batch entry from pendingAcks after mark-delete advances via message
expiry.
+ *
+ * <p>Flow: produce batch(batchSize=10) → consume all → ack 5 of 10 →
expire → unacked should be 0.
+ */
+ @Test(dataProvider = "subscriptionTypes")
+ public void testUnackedCountBatchPartialAckAfterExpire(SubscriptionType
subType) throws Exception {
+ String topic = newTopicName();
+ String sub = "sub";
+ int numMessages = 10;
+ int ackCount = 5;
+
+ @Cleanup Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .batchingMaxMessages(20)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+ .enableBatching(true)
+ .create();
+ @Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(sub)
+ .enableBatchIndexAcknowledgment(true)
+ .subscriptionType(subType)
+ .subscribe();
+
+ for (int i = 0; i < numMessages; i++) {
+ producer.newMessage().value(("batch-" + i).getBytes()).sendAsync();
+ }
+ producer.flush();
+
+ List<Message<byte[]>> messages = new ArrayList<>();
+ for (int i = 0; i < numMessages; i++) {
+ Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg, "Expected to receive message " + i);
+ messages.add(msg);
+ }
+
+ org.apache.pulsar.broker.service.Consumer svcConsumer =
+ getTheUniqueServiceConsumer(topic, sub);
+
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(numMessages, svcConsumer.getUnackedMessages()));
+
+ // Partially ack — ack 5 of 10 batch indexes
+ for (int i = 0; i < ackCount; i++) {
+ consumer.acknowledge(messages.get(i));
+ }
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(numMessages - ackCount,
svcConsumer.getUnackedMessages()));
+
+ expireAndVerifyUnackedDrained(topic, sub, producer, consumer,
svcConsumer);
+ }
+
+ private void expireAndVerifyUnackedDrained(String topic, String sub,
+ Producer<byte[]> producer,
Consumer<byte[]> consumer,
+
org.apache.pulsar.broker.service.Consumer svcConsumer)
+ throws Exception {
+ PersistentTopic pTopic = (PersistentTopic) pulsar.getBrokerService()
+ .getTopicReference(topic).get();
+
+ Thread.sleep(1100);
+ pTopic.getSubscription(sub).expireMessagesAsync(1).get();
+
+ // Trigger readMoreEntries to invoke removeAllUpTo
+ producer.send("trigger".getBytes());
+ Message<byte[]> triggerMsg = consumer.receive(2, TimeUnit.SECONDS);
+ Assert.assertNotNull(triggerMsg);
+ consumer.acknowledge(triggerMsg);
+
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
+ assertEquals(0, svcConsumer.getUnackedMessages()));
+ }
+
+ private org.apache.pulsar.broker.service.Consumer
getTheUniqueServiceConsumer(String topic, String sub) {
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(topic,
false).join().get();
+ AbstractPersistentDispatcherMultipleConsumers dispatcher =
+ (AbstractPersistentDispatcherMultipleConsumers)
persistentTopic.getSubscription(sub).getDispatcher();
+ return dispatcher.getConsumers().iterator().next();
+ }
+
private String findConsumerNameForHash(SubscriptionStats
subscriptionStats, int hash) {
return findConsumerForHash(subscriptionStats,
hash).map(ConsumerStats::getConsumerName).orElse(null);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
index 90babc00d2a..638f5aec484 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.function.BiFunction;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.testng.Assert;
@@ -71,7 +72,7 @@ public abstract class ProducerConsumerBase extends
MockedPulsarServiceBaseTest {
private static final Random random = new Random();
protected String newTopicName() {
- return "my-property/my-ns/topic-" +
Long.toHexString(random.nextLong());
+ return TopicName.get("my-property/my-ns/topic-" +
Long.toHexString(random.nextLong())).toString();
}
protected <T> ReceivedMessages<T> receiveAndAckMessages(