This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 861b198 Use shared_ptr for topic name in message ids (#218)
861b198 is described below
commit 861b198b7c93d4a9afaa8c20307060d7e0e9465f
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Mar 15 20:01:26 2023 -0700
Use shared_ptr for topic name in message ids (#218)
### Motivation
`Message` and `MessageId` expose a `const std::string& getTopicName()`
method that returns a reference to a string owned by the consumer.
If the consumer is destroyed, the `Message` and `MessageId` objects are left
holding a dangling pointer, and calling `getTopicName()` reads an invalid
memory location.
The current situation is very error-prone, and it is a bigger problem when
using a regex subscription, since a topic can be deleted and its consumer
closed in the background.
### Modifications
Instead of keeping a raw pointer to the consumer's `std::string`, `Message`
and `MessageId` now hold a `std::shared_ptr<std::string>`, so that they
remain valid regardless of the consumer's lifetime.
---
include/pulsar/Message.h | 2 +-
include/pulsar/MessageId.h | 3 +++
lib/BatchMessageContainerBase.h | 2 +-
lib/Commands.cc | 2 +-
lib/ConsumerImpl.cc | 8 ++++----
lib/ConsumerImplBase.h | 1 +
lib/HandlerBase.cc | 4 ++--
lib/HandlerBase.h | 2 +-
lib/Message.cc | 4 ++--
lib/MessageBatch.cc | 2 +-
lib/MessageId.cc | 4 +++-
lib/MessageIdImpl.h | 4 ++--
lib/MessageImpl.cc | 4 ++--
lib/MessageImpl.h | 4 ++--
lib/MultiTopicsConsumerImpl.cc | 5 ++---
lib/ProducerImpl.cc | 8 ++++----
16 files changed, 32 insertions(+), 27 deletions(-)
diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index f9e037e..47bc974 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -191,7 +191,7 @@ class PULSAR_PUBLIC Message {
Message(const MessageId& messageId, proto::MessageMetadata& metadata,
SharedBuffer& payload);
/// Used for Batch Messages
Message(const MessageId& messageId, proto::MessageMetadata& metadata,
SharedBuffer& payload,
- proto::SingleMessageMetadata& singleMetadata, const std::string&
topicName);
+ proto::SingleMessageMetadata& singleMetadata, const
std::shared_ptr<std::string>& topicName);
friend class PartitionedProducerImpl;
friend class MultiTopicsConsumerImpl;
friend class MessageBuilder;
diff --git a/include/pulsar/MessageId.h b/include/pulsar/MessageId.h
index 3871e87..e859e3c 100644
--- a/include/pulsar/MessageId.h
+++ b/include/pulsar/MessageId.h
@@ -72,6 +72,7 @@ class PULSAR_PUBLIC MessageId {
/**
* Set the topicName
+ * @deprecated This method will be eventually removed
*/
void setTopicName(const std::string& topicName);
@@ -110,6 +111,8 @@ class PULSAR_PUBLIC MessageId {
friend class MessageIdBuilder;
friend class ChunkMessageIdImpl;
+ void setTopicName(const std::shared_ptr<std::string>& topic);
+
friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const
MessageId& messageId);
typedef std::shared_ptr<MessageIdImpl> MessageIdImplPtr;
diff --git a/lib/BatchMessageContainerBase.h b/lib/BatchMessageContainerBase.h
index e9cf7ef..fe4e5df 100644
--- a/lib/BatchMessageContainerBase.h
+++ b/lib/BatchMessageContainerBase.h
@@ -115,7 +115,7 @@ class BatchMessageContainerBase : public boost::noncopyable
{
protected:
// references to ProducerImpl's fields
- const std::string& topicName_;
+ const std::shared_ptr<std::string> topicName_;
const ProducerConfiguration& producerConfig_;
const std::string& producerName_;
const uint64_t& producerId_;
diff --git a/lib/Commands.cc b/lib/Commands.cc
index a04a386..0089b86 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -868,7 +868,7 @@ Message Commands::deSerializeSingleMessageInBatch(Message&
batchedMessage, int32
auto messageId =
MessageIdBuilder::from(m).batchIndex(batchIndex).batchSize(batchSize).build();
auto batchedMessageId =
std::make_shared<BatchedMessageIdImpl>(*(messageId.impl_), acker);
Message singleMessage(MessageId{batchedMessageId},
batchedMessage.impl_->metadata, payload, metadata,
- batchedMessage.impl_->getTopicName());
+ batchedMessage.impl_->topicName_);
singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;
return singleMessage;
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 1f26a70..996ffbf 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -175,14 +175,14 @@ Future<Result, ConsumerImplBaseWeakPtr>
ConsumerImpl::getConsumerCreatedFuture()
const std::string& ConsumerImpl::getSubscriptionName() const { return
originalSubscriptionName_; }
-const std::string& ConsumerImpl::getTopic() const { return topic_; }
+const std::string& ConsumerImpl::getTopic() const { return *topic_; }
void ConsumerImpl::start() {
HandlerBase::start();
// Initialize ackGroupingTrackerPtr_ here because the
get_shared_this_ptr() was not initialized until the
// constructor completed.
- if (TopicName::get(topic_)->isPersistent()) {
+ if (TopicName::get(*topic_)->isPersistent()) {
if (config_.getAckGroupingTimeMs() > 0) {
ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
client_.lock(), get_shared_this_ptr(), consumerId_,
config_.getAckGroupingTimeMs(),
@@ -225,7 +225,7 @@ void ConsumerImpl::connectionOpened(const
ClientConnectionPtr& cnx) {
ClientImplPtr client = client_.lock();
uint64_t requestId = client->newRequestId();
SharedBuffer cmd = Commands::newSubscribe(
- topic_, subscription_, consumerId_, requestId, getSubType(),
consumerName_, subscriptionMode_,
+ *topic_, subscription_, consumerId_, requestId, getSubType(),
consumerName_, subscriptionMode_,
subscribeMessageId, readCompacted_, config_.getProperties(),
config_.getSubscriptionProperties(),
config_.getSchema(), getInitialPosition(),
config_.isReplicateSubscriptionStateEnabled(),
config_.getKeySharedPolicy(), config_.getPriorityLevel());
@@ -679,7 +679,7 @@ uint32_t
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
// This is a cheap copy since message contains only one shared pointer
(impl_)
Message msg =
Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize, acker);
msg.impl_->setRedeliveryCount(redeliveryCount);
- msg.impl_->setTopicName(batchedMessage.getTopicName());
+ msg.impl_->setTopicName(batchedMessage.impl_->topicName_);
msg.impl_->convertPayloadToKeyValue(config_.getSchema());
if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h
index 73c9161..5116fd0 100644
--- a/lib/ConsumerImplBase.h
+++ b/lib/ConsumerImplBase.h
@@ -101,6 +101,7 @@ class ConsumerImplBase : public HandlerBase, public
std::enable_shared_from_this
private:
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0;
+ friend class MultiTopicsConsumerImpl;
friend class PulsarFriend;
};
} // namespace pulsar
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 1e13fb1..501f5fc 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -30,7 +30,7 @@ namespace pulsar {
HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string&
topic, const Backoff& backoff)
: client_(client),
- topic_(topic),
+ topic_(std::make_shared<std::string>(topic)),
executor_(client->getIOExecutorProvider()->get()),
mutex_(),
creationTimestamp_(TimeUtils::now()),
@@ -71,7 +71,7 @@ void HandlerBase::grabCnx() {
}
LOG_INFO(getName() << "Getting connection from pool");
ClientImplPtr client = client_.lock();
- Future<Result, ClientConnectionWeakPtr> future =
client->getConnection(topic_);
+ Future<Result, ClientConnectionWeakPtr> future =
client->getConnection(*topic_);
future.addListener(std::bind(&HandlerBase::handleNewConnection,
std::placeholders::_1,
std::placeholders::_2, get_weak_from_this()));
}
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 0cc8968..3b12f09 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -101,7 +101,7 @@ class HandlerBase {
protected:
ClientImplWeakPtr client_;
- const std::string topic_;
+ const std::shared_ptr<std::string> topic_;
ExecutorServicePtr executor_;
mutable std::mutex mutex_;
std::mutex pendingReceiveMutex_;
diff --git a/lib/Message.cc b/lib/Message.cc
index 545c893..98364f1 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -77,13 +77,13 @@ Message::Message(const MessageId& messageId,
proto::MessageMetadata& metadata, S
}
Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata,
SharedBuffer& payload,
- proto::SingleMessageMetadata& singleMetadata, const
std::string& topicName)
+ proto::SingleMessageMetadata& singleMetadata, const
std::shared_ptr<std::string>& topicName)
: impl_(std::make_shared<MessageImpl>()) {
impl_->messageId = messageID;
impl_->metadata = metadata;
impl_->payload = payload;
impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties());
- impl_->topicName_ = &topicName;
+ impl_->topicName_ = topicName;
impl_->metadata.clear_properties();
if (singleMetadata.properties_size() > 0) {
diff --git a/lib/MessageBatch.cc b/lib/MessageBatch.cc
index d0a3d58..f2c1cb2 100644
--- a/lib/MessageBatch.cc
+++ b/lib/MessageBatch.cc
@@ -25,7 +25,7 @@
namespace pulsar {
-const static std::string emptyString;
+const static std::shared_ptr<std::string> emptyString;
MessageBatch::MessageBatch() : impl_(std::make_shared<MessageImpl>()),
batchMessage_(impl_) {
impl_->setTopicName(emptyString);
diff --git a/lib/MessageId.cc b/lib/MessageId.cc
index ebe52c1..12b6f40 100644
--- a/lib/MessageId.cc
+++ b/lib/MessageId.cc
@@ -168,7 +168,9 @@ PULSAR_PUBLIC bool MessageId::operator!=(const MessageId&
other) const { return
PULSAR_PUBLIC const std::string& MessageId::getTopicName() const { return
impl_->getTopicName(); }
PULSAR_PUBLIC void MessageId::setTopicName(const std::string& topicName) {
- return impl_->setTopicName(topicName);
+ return setTopicName(std::make_shared<std::string>(topicName));
}
+void MessageId::setTopicName(const std::shared_ptr<std::string>& topic) {
return impl_->setTopicName(topic); }
+
} // namespace pulsar
diff --git a/lib/MessageIdImpl.h b/lib/MessageIdImpl.h
index 70892fa..dbab71a 100644
--- a/lib/MessageIdImpl.h
+++ b/lib/MessageIdImpl.h
@@ -69,7 +69,7 @@ class MessageIdImpl {
int32_t batchSize_ = 0;
const std::string& getTopicName() { return *topicName_; }
- void setTopicName(const std::string& topicName) { topicName_ = &topicName;
}
+ void setTopicName(const std::shared_ptr<std::string>& topicName) {
topicName_ = topicName; }
virtual const BitSet& getBitSet() const noexcept {
static const BitSet emptyBitSet;
@@ -77,7 +77,7 @@ class MessageIdImpl {
}
private:
- const std::string* topicName_ = nullptr;
+ std::shared_ptr<std::string> topicName_;
friend class MessageImpl;
friend class MultiTopicsConsumerImpl;
friend class UnAckedMessageTrackerEnabled;
diff --git a/lib/MessageImpl.cc b/lib/MessageImpl.cc
index 63232e7..e70ef5d 100644
--- a/lib/MessageImpl.cc
+++ b/lib/MessageImpl.cc
@@ -85,8 +85,8 @@ void MessageImpl::setOrderingKey(const std::string&
orderingKey) { metadata.set_
void MessageImpl::setEventTimestamp(uint64_t eventTimestamp) {
metadata.set_event_time(eventTimestamp); }
-void MessageImpl::setTopicName(const std::string& topicName) {
- topicName_ = &topicName;
+void MessageImpl::setTopicName(const std::shared_ptr<std::string>& topicName) {
+ topicName_ = topicName;
messageId.setTopicName(topicName);
}
diff --git a/lib/MessageImpl.h b/lib/MessageImpl.h
index 790a021..1046ede 100644
--- a/lib/MessageImpl.h
+++ b/lib/MessageImpl.h
@@ -44,7 +44,7 @@ class MessageImpl {
std::shared_ptr<KeyValueImpl> keyValuePtr;
MessageId messageId;
ClientConnection* cnx_;
- const std::string* topicName_;
+ std::shared_ptr<std::string> topicName_;
int redeliveryCount_;
bool hasSchemaVersion_;
const std::string* schemaVersion_;
@@ -66,7 +66,7 @@ class MessageImpl {
/**
* Set a valid topicName
*/
- void setTopicName(const std::string& topicName);
+ void setTopicName(const std::shared_ptr<std::string>& topicName);
int getRedeliveryCount();
void setRedeliveryCount(int count);
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index c43bf2f..52af40f 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -513,8 +513,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback
originalCallback) {
void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const
Message& msg) {
LOG_DEBUG("Received Message from one of the topic - " <<
consumer.getTopic()
<< " message:" <<
msg.getDataAsString());
- const std::string& topicPartitionName = consumer.getTopic();
- msg.impl_->setTopicName(topicPartitionName);
+ msg.impl_->setTopicName(consumer.impl_->topic_);
Lock lock(pendingReceiveMutex_);
if (!pendingReceives_.empty()) {
@@ -729,7 +728,7 @@ Future<Result, ConsumerImplBaseWeakPtr>
MultiTopicsConsumerImpl::getConsumerCrea
}
const std::string& MultiTopicsConsumerImpl::getSubscriptionName() const {
return subscriptionName_; }
-const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic_; }
+const std::string& MultiTopicsConsumerImpl::getTopic() const { return *topic_;
}
const std::string& MultiTopicsConsumerImpl::getName() const { return
consumerStr_; }
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 2fd31cd..2dbd6c8 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -67,7 +67,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const
TopicName& topicName,
partition_(partition),
producerName_(conf_.getProducerName()),
userProvidedProducerName_(false),
- producerStr_("[" + topic_ + ", " + producerName_ + "] "),
+ producerStr_("[" + *topic_ + ", " + producerName_ + "] "),
producerId_(client->newProducerId()),
msgSequenceGenerator_(0),
batchTimer_(executor_->createDeadlineTimer()),
@@ -131,7 +131,7 @@ ProducerImpl::~ProducerImpl() {
}
}
-const std::string& ProducerImpl::getTopic() const { return topic_; }
+const std::string& ProducerImpl::getTopic() const { return *topic_; }
const std::string& ProducerImpl::getProducerName() const { return
producerName_; }
@@ -152,7 +152,7 @@ void ProducerImpl::connectionOpened(const
ClientConnectionPtr& cnx) {
ClientImplPtr client = client_.lock();
int requestId = client->newRequestId();
- SharedBuffer cmd = Commands::newProducer(topic_, producerId_,
producerName_, requestId,
+ SharedBuffer cmd = Commands::newProducer(*topic_, producerId_,
producerName_, requestId,
conf_.getProperties(),
conf_.getSchema(), epoch_,
userProvidedProducerName_,
conf_.isEncryptionEnabled(),
static_cast<proto::ProducerAccessMode>(conf_.getAccessMode()),
@@ -209,7 +209,7 @@ void ProducerImpl::handleCreateProducer(const
ClientConnectionPtr& cnx, Result r
cnx->registerProducer(producerId_, shared_from_this());
producerName_ = responseData.producerName;
schemaVersion_ = responseData.schemaVersion;
- producerStr_ = "[" + topic_ + ", " + producerName_ + "] ";
+ producerStr_ = "[" + *topic_ + ", " + producerName_ + "] ";
topicEpoch = responseData.topicEpoch;
if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() ==
-1) {