This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new ba52218 Fix possible crash caused by MessageId::getTopicName (#225)
ba52218 is described below
commit ba5221883772201497a45711caaf748189624a6b
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Mar 21 17:59:49 2023 +0800
Fix possible crash caused by MessageId::getTopicName (#225)
### Motivation
Currently if a `MessageId` does not have a topic name, the
`getTopicName` method will dereference a null pointer, which leads to a
crash. This case usually happens when an invalid message is
acknowledged, like
https://github.com/apache/pulsar-client-cpp/issues/224.
### Modifications
Return the const reference to an empty string in `getTopicName` when the
inner topic name field is a null pointer. Then, return
`ResultOperationNotSupported` when acknowledging such invalid messages.
---
include/pulsar/MessageId.h | 2 ++
lib/MessageIdImpl.h | 5 ++++-
lib/MultiTopicsConsumerImpl.cc | 10 ++++++++++
tests/AcknowledgeTest.cc | 13 +++++++++++++
tests/BasicEndToEndTest.cc | 2 +-
5 files changed, 30 insertions(+), 2 deletions(-)
diff --git a/include/pulsar/MessageId.h b/include/pulsar/MessageId.h
index e859e3c..a05a8fb 100644
--- a/include/pulsar/MessageId.h
+++ b/include/pulsar/MessageId.h
@@ -67,6 +67,8 @@ class PULSAR_PUBLIC MessageId {
/**
* Get the topic Name from which this message originated from
+ *
+ * @return the topic name or an empty string if there is no topic name
*/
const std::string& getTopicName() const;
diff --git a/lib/MessageIdImpl.h b/lib/MessageIdImpl.h
index dbab71a..f96b176 100644
--- a/lib/MessageIdImpl.h
+++ b/lib/MessageIdImpl.h
@@ -68,7 +68,10 @@ class MessageIdImpl {
int32_t batchIndex_ = -1;
int32_t batchSize_ = 0;
- const std::string& getTopicName() { return *topicName_; }
+ const std::string& getTopicName() {
+ static const std::string EMPTY_TOPIC = "";
+ return topicName_ ? *topicName_ : EMPTY_TOPIC;
+ }
void setTopicName(const std::shared_ptr<std::string>& topicName) {
topicName_ = topicName; }
virtual const BitSet& getBitSet() const noexcept {
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index e443d9a..878955f 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -661,6 +661,11 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const
MessageId& msgId, ResultCal
}
const std::string& topicPartitionName = msgId.getTopicName();
+ if (topicPartitionName.empty()) {
+ LOG_ERROR("MessageId without a topic name cannot be acknowledged for a
multi-topics consumer");
+ callback(ResultOperationNotSupported);
+ return;
+ }
auto optConsumer = consumers_.find(topicPartitionName);
if (optConsumer) {
@@ -681,6 +686,11 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const
MessageIdList& messageIdLis
std::unordered_map<std::string, MessageIdList> topicToMessageId;
for (const MessageId& messageId : messageIdList) {
auto topicName = messageId.getTopicName();
+ if (topicName.empty()) {
+ LOG_ERROR("MessageId without a topic name cannot be acknowledged
for a multi-topics consumer");
+ callback(ResultOperationNotSupported);
+ return;
+ }
topicToMessageId[topicName].emplace_back(messageId);
}
diff --git a/tests/AcknowledgeTest.cc b/tests/AcknowledgeTest.cc
index e30c7b1..0818466 100644
--- a/tests/AcknowledgeTest.cc
+++ b/tests/AcknowledgeTest.cc
@@ -302,4 +302,17 @@ TEST_F(AcknowledgeTest, testMixedCumulativeAck) {
ASSERT_EQ(ResultTimeout, consumer.getConsumer().receive(msg, 1000));
}
+TEST_F(AcknowledgeTest, testInvalidMessageId) {
+ Client client(lookupUrl);
+ std::vector<std::string> topics{"test-invalid-message-id0" + unique_str(),
+ "test-invalid-message-id1" + unique_str()};
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", consumer));
+
+ Message msg;
+ ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msg));
+ msg = MessageBuilder().setContent("msg").build();
+ ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msg));
+}
+
INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest,
testing::Values(100, 0));
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index d193017..e668f6f 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -537,7 +537,7 @@ void testPartitionedProducerConsumer(bool
lazyStartPartitionedProducers, std::st
ASSERT_EQ(consumer.getSubscriptionName(), "subscription-A");
for (int i = 0; i < 10; i++) {
Message m;
- consumer.receive(m, 10000);
+ ASSERT_EQ(ResultOk, consumer.receive(m, 10000));
consumer.acknowledge(m);
}
client.shutdown();