This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3574bbd4fb440c343f135f8bb15e6f7d3db59460
Author: Jiwei Guo <techno...@apache.org>
AuthorDate: Mon Dec 13 16:59:50 2021 +0800

    Fix consume message order issue when use listener. (#13023)
    
    (cherry picked from commit e134e372b3cc007bb507f04076011407cc28b7c0)
---
 .../apache/pulsar/client/impl/ConsumerBase.java    | 42 +++++++++++-----------
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  3 +-
 2 files changed, 23 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 87e4ee9..a78dad8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -910,30 +910,32 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
     protected void triggerListener() {
         // Trigger the notification on the message listener in a separate 
thread to avoid blocking the networking
         // thread while the message processing happens
-        try {
-            // Control executor to call MessageListener one by one.
-            if (executorQueueSize.get() < 1) {
-                final Message<T> msg = internalReceive(0, 
TimeUnit.MILLISECONDS);
-                if (msg != null) {
-                    executorQueueSize.incrementAndGet();
-                    if (SubscriptionType.Key_Shared == 
conf.getSubscriptionType()) {
-                        
executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
-                                callMessageListener(msg));
-                    } else {
-                        getExternalExecutor(msg).execute(() -> {
-                            callMessageListener(msg);
-                        });
+        internalPinnedExecutor.execute(() -> {
+            try {
+                // Control executor to call MessageListener one by one.
+                if (executorQueueSize.get() < 1) {
+                    final Message<T> msg = internalReceive(0, 
TimeUnit.MILLISECONDS);
+                    if (msg != null) {
+                        executorQueueSize.incrementAndGet();
+                        if (SubscriptionType.Key_Shared == 
conf.getSubscriptionType()) {
+                            
executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
+                                    callMessageListener(msg));
+                        } else {
+                            getExternalExecutor(msg).execute(() -> {
+                                callMessageListener(msg);
+                            });
+                        }
                     }
                 }
+            } catch (PulsarClientException e) {
+                log.warn("[{}] [{}] Failed to dequeue the message for 
listener", topic, subscription, e);
+                return;
             }
-        } catch (PulsarClientException e) {
-            log.warn("[{}] [{}] Failed to dequeue the message for listener", 
topic, subscription, e);
-            return;
-        }
 
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] [{}] Message has been cleared from the queue", 
topic, subscription);
-        }
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Message has been cleared from the queue", 
topic, subscription);
+            }
+        });
     }
 
     protected void callMessageListener(Message<T> msg) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a872db0..f3e6b99 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1086,8 +1086,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
             uncompressedPayload.release();
         }
-        internalPinnedExecutor.execute(()
-                -> tryTriggerListener());
+        tryTriggerListener();
 
     }
 

Reply via email to