This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 386dedc Fix nullptr after listener consumer closed (#510)
386dedc is described below
commit 386dedc47bde8c46fae1705afdf66e4b972c7447
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Oct 16 21:44:04 2025 +0800
Fix nullptr after listener consumer closed (#510)
---
lib/MultiTopicsConsumerImpl.cc | 4 +++-
tests/ConsumerTest.cc | 41 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 44 insertions(+), 1 deletion(-)
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index eb3546d..7d73403 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -556,7 +556,9 @@ void MultiTopicsConsumerImpl::messageReceived(const
Consumer& consumer, const Me
void MultiTopicsConsumerImpl::internalListener(const Consumer& consumer) {
Message m;
- incomingMessages_.pop(m);
+ if (!incomingMessages_.pop(m)) {
+ return;
+ }
try {
Consumer self{get_shared_this_ptr()};
messageProcessed(m);
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index b4c7261..dfbc276 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1519,4 +1519,45 @@ TEST(ConsumerTest, testDuplicatedTopics) {
}
}
+TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) {
+ Client client(lookupUrl);
+
+ const int MSG_COUNT = 100;
+ std::string topicName = "persistent://public/default/my-topic-" +
std::to_string(time(nullptr));
+
+ // 1. Create producer send 100 msgs
+ Producer producer;
+ ProducerConfiguration producerConfig;
+ producerConfig.setBatchingEnabled(false);
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig,
producer));
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ std::string msg = "my-message-" + std::to_string(i);
+ Message message = MessageBuilder().setContent(msg).build();
+ ASSERT_EQ(ResultOk, producer.send(message));
+ }
+
+ // 2. Create consumer with listener
+ Consumer consumer;
+ ConsumerConfiguration consumerConfig;
+ consumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest);
+ Latch latchFirstReceiveMsg(1);
+ Latch latchAfterClosed(1);
+ consumerConfig.setMessageListener(
+ [&latchFirstReceiveMsg, &latchAfterClosed](Consumer consumer, const
Message& msg) {
+ latchFirstReceiveMsg.countdown();
+ LOG_INFO("Consume message: " << msg.getDataAsString());
+ latchAfterClosed.wait();
+ });
+ auto result = client.subscribe(topicName, "test-sub", consumerConfig,
consumer);
+ ASSERT_EQ(ResultOk, result);
+
+ // 3. wait first message consumed in listener and then close consumer.
+ latchFirstReceiveMsg.wait();
+ ASSERT_EQ(ResultOk, consumer.close());
+ latchAfterClosed.countdown();
+
+ ASSERT_EQ(ResultOk, producer.close());
+ ASSERT_EQ(ResultOk, client.close());
+}
+
} // namespace pulsar