merlimat commented on a change in pull request #5276: Fixed race condition while triggering message redelivery after an ack-timeout event
URL: https://github.com/apache/pulsar/pull/5276#discussion_r329706992
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
 ##########
 @@ -546,21 +546,29 @@ public void redeliverUnacknowledgedMessages() {
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] consumer {} received redelivery", topicName, subscription, consumerId);
         }
-        // redeliver unacked-msgs
-        subscription.redeliverUnacknowledgedMessages(this);
-        flowConsumerBlockedPermits(this);
+
         if (pendingAcks != null) {
-            AtomicInteger totalRedeliveryMessages = new AtomicInteger(0);
-            pendingAcks.forEach(
-                    (ledgerId, entryId, batchSize, none) -> totalRedeliveryMessages.addAndGet((int) batchSize));
-            msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.get(), totalRedeliveryMessages.get());
-            pendingAcks.clear();
+            List<PositionImpl> pendingPositions = new ArrayList<>((int) pendingAcks.size());
+            MutableInt totalRedeliveryMessages = new MutableInt(0);
+            pendingAcks.forEach((ledgerId, entryId, batchSize, none) -> {
+                totalRedeliveryMessages.add((int) batchSize);
+                pendingPositions.add(new PositionImpl(ledgerId, entryId));
+            });
+
+            for (PositionImpl p : pendingPositions) {
+                pendingAcks.remove(p.getLedgerId(), p.getEntryId());
 
 Review comment:
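   This avoids losing any entry that a different thread inserts into
pendingAcks while the redelivery is in progress (e.g., if the dispatcher
thread is sending messages at the same time). Calling clear() would also
drop those new entries; removing only the positions collected in the
forEach leaves them in place.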
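
To illustrate the point, here is a minimal sketch and not the Pulsar code. It
assumes a plain ConcurrentHashMap keyed by a "ledgerId:entryId" string as a
stand-in for pendingAcks, and a hypothetical concurrentInsert hook that
deterministically simulates the dispatcher thread adding an entry between the
iteration and the removal. The class and method names are made up for this
example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the pendingAcks handling in Consumer.java.
class PendingAcksSketch {

    // Old approach: count what is pending, then clear() the whole map.
    // An entry inserted after the count but before clear() is wiped
    // without ever being counted or redelivered.
    static int drainWithClear(Map<String, Integer> pendingAcks, Runnable concurrentInsert) {
        int[] total = {0};
        pendingAcks.forEach((position, batchSize) -> total[0] += batchSize);
        concurrentInsert.run(); // simulated insertion from another thread
        pendingAcks.clear();
        return total[0];
    }

    // New approach: remember exactly which positions were seen and remove
    // only those, so a late insertion stays in the map.
    static int drainSnapshot(Map<String, Integer> pendingAcks, Runnable concurrentInsert) {
        List<String> seen = new ArrayList<>(pendingAcks.size());
        int[] total = {0};
        pendingAcks.forEach((position, batchSize) -> {
            total[0] += batchSize;
            seen.add(position);
        });
        concurrentInsert.run(); // simulated insertion from another thread
        for (String position : seen) {
            pendingAcks.remove(position);
        }
        return total[0];
    }

    public static void main(String[] args) {
        Map<String, Integer> a = new ConcurrentHashMap<>();
        a.put("10:1", 3);
        int countedA = drainWithClear(a, () -> a.put("10:2", 1));
        System.out.println("clear():  counted=" + countedA + " remaining=" + a);

        Map<String, Integer> b = new ConcurrentHashMap<>();
        b.put("10:1", 3);
        int countedB = drainSnapshot(b, () -> b.put("10:2", 1));
        System.out.println("snapshot: counted=" + countedB + " remaining=" + b);
    }
}
```

In the first variant the late entry "10:2" disappears without being counted;
in the second it is still present afterwards and can be handled by a later
redelivery or ack.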
