This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 587548ec5f9e9a9fc6d465aca3cd3fbb76b9ee64
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Aug 25 00:33:35 2021 -0700

    Fixed race condition on multi-topic consumer (#11764)
    
    ### Motivation
    
    Under certain conditions, applications using the multi-topic consumer might see consumption stall:
    
    The conditions to reproduce the issue are:
     * Consumer is subscribed to multiple topics, but only 1 topic has traffic
     * Messages are published in batches (no repro if no batches)
     * Receiver queue size == 1 (or small, in order to exercise the race condition)
    
    The problem is a race condition between two threads when we decide to put one of the individual consumers in the "paused" state because the shared queue is full.
    
    What happens is that, just after we have checked the conditions and decided to mark the consumer as paused, the application empties the shared queue completely. From that point on, nothing re-checks whether that consumer needs to be unblocked, so it is never resumed.
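The interleaving described above can be replayed deterministically, one step at a time. This is a minimal sketch under stated assumptions: the class `LostResumeDemo`, its fields and methods, and the capacity of 1 are hypothetical and illustrative, not part of the Pulsar client. The receiver observes a full queue, the application drains the queue and finds nothing to resume, and only then does the receiver record the pause.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified model of the lost-resume race; NOT Pulsar's actual code.
public class LostResumeDemo {
    static final int QUEUE_CAPACITY = 1; // mirrors receiver queue size == 1

    final Queue<String> sharedQueue = new ConcurrentLinkedQueue<>();
    final Queue<String> pausedConsumers = new ConcurrentLinkedQueue<>();

    // Receiver side, step 1: observe whether the shared queue is full.
    boolean queueIsFull() {
        return sharedQueue.size() >= QUEUE_CAPACITY;
    }

    // Receiver side, step 2: act on the (possibly stale) observation, with no re-check.
    void pause(String consumer) {
        pausedConsumers.add(consumer);
    }

    // Application side: drain the queue, then resume any consumers paused so far.
    void applicationDrains() {
        sharedQueue.clear();
        resumePausedIfNeeded();
    }

    void resumePausedIfNeeded() {
        if (sharedQueue.size() < QUEUE_CAPACITY) {
            pausedConsumers.clear(); // stands in for scheduling a receive on each paused consumer
        }
    }

    /** Replays the problematic interleaving and returns how many consumers remain paused. */
    static int replayUnfixed() {
        LostResumeDemo d = new LostResumeDemo();
        d.sharedQueue.add("msg-1");       // shared queue is full
        boolean full = d.queueIsFull();   // receiver observes "full"
        d.applicationDrains();            // app empties the queue; nothing is paused yet
        if (full) {
            d.pause("consumer-A");        // receiver pauses based on the stale observation
        }
        return d.pausedConsumers.size();  // consumer-A stays paused; nobody will resume it
    }

    public static void main(String[] args) {
        System.out.println("paused after unfixed interleaving: " + replayUnfixed());
    }
}
```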
    
    ### Modification
    
    Instead of introducing a synchronized block (which would be contended by many consumers), we double-check the state of the shared queue after marking the consumer as "paused". If the other thread has emptied the queue in the meantime, the re-check guarantees that the consumer is unblocked.
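The "pause, then re-check" pattern can be sketched with the same interleaving. This is a minimal, hypothetical model (the class `RecheckAfterPauseDemo` and all of its members are illustrative assumptions, not the Pulsar client's API): the receiver records the pause first and then re-checks the shared queue, so even when the application drains the queue before the pause is recorded, the consumer is resumed.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified model of "pause, then re-check"; NOT Pulsar's actual code.
public class RecheckAfterPauseDemo {
    static final int QUEUE_CAPACITY = 1; // mirrors receiver queue size == 1

    final Queue<String> sharedQueue = new ConcurrentLinkedQueue<>();
    final Queue<String> pausedConsumers = new ConcurrentLinkedQueue<>();
    final Queue<String> resumedConsumers = new ConcurrentLinkedQueue<>();

    boolean queueIsFull() {
        return sharedQueue.size() >= QUEUE_CAPACITY;
    }

    // Receiver side: publish the paused state first, then re-check the shared queue.
    void pauseAndRecheck(String consumer) {
        pausedConsumers.add(consumer);
        resumePausedIfNeeded();
    }

    // Application side: drain the queue, then try to resume paused consumers.
    void applicationDrains() {
        sharedQueue.clear();
        resumePausedIfNeeded();
    }

    // Both sides may call this; poll() hands each paused consumer to exactly one caller.
    void resumePausedIfNeeded() {
        if (sharedQueue.size() < QUEUE_CAPACITY) {
            String consumer;
            while ((consumer = pausedConsumers.poll()) != null) {
                resumedConsumers.add(consumer); // stands in for scheduling another receive
            }
        }
    }

    /** Replays the same interleaving as the bug, with the re-check in place. */
    static RecheckAfterPauseDemo replayFixed() {
        RecheckAfterPauseDemo d = new RecheckAfterPauseDemo();
        d.sharedQueue.add("msg-1");          // shared queue is full
        boolean full = d.queueIsFull();      // receiver observes "full"
        d.applicationDrains();               // app empties the queue before the pause is recorded
        if (full) {
            d.pauseAndRecheck("consumer-A"); // re-check sees an empty queue and resumes consumer-A
        }
        return d;
    }

    public static void main(String[] args) {
        RecheckAfterPauseDemo d = replayFixed();
        System.out.println("paused=" + d.pausedConsumers.size() + " resumed=" + d.resumedConsumers);
    }
}
```

In this model, each side writes its own state before reading the other's: the receiver adds to the paused set and then reads the queue, while the application clears the queue and then reads the paused set. Whichever order the two threads run in, at least one of them should observe the other's write, so a consumer is not left paused behind an empty queue. Using `poll()` rather than `clear()` keeps the two callers from both claiming the same consumer. A lock would give the same guarantee but would serialize every sub-consumer, which is the contention the commit avoids.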
    
    (cherry picked from commit f1d66d1a635dfcdc3335a5c8d5ea6d68e38ae71c)
---
 .../java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java  | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 727f2d6..997358d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -259,6 +259,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 // mark this consumer to be resumed later: if No more space left in shared queue,
                 // or if any consumer is already paused (to create fair chance for already paused consumers)
                 pausedConsumers.add(consumer);
+
+                // Since we din't get a mutex, the condition on the incoming queue might have changed after
+                // we have paused the current consumer. We need to re-check in order to avoid this consumer
+                // from getting stalled.
+                resumeReceivingFromPausedConsumersIfNeeded();
             } else {
                 // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
                 // recursion and stack overflow

Reply via email to