This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 587548ec5f9e9a9fc6d465aca3cd3fbb76b9ee64
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Aug 25 00:33:35 2021 -0700

    Fixed race condition on multi-topic consumer (#11764)

    ### Motivation

    Under certain conditions, applications using multi-topic consumers might
    see consumption stall.

    The conditions to reproduce the issue are:
     * Consumer is subscribed to multiple topics, but only 1 topic has traffic
     * Messages are published in batches (no repro without batches)
     * Receiver queue size == 1 (or small, in order to exercise the race condition)

    The problem is a race condition between 2 threads when we decide whether
    to put one of the individual consumers in the "paused" state because the
    shared queue is full.

    What happens is that, just after we have checked the conditions and
    decided to mark the consumer as paused, the application empties the
    shared queue completely. From that point on, nothing re-checks whether
    that consumer needs to be unblocked.

    ### Modification

    Instead of introducing a sync block (contended by many consumers), we
    just double-check the state of the shared queue after marking the
    consumer as "paused". If the other thread has emptied the queue in the
    meantime, we are guaranteed to unblock the consumer.

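The interleaving described above, and the effect of the re-check, can be illustrated with a small single-threaded simulation. This is only a sketch under stated assumptions, not the Pulsar implementation: the class `PauseRecheckSketch`, its fields, and the `raceWindow` hook are hypothetical names introduced for illustration, and the "other thread" is simulated by running a callback between the queue-full check and the pause.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the "pause, then re-check" pattern (not Pulsar code).
public class PauseRecheckSketch {
    private final int capacity;
    private final Queue<String> incoming = new ConcurrentLinkedQueue<>(); // shared queue
    private final Queue<String> paused = new ConcurrentLinkedQueue<>();   // paused consumers

    PauseRecheckSketch(int capacity) {
        this.capacity = capacity;
    }

    // Called after a consumer delivered a message into the shared queue.
    // raceWindow simulates what another thread does between the check and the pause.
    void afterDelivery(String consumer, boolean recheck, Runnable raceWindow) {
        if (incoming.size() >= capacity) {
            raceWindow.run();           // the application drains the queue here
            paused.add(consumer);       // decision was based on a stale check
            if (recheck) {
                resumeIfNeeded();       // re-check after pausing, without a lock
            }
        }
        // else: the real client would schedule the next receive here
    }

    // Application side: take one message, then resume paused consumers if there is room.
    String take() {
        String msg = incoming.poll();
        resumeIfNeeded();
        return msg;
    }

    void resumeIfNeeded() {
        if (incoming.size() < capacity) {
            while (paused.poll() != null) {
                // the real client would restart receiving for this consumer
            }
        }
    }

    // Returns true if the consumer is left paused while the shared queue is empty.
    static boolean stalls(boolean recheck) {
        PauseRecheckSketch s = new PauseRecheckSketch(1);
        s.incoming.add("m1");
        s.afterDelivery("consumer-1", recheck, () -> s.take());
        return !s.paused.isEmpty() && s.incoming.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println("without re-check stalled: " + stalls(false)); // true
        System.out.println("with re-check stalled: " + stalls(true));     // false
    }
}
```

Without the re-check, the consumer is added to the paused set after the application has already drained the queue, so no later `take()` triggers a resume. With the re-check, the pausing thread itself observes the empty queue and resumes the consumer.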
    (cherry picked from commit f1d66d1a635dfcdc3335a5c8d5ea6d68e38ae71c)
---
 .../java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 727f2d6..997358d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -259,6 +259,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 // mark this consumer to be resumed later: if No more space left in shared queue,
                 // or if any consumer is already paused (to create fair chance for already paused consumers)
                 pausedConsumers.add(consumer);
+
+                // Since we din't get a mutex, the condition on the incoming queue might have changed after
+                // we have paused the current consumer. We need to re-check in order to avoid this consumer
+                // from getting stalled.
+                resumeReceivingFromPausedConsumersIfNeeded();
             } else {
                 // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
                 // recursion and stack overflow