This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f3b5ff73608b0143756f68ee1c178794bd8b535c Author: baomingyu <[email protected]> AuthorDate: Thu Apr 8 09:51:39 2021 +0800 Fix 8115 Some partitions get stuck after adding additional consumers to the KEY_SHARED subscriptions (#10096) Fixes #8115 Master Issue: #8115 First point: Sometimes this call will not succeed, and the method readMoreEntries will not be called ` if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) { readMoreEntries(); } ` Second point: Sometimes keyNumbers will not be decremented to zero, and the broker will not start the next loop to readMoreEntries. Some partitioned topics will be stuck and stop pushing messages to consumers, even though there are permits available in the consumers. (cherry picked from commit c4f154e79c03cff9055aa4e2ede7748c5952f2bc) --- .../PersistentStickyKeyDispatcherMultipleConsumers.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index d7d08d1..ca078ac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -29,6 +29,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.mledger.Entry; @@ -172,6 +173,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size()); + int currentThreadKeyNumber = groupedEntries.size(); + if (currentThreadKeyNumber == 0) { + 
currentThreadKeyNumber = -1; + } for (Map.Entry<Consumer, List<Entry>> current : groupedEntries.entrySet()) { Consumer consumer = current.getKey(); List<Entry> entriesWithSameKey = current.getValue(); @@ -213,7 +218,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker()).addListener(future -> { - if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) { + if (future.isDone() && keyNumbers.decrementAndGet() == 0) { readMoreEntries(); } }); @@ -221,6 +226,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount())); totalMessagesSent += sendMessageInfo.getTotalMessages(); totalBytesSent += sendMessageInfo.getTotalBytes(); + } else { + currentThreadKeyNumber = keyNumbers.decrementAndGet(); } } @@ -257,6 +264,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi } // readMoreEntries should run regardless whether or not stuck is caused by stuckConsumers for avoid stopping dispatch. readMoreEntries(); + } else if (currentThreadKeyNumber == 0) { + topic.getBrokerService().executor().schedule(() -> { + synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) { + readMoreEntries(); + } + }, 100, TimeUnit.MILLISECONDS); } }
