This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f3b5ff73608b0143756f68ee1c178794bd8b535c
Author: baomingyu <[email protected]>
AuthorDate: Thu Apr 8 09:51:39 2021 +0800

    Fix 8115 Some partitions get stuck after adding additional consumers to the 
KEY_SHARED subscriptions (#10096)
    
    Fixes #8115
    
    Master Issue: #8115
    
    First point:
     Sometimes the send future does not complete successfully, so the check 
below never passes and readMoreEntries is never called:
    ` if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
                            readMoreEntries();
     } `
    
    Second point:
      Sometimes keyNumbers is never decremented to zero, so the broker does not 
start the next loop to call readMoreEntries.
    Some partitioned topics then get stuck and stop pushing messages to 
consumers, even though the consumers still have permits.
    
    (cherry picked from commit c4f154e79c03cff9055aa4e2ede7748c5952f2bc)
---
 .../PersistentStickyKeyDispatcherMultipleConsumers.java   | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index d7d08d1..ca078ac 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -29,6 +29,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.mledger.Entry;
@@ -172,6 +173,10 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
         AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
 
+        int currentThreadKeyNumber = groupedEntries.size();
+        if (currentThreadKeyNumber == 0) {
+            currentThreadKeyNumber = -1;
+        }
         for (Map.Entry<Consumer, List<Entry>> current : 
groupedEntries.entrySet()) {
             Consumer consumer = current.getKey();
             List<Entry> entriesWithSameKey = current.getValue();
@@ -213,7 +218,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                 consumer.sendMessages(entriesWithSameKey, batchSizes, 
batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), 
sendMessageInfo.getTotalChunkedMessages(),
                         getRedeliveryTracker()).addListener(future -> {
-                            if (future.isSuccess() && 
keyNumbers.decrementAndGet() == 0) {
+                            if (future.isDone() && 
keyNumbers.decrementAndGet() == 0) {
                                 readMoreEntries();
                             }
                         });
@@ -221,6 +226,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                 TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, 
-(sendMessageInfo.getTotalMessages() - 
batchIndexesAcks.getTotalAckedIndexCount()));
                 totalMessagesSent += sendMessageInfo.getTotalMessages();
                 totalBytesSent += sendMessageInfo.getTotalBytes();
+            } else {
+                currentThreadKeyNumber = keyNumbers.decrementAndGet();
             }
         }
 
@@ -257,6 +264,12 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             }
             // readMoreEntries should run regardless whether or not stuck is 
caused by stuckConsumers for avoid stopping dispatch.
             readMoreEntries();
+        }  else if (currentThreadKeyNumber == 0) {
+            topic.getBrokerService().executor().schedule(() -> {
+                synchronized 
(PersistentStickyKeyDispatcherMultipleConsumers.this) {
+                    readMoreEntries();
+                }
+            }, 100, TimeUnit.MILLISECONDS);
         }
     }
 

Reply via email to