This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 5cb975b5e39 [fix][broker] Fix race in pending acks removal in redeliverUnacknowledgedMessages (#25589)
5cb975b5e39 is described below
commit 5cb975b5e39359b031fe571512ccc583ab14c1f2
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Apr 28 15:18:55 2026 +0800
[fix][broker] Fix race in pending acks removal in redeliverUnacknowledgedMessages (#25589)
(cherry-pick from commit 9e1bae353ad3f869aaab99a98548fbe37375364e)
---
.../org/apache/pulsar/broker/service/Consumer.java | 25 +++----
.../pulsar/broker/service/PendingAcksMap.java | 49 ++++++++++++-
.../pulsar/broker/service/PendingAcksMapTest.java | 81 ++++++++++++++++++++++
3 files changed, 136 insertions(+), 19 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index c0b27b22382..0d9492afee1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -1137,8 +1137,6 @@ public class Consumer {
public void redeliverUnacknowledgedMessages(long consumerEpoch) {
// cleanup unackedMessage bucket and redeliver those unack-msgs again
- clearUnAckedMsgs();
- blockedConsumerOnUnackedMsgs = false;
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received redelivery", topicName,
subscription, consumerId);
}
@@ -1146,7 +1144,7 @@ public class Consumer {
if (pendingAcks != null) {
List<Position> pendingPositions = new ArrayList<>((int)
pendingAcks.size());
MutableInt totalRedeliveryMessages = new MutableInt(0);
- pendingAcks.forEach((ledgerId, entryId, batchSize, stickyKeyHash)
-> {
+ pendingAcks.forEachAndClear((ledgerId, entryId, batchSize,
stickyKeyHash) -> {
int unAckedCount =
(int)
getUnAckedCountForBatchIndexLevelEnabled(PositionFactory.create(ledgerId,
entryId),
batchSize);
@@ -1154,15 +1152,18 @@ public class Consumer {
pendingPositions.add(PositionFactory.create(ledgerId,
entryId));
});
- for (Position p : pendingPositions) {
- pendingAcks.remove(p.getLedgerId(), p.getEntryId());
+ if (totalRedeliveryMessages.intValue() > 0) {
+ addAndGetUnAckedMsgs(this,
-totalRedeliveryMessages.intValue());
}
+ blockedConsumerOnUnackedMsgs = false;
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.intValue(),
totalRedeliveryMessages.intValue());
msgRedeliverCounter.add(totalRedeliveryMessages.intValue());
subscription.redeliverUnacknowledgedMessages(this,
pendingPositions);
} else {
+ clearUnAckedMsgs();
+ blockedConsumerOnUnackedMsgs = false;
subscription.redeliverUnacknowledgedMessages(this, consumerEpoch);
}
@@ -1174,7 +1175,7 @@ public class Consumer {
List<Position> pendingPositions = new ArrayList<>();
for (MessageIdData msg : messageIds) {
Position position = PositionFactory.create(msg.getLedgerId(),
msg.getEntryId());
- IntIntPair pendingAck = pendingAcks.get(position.getLedgerId(),
position.getEntryId());
+ IntIntPair pendingAck =
pendingAcks.removeAndGet(position.getLedgerId(), position.getEntryId());
if (pendingAck != null) {
int unAckedCount = (int)
getUnAckedCountForBatchIndexLevelEnabled(position, pendingAck.leftInt());
pendingAcks.remove(position.getLedgerId(),
position.getEntryId());
@@ -1195,17 +1196,7 @@ public class Consumer {
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages,
totalRedeliveryMessages);
msgRedeliverCounter.add(totalRedeliveryMessages);
- int numberOfBlockedPermits =
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0);
-
- // if permitsReceivedWhileConsumerBlocked has been accumulated then
pass it to Dispatcher to flow messages
- if (numberOfBlockedPermits > 0) {
- MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits);
- if (log.isDebugEnabled()) {
- log.debug("[{}-{}] Added {} blockedPermits to
broker.service.Consumer's messagePermits for consumer {}",
- topicName, subscription, numberOfBlockedPermits,
consumerId);
- }
- subscription.consumerFlow(this, numberOfBlockedPermits);
- }
+ flowConsumerBlockedPermits(this);
}
public Subscription getSubscription() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
index 0f7802d8294..549aef36cec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
@@ -204,9 +204,26 @@ public class PendingAcksMap {
* @param processor the processor to handle each pending ack
*/
public void forEachAndClose(PendingAcksConsumer processor) {
+ internalForEachAndClear(processor, true);
+ }
+
+ /**
+ * Iterate over all the pending acks and clear the map.
+ * Unlike {@link #forEachAndClose(PendingAcksConsumer)}, this method does
not close the map,
+ * so new entries can still be added after this method returns.
+ *
+ * @param processor the processor to handle each pending ack
+ */
+ public void forEachAndClear(PendingAcksConsumer processor) {
+ internalForEachAndClear(processor, false);
+ }
+
+ private void internalForEachAndClear(PendingAcksConsumer processor,
boolean close) {
try {
writeLock.lock();
- closed = true;
+ if (close) {
+ closed = true;
+ }
PendingAcksRemoveHandler pendingAcksRemoveHandler =
pendingAcksRemoveHandlerSupplier.get();
if (pendingAcksRemoveHandler != null) {
try {
@@ -226,7 +243,6 @@ public class PendingAcksMap {
writeLock.unlock();
}
}
-
/**
* Check if the map contains a pending ack for the given ledger ID and
entry ID.
*
@@ -325,6 +341,35 @@ public class PendingAcksMap {
}
}
+ /**
+ * Atomically remove and return the pending ack for the given ledger ID
and entry ID.
+ * Unlike {@link #remove(long, long)}, this method returns the removed
entry so the caller
+ * can access the batch size and sticky key hash without a separate get
operation.
+ *
+ * @param ledgerId the ledger ID
+ * @param entryId the entry ID
+ * @return the removed entry as an IntIntPair (batchSize, stickyKeyHash),
or null if not found
+ */
+ public IntIntPair removeAndGet(long ledgerId, long entryId) {
+ try {
+ writeLock.lock();
+ Long2ObjectSortedMap<IntIntPair> ledgerMap =
pendingAcks.get(ledgerId);
+ if (ledgerMap == null) {
+ return null;
+ }
+ IntIntPair removedEntry = ledgerMap.remove(entryId);
+ if (removedEntry != null) {
+ handleRemovePendingAck(ledgerId, entryId,
removedEntry.rightInt());
+ }
+ if (removedEntry != null && ledgerMap.isEmpty()) {
+ pendingAcks.remove(ledgerId);
+ }
+ return removedEntry;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
/**
* Remove all pending acks up to the given ledger ID and entry ID,
invoking a callback for each removed entry.
*
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
index 8db0e3a0f73..ca32a1aee5f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import it.unimi.dsi.fastutil.ints.IntIntPair;
import java.util.ArrayList;
import java.util.List;
import org.testng.annotations.Test;
@@ -218,4 +219,84 @@ public class PendingAcksMapTest {
assertEquals(pendingAcksMap.size(), 3);
}
+
+ @Test
+ public void forEachAndClear_ProcessesAndClearsAllPendingAcks() {
+ Consumer consumer = createMockConsumer("consumer1");
+ PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () ->
null, () -> null);
+ pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123);
+ pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
+
+ List<Long> processedEntries = new ArrayList<>();
+ pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize,
stickyKeyHash) -> processedEntries.add(entryId));
+
+ assertEquals(processedEntries, List.of(1L, 2L));
+ assertEquals(pendingAcksMap.size(), 0);
+ }
+
+ @Test
+ public void forEachAndClear_AllowsAddingAfterClear() {
+ Consumer consumer = createMockConsumer("consumer1");
+ PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () ->
null, () -> null);
+ pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123);
+
+ pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize,
stickyKeyHash) -> {});
+
+ // Unlike forEachAndClose, forEachAndClear should allow new additions
+ boolean result = pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
+ assertTrue(result);
+ assertTrue(pendingAcksMap.contains(1L, 2L));
+ }
+
+ @Test
+ public void forEachAndClear_InvokesRemoveHandler() {
+ Consumer consumer = createMockConsumer("consumer1");
+ PendingAcksMap.PendingAcksRemoveHandler removeHandler =
mock(PendingAcksMap.PendingAcksRemoveHandler.class);
+ PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () ->
null, () -> removeHandler);
+ pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123);
+ pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
+
+ pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize,
stickyKeyHash) -> {});
+
+ verify(removeHandler).startBatch();
+ verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false);
+ verify(removeHandler).handleRemoving(consumer, 1L, 2L, 124, false);
+ verify(removeHandler).endBatch();
+ }
+
+ @Test
+ public void removeAndGet_RemovesAndReturnsEntry() {
+ Consumer consumer = createMockConsumer("consumer1");
+ PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () ->
null, () -> null);
+ pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 5, 123);
+
+ IntIntPair result = pendingAcksMap.removeAndGet(1L, 1L);
+
+ assertTrue(result != null);
+ assertEquals(result.leftInt(), 5);
+ assertEquals(result.rightInt(), 123);
+ assertFalse(pendingAcksMap.contains(1L, 1L));
+ }
+
+ @Test
+ public void removeAndGet_ReturnsNullWhenNotFound() {
+ Consumer consumer = createMockConsumer("consumer1");
+ PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () ->
null, () -> null);
+
+ IntIntPair result = pendingAcksMap.removeAndGet(1L, 1L);
+
+ assertTrue(result == null);
+ }
+
+ @Test
+ public void removeAndGet_InvokesRemoveHandler() {
+ Consumer consumer = createMockConsumer("consumer1");
+ PendingAcksMap.PendingAcksRemoveHandler removeHandler =
mock(PendingAcksMap.PendingAcksRemoveHandler.class);
+ PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () ->
null, () -> removeHandler);
+ pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 5, 123);
+
+ pendingAcksMap.removeAndGet(1L, 1L);
+
+ verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false);
+ }
}
\ No newline at end of file