This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 5ed247de3a5 [fix][client] Fix NPE of MultiTopicsConsumerImpl due to race condition (#18287)
5ed247de3a5 is described below

commit 5ed247de3a5e0ff67f181b4805fd6140d8174994
Author: Penghui Li <[email protected]>
AuthorDate: Wed Nov 2 18:19:10 2022 +0800

    [fix][client] Fix NPE of MultiTopicsConsumerImpl due to race condition (#18287)
    
    (cherry picked from commit 516db5176b5f5ea8a9fe8072b71482abc880faee)
---
 .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java   | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index aad4c622734..71fef6f83f0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -250,6 +250,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 log.debug("[{}] [{}] Receive message from sub consumer:{}",
                     topic, subscription, consumer.getTopic());
             }
+            // Stop to process the remaining message after the consumer is 
closed.
+            if (getState() == State.Closed) {
+                return;
+            }
             // Process the message, add to the queue and trigger listener or 
async callback
             messageReceived(consumer, message);
 
@@ -533,14 +537,14 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             .map(ConsumerImpl::unsubscribeAsync).collect(Collectors.toList());
 
         FutureUtil.waitForAll(futureList)
-            .thenCompose((r) -> {
+            .thenComposeAsync((r) -> {
                 setState(State.Closed);
                 cleanupMultiConsumer();
                 log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer",
                         topic, subscription, consumerName);
                 // fail all pending-receive futures to notify application
                 return failPendingReceive();
-            })
+            }, internalPinnedExecutor)
             .whenComplete((r, ex) -> {
                 if (ex == null) {
                     unsubscribeFuture.complete(null);
@@ -575,13 +579,13 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             .map(ConsumerImpl::closeAsync).collect(Collectors.toList());
 
         FutureUtil.waitForAll(futureList)
-            .thenCompose((r) -> {
+            .thenComposeAsync((r) -> {
                 setState(State.Closed);
                 cleanupMultiConsumer();
                 log.info("[{}] [{}] Closed Topics Consumer", topic, 
subscription);
                 // fail all pending-receive futures to notify application
                 return failPendingReceive();
-            })
+            }, internalPinnedExecutor)
             .whenComplete((r, ex) -> {
                 if (ex == null) {
                     closeFuture.complete(null);

Reply via email to