This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d08f64  Remove consumer unnecessary locks (#9261)
9d08f64 is described below

commit 9d08f64a827ad670c89cf708ddd409e1d5b5f763
Author: hangc0276 <[email protected]>
AuthorDate: Thu Apr 1 10:45:40 2021 +0800

    Remove consumer unnecessary locks (#9261)
    
    ### Motivation
    1. `ConsumerImpl` takes many unnecessary locks around queues that are
    already thread-safe, such as `Queues.newConcurrentLinkedQueue`,
    `GrowableArrayBlockingQueue`, and `ConcurrentLinkedQueue`.
    
    ### Changes
    1. Remove unnecessary locks in `ConsumerImpl` and `MultiTopicsConsumerImpl`
    
    Related to PR#8207
---
 .../apache/pulsar/client/impl/ConsumerBase.java    |  6 ++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 60 ++++++++--------------
 .../client/impl/MultiTopicsConsumerImpl.java       | 23 ++++-----
 3 files changed, 35 insertions(+), 54 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index dfce6ed..28c248f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -35,10 +35,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import io.netty.util.Timeout;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerEventListener;
@@ -733,6 +733,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         if (opBatchReceive == null) {
             return;
         }
+
         try {
             reentrantLock.lock();
             notifyPendingBatchReceivedCallBack(opBatchReceive);
@@ -790,6 +791,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
             }
             msgPeeked = incomingMessages.peek();
         }
+
         completePendingBatchReceive(opBatchReceive.future, messages);
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c7f67f4..d57fc39 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -409,7 +409,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         CompletableFuture<Message<T>> result = 
cancellationHandler.createFuture();
         Message<T> message = null;
         try {
-            lock.writeLock().lock();
             message = incomingMessages.poll(0, TimeUnit.MILLISECONDS);
             if (message == null) {
                 pendingReceives.add(result);
@@ -418,8 +417,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             result.completeExceptionally(e);
-        } finally {
-            lock.writeLock().unlock();
         }
 
         if (message != null) {
@@ -496,6 +493,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         } finally {
             lock.writeLock().unlock();
         }
+
         return result;
     }
 
@@ -948,14 +946,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     private void failPendingReceive() {
-        lock.readLock().lock();
-        try {
-            if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
-                failPendingReceives(this.pendingReceives);
-                failPendingBatchReceives(this.pendingBatchReceives);
-            }
-        } finally {
-            lock.readLock().unlock();
+        if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
+            failPendingReceives(this.pendingReceives);
+            failPendingBatchReceives(this.pendingBatchReceives);
         }
     }
 
@@ -1053,23 +1046,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                     uncompressedPayload, createEncryptionContext(msgMetadata), 
cnx, schema, redeliveryCount);
             uncompressedPayload.release();
 
-            lock.readLock().lock();
-            try {
-                // Enqueue the message so that it can be retrieved when 
application calls receive()
-                // if the conf.getReceiverQueueSize() is 0 then discard 
message if no one is waiting for it.
-                // if asyncReceive is waiting then notify callback without 
adding to incomingMessages queue
-                if (deadLetterPolicy != null && 
possibleSendToDeadLetterTopicMessages != null && redeliveryCount >= 
deadLetterPolicy.getMaxRedeliverCount()) {
-                    
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
 Collections.singletonList(message));
-                }
-                if (peekPendingReceive() != null) {
-                    notifyPendingReceivedCallback(message, null);
-                } else if (enqueueMessageAndCheckBatchReceive(message)) {
-                    if (hasPendingBatchReceive()) {
-                        notifyPendingBatchReceivedCallBack();
-                    }
-                }
-            } finally {
-                lock.readLock().unlock();
+            // Enqueue the message so that it can be retrieved when 
application calls receive()
+            // if the conf.getReceiverQueueSize() is 0 then discard message if 
no one is waiting for it.
+            // if asyncReceive is waiting then notify callback without adding 
to incomingMessages queue
+            if (deadLetterPolicy != null && 
possibleSendToDeadLetterTopicMessages != null &&
+                    redeliveryCount >= 
deadLetterPolicy.getMaxRedeliverCount()) {
+                
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
+                        Collections.singletonList(message));
+            }
+            if (peekPendingReceive() != null) {
+                notifyPendingReceivedCallback(message, null);
+            } else if (enqueueMessageAndCheckBatchReceive(message) && 
hasPendingBatchReceive()) {
+                notifyPendingBatchReceivedCallBack();
             }
         } else {
             // handle batch message enqueuing; uncompressed payload has all 
messages in batch
@@ -1280,17 +1268,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 if (possibleToDeadLetter != null) {
                     possibleToDeadLetter.add(message);
                 }
-                lock.readLock().lock();
-                try {
-                    if (peekPendingReceive() != null) {
-                        notifyPendingReceivedCallback(message, null);
-                    } else if (enqueueMessageAndCheckBatchReceive(message)) {
-                        if (hasPendingBatchReceive()) {
-                            notifyPendingBatchReceivedCallBack();
-                        }
-                    }
-                } finally {
-                    lock.readLock().unlock();
+
+                if (peekPendingReceive() != null) {
+                    notifyPendingReceivedCallback(message, null);
+                } else if (enqueueMessageAndCheckBatchReceive(message) && 
hasPendingBatchReceive()) {
+                    notifyPendingBatchReceivedCallBack();
                 }
                 singleMessagePayload.release();
             }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 39f7715..5263401 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -97,8 +97,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     private volatile Timeout partitionsAutoUpdateTimeout = null;
     TopicsPartitionChangedListener topicsPartitionChangedListener;
     CompletableFuture<Void> partitionsAutoUpdateFuture = null;
-
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
     private final ConsumerStatsRecorder stats;
     private final UnAckedMessageTracker unAckedMessageTracker;
     private final ConsumerConfigurationData<T> internalConfig;
@@ -386,6 +386,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         } finally {
             lock.writeLock().unlock();
         }
+
         return result;
     }
 
@@ -595,18 +596,14 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
     @Override
     public void redeliverUnacknowledgedMessages() {
-        lock.writeLock().lock();
-        try {
-            consumers.values().stream().forEach(consumer -> {
-                consumer.redeliverUnacknowledgedMessages();
-                consumer.unAckedChunkedMessageIdSequenceMap.clear();
-            });
-            incomingMessages.clear();
-            resetIncomingMessageSize();
-            unAckedMessageTracker.clear();
-        } finally {
-            lock.writeLock().unlock();
-        }
+        consumers.values().stream().forEach(consumer -> {
+            consumer.redeliverUnacknowledgedMessages();
+            consumer.unAckedChunkedMessageIdSequenceMap.clear();
+        });
+        incomingMessages.clear();
+        resetIncomingMessageSize();
+        unAckedMessageTracker.clear();
+
         resumeReceivingFromPausedConsumersIfNeeded();
     }
 

Reply via email to