This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-3.8
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit 009969a6913607d32fbcb38ffbf0a3d29f118c2c
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Oct 16 21:44:04 2025 +0800

    Fix nullptr after listener consumer closed (#510)
    
    (cherry picked from commit 386dedc47bde8c46fae1705afdf66e4b972c7447)
---
 lib/MultiTopicsConsumerImpl.cc |  4 +++-
 tests/ConsumerTest.cc          | 41 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index eb3546d..7d73403 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -556,7 +556,9 @@ void MultiTopicsConsumerImpl::messageReceived(const Consumer& consumer, const Me
 
 void MultiTopicsConsumerImpl::internalListener(const Consumer& consumer) {
     Message m;
-    incomingMessages_.pop(m);
+    if (!incomingMessages_.pop(m)) {
+        return;
+    }
     try {
         Consumer self{get_shared_this_ptr()};
         messageProcessed(m);
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index b4c7261..dfbc276 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1519,4 +1519,45 @@ TEST(ConsumerTest, testDuplicatedTopics) {
     }
 }
 
+TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) {
+    Client client(lookupUrl);
+
+    const int MSG_COUNT = 100;
+    std::string topicName = "persistent://public/default/my-topic-" + std::to_string(time(nullptr));
+
+    // 1. Create producer send 100 msgs
+    Producer producer;
+    ProducerConfiguration producerConfig;
+    producerConfig.setBatchingEnabled(false);
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
+    for (int i = 0; i < MSG_COUNT; ++i) {
+        std::string msg = "my-message-" + std::to_string(i);
+        Message message = MessageBuilder().setContent(msg).build();
+        ASSERT_EQ(ResultOk, producer.send(message));
+    }
+
+    // 2. Create consumer with listener
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest);
+    Latch latchFirstReceiveMsg(1);
+    Latch latchAfterClosed(1);
+    consumerConfig.setMessageListener(
+        [&latchFirstReceiveMsg, &latchAfterClosed](Consumer consumer, const Message& msg) {
+            latchFirstReceiveMsg.countdown();
+            LOG_INFO("Consume message: " << msg.getDataAsString());
+            latchAfterClosed.wait();
+        });
+    auto result = client.subscribe(topicName, "test-sub", consumerConfig, consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // 3. wait first message consumed in listener and then close consumer.
+    latchFirstReceiveMsg.wait();
+    ASSERT_EQ(ResultOk, consumer.close());
+    latchAfterClosed.countdown();
+
+    ASSERT_EQ(ResultOk, producer.close());
+    ASSERT_EQ(ResultOk, client.close());
+}
+
 }  // namespace pulsar

Reply via email to