This is an automated email from the ASF dual-hosted git repository.

Technoboy- pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new bcd64102e0f [fix][broker] Fix race in pending acks removal in 
redeliverUnacknowledgedMessages (#25589)
bcd64102e0f is described below

commit bcd64102e0f3b7e483dfcebc87420ff210634f62
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue May 12 18:30:06 2026 +0800

    [fix][broker] Fix race in pending acks removal in 
redeliverUnacknowledgedMessages (#25589)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 25 +++----
 .../pulsar/broker/service/PendingAcksMap.java      | 49 ++++++++++++-
 .../pulsar/broker/service/PendingAcksMapTest.java  | 81 ++++++++++++++++++++++
 3 files changed, 136 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index a7ca3f8339f..d37cecc2767 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -1148,8 +1148,6 @@ public class Consumer {
 
     public void redeliverUnacknowledgedMessages(long consumerEpoch) {
         // cleanup unackedMessage bucket and redeliver those unack-msgs again
-        clearUnAckedMsgs();
-        blockedConsumerOnUnackedMsgs = false;
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] consumer {} received redelivery", topicName, 
subscription, consumerId);
         }
@@ -1157,7 +1155,7 @@ public class Consumer {
         if (pendingAcks != null) {
             List<Position> pendingPositions = new ArrayList<>((int) 
pendingAcks.size());
             MutableInt totalRedeliveryMessages = new MutableInt(0);
-            pendingAcks.forEach((ledgerId, entryId, batchSize, stickyKeyHash) 
-> {
+            pendingAcks.forEachAndClear((ledgerId, entryId, batchSize, 
stickyKeyHash) -> {
                 int unAckedCount =
                         (int) 
getUnAckedCountForBatchIndexLevelEnabled(PositionFactory.create(ledgerId, 
entryId),
                                 batchSize);
@@ -1165,15 +1163,18 @@ public class Consumer {
                 pendingPositions.add(PositionFactory.create(ledgerId, 
entryId));
             });
 
-            for (Position p : pendingPositions) {
-                pendingAcks.remove(p.getLedgerId(), p.getEntryId());
+            if (totalRedeliveryMessages.intValue() > 0) {
+                addAndGetUnAckedMsgs(this, 
-totalRedeliveryMessages.intValue());
             }
+            blockedConsumerOnUnackedMsgs = false;
 
             
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.intValue(), 
totalRedeliveryMessages.intValue());
             msgRedeliverCounter.add(totalRedeliveryMessages.intValue());
 
             subscription.redeliverUnacknowledgedMessages(this, 
pendingPositions);
         } else {
+            clearUnAckedMsgs();
+            blockedConsumerOnUnackedMsgs = false;
             subscription.redeliverUnacknowledgedMessages(this, consumerEpoch);
         }
 
@@ -1185,7 +1186,7 @@ public class Consumer {
         List<Position> pendingPositions = new ArrayList<>();
         for (MessageIdData msg : messageIds) {
             Position position = PositionFactory.create(msg.getLedgerId(), 
msg.getEntryId());
-            IntIntPair pendingAck = pendingAcks.get(position.getLedgerId(), 
position.getEntryId());
+            IntIntPair pendingAck = 
pendingAcks.removeAndGet(position.getLedgerId(), position.getEntryId());
             if (pendingAck != null) {
                 int unAckedCount = (int) 
getUnAckedCountForBatchIndexLevelEnabled(position, pendingAck.leftInt());
                 pendingAcks.remove(position.getLedgerId(), 
position.getEntryId());
@@ -1206,17 +1207,7 @@ public class Consumer {
         msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, 
totalRedeliveryMessages);
         msgRedeliverCounter.add(totalRedeliveryMessages);
 
-        int numberOfBlockedPermits = 
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0);
-
-        // if permitsReceivedWhileConsumerBlocked has been accumulated then 
pass it to Dispatcher to flow messages
-        if (numberOfBlockedPermits > 0) {
-            MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits);
-            if (log.isDebugEnabled()) {
-               log.debug("[{}-{}] Added {} blockedPermits to 
broker.service.Consumer's messagePermits for consumer {}",
-                       topicName, subscription, numberOfBlockedPermits, 
consumerId);
-            }
-            subscription.consumerFlow(this, numberOfBlockedPermits);
-        }
+        flowConsumerBlockedPermits(this);
     }
 
     public Subscription getSubscription() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
index 0f7802d8294..549aef36cec 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
@@ -204,9 +204,26 @@ public class PendingAcksMap {
      * @param processor the processor to handle each pending ack
      */
     public void forEachAndClose(PendingAcksConsumer processor) {
+        internalForEachAndClear(processor, true);
+    }
+
+    /**
+     * Iterate over all the pending acks and clear the map.
+     * Unlike {@link #forEachAndClose(PendingAcksConsumer)}, this method does 
not close the map,
+     * so new entries can still be added after this method returns.
+     *
+     * @param processor the processor to handle each pending ack
+     */
+    public void forEachAndClear(PendingAcksConsumer processor) {
+        internalForEachAndClear(processor, false);
+    }
+
+    private void internalForEachAndClear(PendingAcksConsumer processor, 
boolean close) {
         try {
             writeLock.lock();
-            closed = true;
+            if (close) {
+                closed = true;
+            }
             PendingAcksRemoveHandler pendingAcksRemoveHandler = 
pendingAcksRemoveHandlerSupplier.get();
             if (pendingAcksRemoveHandler != null) {
                 try {
@@ -226,7 +243,6 @@ public class PendingAcksMap {
             writeLock.unlock();
         }
     }
-
     /**
      * Check if the map contains a pending ack for the given ledger ID and 
entry ID.
      *
@@ -325,6 +341,35 @@ public class PendingAcksMap {
         }
     }
 
+    /**
+     * Atomically remove and return the pending ack for the given ledger ID 
and entry ID.
+     * Unlike {@link #remove(long, long)}, this method returns the removed 
entry so the caller
+     * can access the batch size and sticky key hash without a separate get 
operation.
+     *
+     * @param ledgerId the ledger ID
+     * @param entryId the entry ID
+     * @return the removed entry as an IntIntPair (batchSize, stickyKeyHash), 
or null if not found
+     */
+    public IntIntPair removeAndGet(long ledgerId, long entryId) {
+        try {
+            writeLock.lock();
+            Long2ObjectSortedMap<IntIntPair> ledgerMap = 
pendingAcks.get(ledgerId);
+            if (ledgerMap == null) {
+                return null;
+            }
+            IntIntPair removedEntry = ledgerMap.remove(entryId);
+            if (removedEntry != null) {
+                handleRemovePendingAck(ledgerId, entryId, 
removedEntry.rightInt());
+            }
+            if (removedEntry != null && ledgerMap.isEmpty()) {
+                pendingAcks.remove(ledgerId);
+            }
+            return removedEntry;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
     /**
      * Remove all pending acks up to the given ledger ID and entry ID, 
invoking a callback for each removed entry.
      *
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
index 8db0e3a0f73..ca32a1aee5f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
+import it.unimi.dsi.fastutil.ints.IntIntPair;
 import java.util.ArrayList;
 import java.util.List;
 import org.testng.annotations.Test;
@@ -218,4 +219,84 @@ public class PendingAcksMapTest {
 
         assertEquals(pendingAcksMap.size(), 3);
     }
+
+    @Test
+    public void forEachAndClear_ProcessesAndClearsAllPendingAcks() {
+        Consumer consumer = createMockConsumer("consumer1");
+        PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> 
null, () -> null);
+        pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123);
+        pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
+
+        List<Long> processedEntries = new ArrayList<>();
+        pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize, 
stickyKeyHash) -> processedEntries.add(entryId));
+
+        assertEquals(processedEntries, List.of(1L, 2L));
+        assertEquals(pendingAcksMap.size(), 0);
+    }
+
+    @Test
+    public void forEachAndClear_AllowsAddingAfterClear() {
+        Consumer consumer = createMockConsumer("consumer1");
+        PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> 
null, () -> null);
+        pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123);
+
+        pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize, 
stickyKeyHash) -> {});
+
+        // Unlike forEachAndClose, forEachAndClear should allow new additions
+        boolean result = pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
+        assertTrue(result);
+        assertTrue(pendingAcksMap.contains(1L, 2L));
+    }
+
+    @Test
+    public void forEachAndClear_InvokesRemoveHandler() {
+        Consumer consumer = createMockConsumer("consumer1");
+        PendingAcksMap.PendingAcksRemoveHandler removeHandler = 
mock(PendingAcksMap.PendingAcksRemoveHandler.class);
+        PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> 
null, () -> removeHandler);
+        pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123);
+        pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
+
+        pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize, 
stickyKeyHash) -> {});
+
+        verify(removeHandler).startBatch();
+        verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false);
+        verify(removeHandler).handleRemoving(consumer, 1L, 2L, 124, false);
+        verify(removeHandler).endBatch();
+    }
+
+    @Test
+    public void removeAndGet_RemovesAndReturnsEntry() {
+        Consumer consumer = createMockConsumer("consumer1");
+        PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> 
null, () -> null);
+        pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 5, 123);
+
+        IntIntPair result = pendingAcksMap.removeAndGet(1L, 1L);
+
+        assertTrue(result != null);
+        assertEquals(result.leftInt(), 5);
+        assertEquals(result.rightInt(), 123);
+        assertFalse(pendingAcksMap.contains(1L, 1L));
+    }
+
+    @Test
+    public void removeAndGet_ReturnsNullWhenNotFound() {
+        Consumer consumer = createMockConsumer("consumer1");
+        PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> 
null, () -> null);
+
+        IntIntPair result = pendingAcksMap.removeAndGet(1L, 1L);
+
+        assertTrue(result == null);
+    }
+
+    @Test
+    public void removeAndGet_InvokesRemoveHandler() {
+        Consumer consumer = createMockConsumer("consumer1");
+        PendingAcksMap.PendingAcksRemoveHandler removeHandler = 
mock(PendingAcksMap.PendingAcksRemoveHandler.class);
+        PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> 
null, () -> removeHandler);
+        pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 5, 123);
+
+        pendingAcksMap.removeAndGet(1L, 1L);
+
+        verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false);
+    }
 }
\ No newline at end of file

Reply via email to