This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3574bbd4fb440c343f135f8bb15e6f7d3db59460 Author: Jiwei Guo <techno...@apache.org> AuthorDate: Mon Dec 13 16:59:50 2021 +0800 Fix consume message order issue when use listener. (#13023) (cherry picked from commit e134e372b3cc007bb507f04076011407cc28b7c0) --- .../apache/pulsar/client/impl/ConsumerBase.java | 42 +++++++++++----------- .../apache/pulsar/client/impl/ConsumerImpl.java | 3 +- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 87e4ee9..a78dad8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -910,30 +910,32 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T protected void triggerListener() { // Trigger the notification on the message listener in a separate thread to avoid blocking the networking // thread while the message processing happens - try { - // Control executor to call MessageListener one by one. - if (executorQueueSize.get() < 1) { - final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS); - if (msg != null) { - executorQueueSize.incrementAndGet(); - if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) { - executorProvider.getExecutor(peekMessageKey(msg)).execute(() -> - callMessageListener(msg)); - } else { - getExternalExecutor(msg).execute(() -> { - callMessageListener(msg); - }); + internalPinnedExecutor.execute(() -> { + try { + // Control executor to call MessageListener one by one. + if (executorQueueSize.get() < 1) { + final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS); + if (msg != null) { + executorQueueSize.incrementAndGet(); + if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) { + executorProvider.getExecutor(peekMessageKey(msg)).execute(() -> + callMessageListener(msg)); + } else { + getExternalExecutor(msg).execute(() -> { + callMessageListener(msg); + }); + } } } + } catch (PulsarClientException e) { + log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e); + return; } - } catch (PulsarClientException e) { - log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e); - return; - } - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription); - } + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription); + } + }); } protected void callMessageListener(Message<T> msg) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index a872db0..f3e6b99 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1086,8 +1086,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle uncompressedPayload.release(); } - internalPinnedExecutor.execute(() - -> tryTriggerListener()); + tryTriggerListener(); }