This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-3.8 in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
commit 009969a6913607d32fbcb38ffbf0a3d29f118c2c Author: Baodi Shi <[email protected]> AuthorDate: Thu Oct 16 21:44:04 2025 +0800 Fix nullptr after listener consumer closed (#510) (cherry picked from commit 386dedc47bde8c46fae1705afdf66e4b972c7447) --- lib/MultiTopicsConsumerImpl.cc | 4 +++- tests/ConsumerTest.cc | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index eb3546d..7d73403 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -556,7 +556,9 @@ void MultiTopicsConsumerImpl::messageReceived(const Consumer& consumer, const Me void MultiTopicsConsumerImpl::internalListener(const Consumer& consumer) { Message m; - incomingMessages_.pop(m); + if (!incomingMessages_.pop(m)) { + return; + } try { Consumer self{get_shared_this_ptr()}; messageProcessed(m); diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index b4c7261..dfbc276 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1519,4 +1519,45 @@ TEST(ConsumerTest, testDuplicatedTopics) { } } +TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) { + Client client(lookupUrl); + + const int MSG_COUNT = 100; + std::string topicName = "persistent://public/default/my-topic-" + std::to_string(time(nullptr)); + + // 1. Create producer send 100 msgs + Producer producer; + ProducerConfiguration producerConfig; + producerConfig.setBatchingEnabled(false); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer)); + for (int i = 0; i < MSG_COUNT; ++i) { + std::string msg = "my-message-" + std::to_string(i); + Message message = MessageBuilder().setContent(msg).build(); + ASSERT_EQ(ResultOk, producer.send(message)); + } + + // 2. Create consumer with listener + Consumer consumer; + ConsumerConfiguration consumerConfig; + consumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest); + Latch latchFirstReceiveMsg(1); + Latch latchAfterClosed(1); + consumerConfig.setMessageListener( + [&latchFirstReceiveMsg, &latchAfterClosed](Consumer consumer, const Message& msg) { + latchFirstReceiveMsg.countdown(); + LOG_INFO("Consume message: " << msg.getDataAsString()); + latchAfterClosed.wait(); + }); + auto result = client.subscribe(topicName, "test-sub", consumerConfig, consumer); + ASSERT_EQ(ResultOk, result); + + // 3. wait first message consumed in listener and then close consumer. + latchFirstReceiveMsg.wait(); + ASSERT_EQ(ResultOk, consumer.close()); + latchAfterClosed.countdown(); + + ASSERT_EQ(ResultOk, producer.close()); + ASSERT_EQ(ResultOk, client.close()); +} + } // namespace pulsar
