This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 5ed247de3a5 [fix][client] Fix NPE of MultiTopicsConsumerImpl due to
race condition (#18287)
5ed247de3a5 is described below
commit 5ed247de3a5e0ff67f181b4805fd6140d8174994
Author: Penghui Li <[email protected]>
AuthorDate: Wed Nov 2 18:19:10 2022 +0800
[fix][client] Fix NPE of MultiTopicsConsumerImpl due to race condition
(#18287)
(cherry picked from commit 516db5176b5f5ea8a9fe8072b71482abc880faee)
---
.../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index aad4c622734..71fef6f83f0 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -250,6 +250,10 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
log.debug("[{}] [{}] Receive message from sub consumer:{}",
topic, subscription, consumer.getTopic());
}
+ // Stop to process the remaining message after the consumer is
closed.
+ if (getState() == State.Closed) {
+ return;
+ }
// Process the message, add to the queue and trigger listener or
async callback
messageReceived(consumer, message);
@@ -533,14 +537,14 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
.map(ConsumerImpl::unsubscribeAsync).collect(Collectors.toList());
FutureUtil.waitForAll(futureList)
- .thenCompose((r) -> {
+ .thenComposeAsync((r) -> {
setState(State.Closed);
cleanupMultiConsumer();
log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer",
topic, subscription, consumerName);
// fail all pending-receive futures to notify application
return failPendingReceive();
- })
+ }, internalPinnedExecutor)
.whenComplete((r, ex) -> {
if (ex == null) {
unsubscribeFuture.complete(null);
@@ -575,13 +579,13 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
.map(ConsumerImpl::closeAsync).collect(Collectors.toList());
FutureUtil.waitForAll(futureList)
- .thenCompose((r) -> {
+ .thenComposeAsync((r) -> {
setState(State.Closed);
cleanupMultiConsumer();
log.info("[{}] [{}] Closed Topics Consumer", topic,
subscription);
// fail all pending-receive futures to notify application
return failPendingReceive();
- })
+ }, internalPinnedExecutor)
.whenComplete((r, ex) -> {
if (ex == null) {
closeFuture.complete(null);