This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 861b198  Use shared_ptr for topic name in message ids (#218)
861b198 is described below

commit 861b198b7c93d4a9afaa8c20307060d7e0e9465f
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Mar 15 20:01:26 2023 -0700

    Use shared_ptr for topic name in message ids (#218)
    
    ### Motivation
    
    `Message` and `MessageId` expose a `const std::string& getTopicName()` 
method that returns a reference to an internal variable of the consumer.
    
    If the consumer is destroyed, the `Message` and `MessageId` objects become 
invalid, and calling `getTopicName()` reads an invalid memory location.
    
    The current situation is very error-prone and can be a bigger problem when 
using a regex subscription, since a topic can be deleted and the consumer will 
be closed in the background.
    
    ### Modifications
    
    Instead of keeping a non-owning reference to the topic name 
(`const std::string&` / `const std::string*`), use a 
`std::shared_ptr<std::string>` so that `Message` and `MessageId` instances 
remain valid regardless of the consumer's lifetime.
---
 include/pulsar/Message.h        | 2 +-
 include/pulsar/MessageId.h      | 3 +++
 lib/BatchMessageContainerBase.h | 2 +-
 lib/Commands.cc                 | 2 +-
 lib/ConsumerImpl.cc             | 8 ++++----
 lib/ConsumerImplBase.h          | 1 +
 lib/HandlerBase.cc              | 4 ++--
 lib/HandlerBase.h               | 2 +-
 lib/Message.cc                  | 4 ++--
 lib/MessageBatch.cc             | 2 +-
 lib/MessageId.cc                | 4 +++-
 lib/MessageIdImpl.h             | 4 ++--
 lib/MessageImpl.cc              | 4 ++--
 lib/MessageImpl.h               | 4 ++--
 lib/MultiTopicsConsumerImpl.cc  | 5 ++---
 lib/ProducerImpl.cc             | 8 ++++----
 16 files changed, 32 insertions(+), 27 deletions(-)

diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index f9e037e..47bc974 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -191,7 +191,7 @@ class PULSAR_PUBLIC Message {
     Message(const MessageId& messageId, proto::MessageMetadata& metadata, 
SharedBuffer& payload);
     /// Used for Batch Messages
     Message(const MessageId& messageId, proto::MessageMetadata& metadata, 
SharedBuffer& payload,
-            proto::SingleMessageMetadata& singleMetadata, const std::string& 
topicName);
+            proto::SingleMessageMetadata& singleMetadata, const 
std::shared_ptr<std::string>& topicName);
     friend class PartitionedProducerImpl;
     friend class MultiTopicsConsumerImpl;
     friend class MessageBuilder;
diff --git a/include/pulsar/MessageId.h b/include/pulsar/MessageId.h
index 3871e87..e859e3c 100644
--- a/include/pulsar/MessageId.h
+++ b/include/pulsar/MessageId.h
@@ -72,6 +72,7 @@ class PULSAR_PUBLIC MessageId {
 
     /**
      * Set the topicName
+     * @deprecated This method will be eventually removed
      */
     void setTopicName(const std::string& topicName);
 
@@ -110,6 +111,8 @@ class PULSAR_PUBLIC MessageId {
     friend class MessageIdBuilder;
     friend class ChunkMessageIdImpl;
 
+    void setTopicName(const std::shared_ptr<std::string>& topic);
+
     friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const 
MessageId& messageId);
 
     typedef std::shared_ptr<MessageIdImpl> MessageIdImplPtr;
diff --git a/lib/BatchMessageContainerBase.h b/lib/BatchMessageContainerBase.h
index e9cf7ef..fe4e5df 100644
--- a/lib/BatchMessageContainerBase.h
+++ b/lib/BatchMessageContainerBase.h
@@ -115,7 +115,7 @@ class BatchMessageContainerBase : public boost::noncopyable 
{
 
    protected:
     // references to ProducerImpl's fields
-    const std::string& topicName_;
+    const std::shared_ptr<std::string> topicName_;
     const ProducerConfiguration& producerConfig_;
     const std::string& producerName_;
     const uint64_t& producerId_;
diff --git a/lib/Commands.cc b/lib/Commands.cc
index a04a386..0089b86 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -868,7 +868,7 @@ Message Commands::deSerializeSingleMessageInBatch(Message& 
batchedMessage, int32
     auto messageId = 
MessageIdBuilder::from(m).batchIndex(batchIndex).batchSize(batchSize).build();
     auto batchedMessageId = 
std::make_shared<BatchedMessageIdImpl>(*(messageId.impl_), acker);
     Message singleMessage(MessageId{batchedMessageId}, 
batchedMessage.impl_->metadata, payload, metadata,
-                          batchedMessage.impl_->getTopicName());
+                          batchedMessage.impl_->topicName_);
     singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;
 
     return singleMessage;
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 1f26a70..996ffbf 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -175,14 +175,14 @@ Future<Result, ConsumerImplBaseWeakPtr> 
ConsumerImpl::getConsumerCreatedFuture()
 
 const std::string& ConsumerImpl::getSubscriptionName() const { return 
originalSubscriptionName_; }
 
-const std::string& ConsumerImpl::getTopic() const { return topic_; }
+const std::string& ConsumerImpl::getTopic() const { return *topic_; }
 
 void ConsumerImpl::start() {
     HandlerBase::start();
 
     // Initialize ackGroupingTrackerPtr_ here because the 
get_shared_this_ptr() was not initialized until the
     // constructor completed.
-    if (TopicName::get(topic_)->isPersistent()) {
+    if (TopicName::get(*topic_)->isPersistent()) {
         if (config_.getAckGroupingTimeMs() > 0) {
             ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
                 client_.lock(), get_shared_this_ptr(), consumerId_, 
config_.getAckGroupingTimeMs(),
@@ -225,7 +225,7 @@ void ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& cnx) {
     ClientImplPtr client = client_.lock();
     uint64_t requestId = client->newRequestId();
     SharedBuffer cmd = Commands::newSubscribe(
-        topic_, subscription_, consumerId_, requestId, getSubType(), 
consumerName_, subscriptionMode_,
+        *topic_, subscription_, consumerId_, requestId, getSubType(), 
consumerName_, subscriptionMode_,
         subscribeMessageId, readCompacted_, config_.getProperties(), 
config_.getSubscriptionProperties(),
         config_.getSchema(), getInitialPosition(), 
config_.isReplicateSubscriptionStateEnabled(),
         config_.getKeySharedPolicy(), config_.getPriorityLevel());
@@ -679,7 +679,7 @@ uint32_t 
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
         // This is a cheap copy since message contains only one shared pointer 
(impl_)
         Message msg = 
Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize, acker);
         msg.impl_->setRedeliveryCount(redeliveryCount);
-        msg.impl_->setTopicName(batchedMessage.getTopicName());
+        msg.impl_->setTopicName(batchedMessage.impl_->topicName_);
         msg.impl_->convertPayloadToKeyValue(config_.getSchema());
 
         if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h
index 73c9161..5116fd0 100644
--- a/lib/ConsumerImplBase.h
+++ b/lib/ConsumerImplBase.h
@@ -101,6 +101,7 @@ class ConsumerImplBase : public HandlerBase, public 
std::enable_shared_from_this
    private:
     virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0;
 
+    friend class MultiTopicsConsumerImpl;
     friend class PulsarFriend;
 };
 }  // namespace pulsar
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 1e13fb1..501f5fc 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -30,7 +30,7 @@ namespace pulsar {
 
 HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& 
topic, const Backoff& backoff)
     : client_(client),
-      topic_(topic),
+      topic_(std::make_shared<std::string>(topic)),
       executor_(client->getIOExecutorProvider()->get()),
       mutex_(),
       creationTimestamp_(TimeUtils::now()),
@@ -71,7 +71,7 @@ void HandlerBase::grabCnx() {
     }
     LOG_INFO(getName() << "Getting connection from pool");
     ClientImplPtr client = client_.lock();
-    Future<Result, ClientConnectionWeakPtr> future = 
client->getConnection(topic_);
+    Future<Result, ClientConnectionWeakPtr> future = 
client->getConnection(*topic_);
     future.addListener(std::bind(&HandlerBase::handleNewConnection, 
std::placeholders::_1,
                                  std::placeholders::_2, get_weak_from_this()));
 }
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 0cc8968..3b12f09 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -101,7 +101,7 @@ class HandlerBase {
 
    protected:
     ClientImplWeakPtr client_;
-    const std::string topic_;
+    const std::shared_ptr<std::string> topic_;
     ExecutorServicePtr executor_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
diff --git a/lib/Message.cc b/lib/Message.cc
index 545c893..98364f1 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -77,13 +77,13 @@ Message::Message(const MessageId& messageId, 
proto::MessageMetadata& metadata, S
 }
 
 Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, 
SharedBuffer& payload,
-                 proto::SingleMessageMetadata& singleMetadata, const 
std::string& topicName)
+                 proto::SingleMessageMetadata& singleMetadata, const 
std::shared_ptr<std::string>& topicName)
     : impl_(std::make_shared<MessageImpl>()) {
     impl_->messageId = messageID;
     impl_->metadata = metadata;
     impl_->payload = payload;
     
impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties());
-    impl_->topicName_ = &topicName;
+    impl_->topicName_ = topicName;
 
     impl_->metadata.clear_properties();
     if (singleMetadata.properties_size() > 0) {
diff --git a/lib/MessageBatch.cc b/lib/MessageBatch.cc
index d0a3d58..f2c1cb2 100644
--- a/lib/MessageBatch.cc
+++ b/lib/MessageBatch.cc
@@ -25,7 +25,7 @@
 
 namespace pulsar {
 
-const static std::string emptyString;
+const static std::shared_ptr<std::string> emptyString;
 
 MessageBatch::MessageBatch() : impl_(std::make_shared<MessageImpl>()), 
batchMessage_(impl_) {
     impl_->setTopicName(emptyString);
diff --git a/lib/MessageId.cc b/lib/MessageId.cc
index ebe52c1..12b6f40 100644
--- a/lib/MessageId.cc
+++ b/lib/MessageId.cc
@@ -168,7 +168,9 @@ PULSAR_PUBLIC bool MessageId::operator!=(const MessageId& 
other) const { return
 PULSAR_PUBLIC const std::string& MessageId::getTopicName() const { return 
impl_->getTopicName(); }
 
 PULSAR_PUBLIC void MessageId::setTopicName(const std::string& topicName) {
-    return impl_->setTopicName(topicName);
+    return setTopicName(std::make_shared<std::string>(topicName));
 }
 
+void MessageId::setTopicName(const std::shared_ptr<std::string>& topic) { 
return impl_->setTopicName(topic); }
+
 }  // namespace pulsar
diff --git a/lib/MessageIdImpl.h b/lib/MessageIdImpl.h
index 70892fa..dbab71a 100644
--- a/lib/MessageIdImpl.h
+++ b/lib/MessageIdImpl.h
@@ -69,7 +69,7 @@ class MessageIdImpl {
     int32_t batchSize_ = 0;
 
     const std::string& getTopicName() { return *topicName_; }
-    void setTopicName(const std::string& topicName) { topicName_ = &topicName; 
}
+    void setTopicName(const std::shared_ptr<std::string>& topicName) { 
topicName_ = topicName; }
 
     virtual const BitSet& getBitSet() const noexcept {
         static const BitSet emptyBitSet;
@@ -77,7 +77,7 @@ class MessageIdImpl {
     }
 
    private:
-    const std::string* topicName_ = nullptr;
+    std::shared_ptr<std::string> topicName_;
     friend class MessageImpl;
     friend class MultiTopicsConsumerImpl;
     friend class UnAckedMessageTrackerEnabled;
diff --git a/lib/MessageImpl.cc b/lib/MessageImpl.cc
index 63232e7..e70ef5d 100644
--- a/lib/MessageImpl.cc
+++ b/lib/MessageImpl.cc
@@ -85,8 +85,8 @@ void MessageImpl::setOrderingKey(const std::string& 
orderingKey) { metadata.set_
 
 void MessageImpl::setEventTimestamp(uint64_t eventTimestamp) { 
metadata.set_event_time(eventTimestamp); }
 
-void MessageImpl::setTopicName(const std::string& topicName) {
-    topicName_ = &topicName;
+void MessageImpl::setTopicName(const std::shared_ptr<std::string>& topicName) {
+    topicName_ = topicName;
     messageId.setTopicName(topicName);
 }
 
diff --git a/lib/MessageImpl.h b/lib/MessageImpl.h
index 790a021..1046ede 100644
--- a/lib/MessageImpl.h
+++ b/lib/MessageImpl.h
@@ -44,7 +44,7 @@ class MessageImpl {
     std::shared_ptr<KeyValueImpl> keyValuePtr;
     MessageId messageId;
     ClientConnection* cnx_;
-    const std::string* topicName_;
+    std::shared_ptr<std::string> topicName_;
     int redeliveryCount_;
     bool hasSchemaVersion_;
     const std::string* schemaVersion_;
@@ -66,7 +66,7 @@ class MessageImpl {
     /**
      * Set a valid topicName
      */
-    void setTopicName(const std::string& topicName);
+    void setTopicName(const std::shared_ptr<std::string>& topicName);
 
     int getRedeliveryCount();
     void setRedeliveryCount(int count);
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index c43bf2f..52af40f 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -513,8 +513,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback 
originalCallback) {
 void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const 
Message& msg) {
     LOG_DEBUG("Received Message from one of the topic - " << 
consumer.getTopic()
                                                           << " message:" << 
msg.getDataAsString());
-    const std::string& topicPartitionName = consumer.getTopic();
-    msg.impl_->setTopicName(topicPartitionName);
+    msg.impl_->setTopicName(consumer.impl_->topic_);
 
     Lock lock(pendingReceiveMutex_);
     if (!pendingReceives_.empty()) {
@@ -729,7 +728,7 @@ Future<Result, ConsumerImplBaseWeakPtr> 
MultiTopicsConsumerImpl::getConsumerCrea
 }
 const std::string& MultiTopicsConsumerImpl::getSubscriptionName() const { 
return subscriptionName_; }
 
-const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic_; }
+const std::string& MultiTopicsConsumerImpl::getTopic() const { return *topic_; 
}
 
 const std::string& MultiTopicsConsumerImpl::getName() const { return 
consumerStr_; }
 
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 2fd31cd..2dbd6c8 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -67,7 +67,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const 
TopicName& topicName,
       partition_(partition),
       producerName_(conf_.getProducerName()),
       userProvidedProducerName_(false),
-      producerStr_("[" + topic_ + ", " + producerName_ + "] "),
+      producerStr_("[" + *topic_ + ", " + producerName_ + "] "),
       producerId_(client->newProducerId()),
       msgSequenceGenerator_(0),
       batchTimer_(executor_->createDeadlineTimer()),
@@ -131,7 +131,7 @@ ProducerImpl::~ProducerImpl() {
     }
 }
 
-const std::string& ProducerImpl::getTopic() const { return topic_; }
+const std::string& ProducerImpl::getTopic() const { return *topic_; }
 
 const std::string& ProducerImpl::getProducerName() const { return 
producerName_; }
 
@@ -152,7 +152,7 @@ void ProducerImpl::connectionOpened(const 
ClientConnectionPtr& cnx) {
     ClientImplPtr client = client_.lock();
     int requestId = client->newRequestId();
 
-    SharedBuffer cmd = Commands::newProducer(topic_, producerId_, 
producerName_, requestId,
+    SharedBuffer cmd = Commands::newProducer(*topic_, producerId_, 
producerName_, requestId,
                                              conf_.getProperties(), 
conf_.getSchema(), epoch_,
                                              userProvidedProducerName_, 
conf_.isEncryptionEnabled(),
                                              
static_cast<proto::ProducerAccessMode>(conf_.getAccessMode()),
@@ -209,7 +209,7 @@ void ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result r
         cnx->registerProducer(producerId_, shared_from_this());
         producerName_ = responseData.producerName;
         schemaVersion_ = responseData.schemaVersion;
-        producerStr_ = "[" + topic_ + ", " + producerName_ + "] ";
+        producerStr_ = "[" + *topic_ + ", " + producerName_ + "] ";
         topicEpoch = responseData.topicEpoch;
 
         if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == 
-1) {

Reply via email to