This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new d3962278 [ISSUE #1159] [C++] Use Pass-by-Value instead of 
Pass-by-Reference for attemptId in PushConsumer (#1160)
d3962278 is described below

commit d3962278fca112aeffcaf701549241fb37c7d9bf
Author: lizhimins <[email protected]>
AuthorDate: Fri Dec 26 15:20:47 2025 +0800

    [ISSUE #1159] [C++] Use Pass-by-Value instead of Pass-by-Reference for 
attemptId in PushConsumer (#1160)
---
 cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp       | 13 +++++++------
 cpp/source/rocketmq/ProcessQueueImpl.cpp                  |  6 +++---
 cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h |  8 ++++----
 3 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp 
b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
index d1c3ba30..72ee3f8a 100644
--- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -36,7 +36,7 @@ 
AsyncReceiveMessageCallback::AsyncReceiveMessageCallback(std::weak_ptr<ProcessQu
 }
 
 void AsyncReceiveMessageCallback::onCompletion(
-    const std::error_code& ec, std::string& attempt_id, const 
ReceiveMessageResult& result) {
+    const std::error_code& ec, const std::string& attempt_id, const 
ReceiveMessageResult& result) {
 
   std::shared_ptr<ProcessQueue> process_queue = process_queue_.lock();
   if (!process_queue) {
@@ -76,7 +76,7 @@ void AsyncReceiveMessageCallback::onCompletion(
 
 const char* AsyncReceiveMessageCallback::RECEIVE_LATER_TASK_NAME = 
"receive-later-task";
 
-void AsyncReceiveMessageCallback::checkThrottleThenReceive(std::string 
attempt_id) {
+void AsyncReceiveMessageCallback::checkThrottleThenReceive(const std::string& 
attempt_id) {
   auto process_queue = process_queue_.lock();
   if (!process_queue) {
     SPDLOG_WARN("Process queue should have been destructed");
@@ -94,7 +94,7 @@ void 
AsyncReceiveMessageCallback::checkThrottleThenReceive(std::string attempt_i
   }
 }
 
-void 
AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds 
delay, std::string& attempt_id) {
+void 
AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds 
delay, const std::string& attempt_id) {
   auto process_queue = process_queue_.lock();
   if (!process_queue) {
     return;
@@ -103,7 +103,7 @@ void 
AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds
   auto client_instance = process_queue->getClientManager();
   std::weak_ptr<AsyncReceiveMessageCallback> 
receive_callback_weak_ptr(shared_from_this());
 
-  auto task = [receive_callback_weak_ptr, &attempt_id]() {
+  auto task = [receive_callback_weak_ptr, attempt_id]() {
     auto async_receive_ptr = receive_callback_weak_ptr.lock();
     if (async_receive_ptr) {
       async_receive_ptr->checkThrottleThenReceive(attempt_id);
@@ -114,7 +114,7 @@ void 
AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds
       task, RECEIVE_LATER_TASK_NAME, delay, std::chrono::seconds(0));
 }
 
-void AsyncReceiveMessageCallback::receiveMessageImmediately(std::string& 
attempt_id) {
+void AsyncReceiveMessageCallback::receiveMessageImmediately(const std::string& 
attempt_id) {
   auto process_queue_shared_ptr = process_queue_.lock();
   if (!process_queue_shared_ptr) {
     SPDLOG_INFO("ProcessQueue has been released. Ignore further receive 
message request-response cycles");
@@ -128,8 +128,9 @@ void 
AsyncReceiveMessageCallback::receiveMessageImmediately(std::string& attempt
     return;
   }
 
+  std::string attempt_id_copy = attempt_id;
   impl->receiveMessage(process_queue_shared_ptr->messageQueue(),
-                       process_queue_shared_ptr->getFilterExpression(), 
attempt_id);
+                       process_queue_shared_ptr->getFilterExpression(), 
attempt_id_copy);
 }
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/ProcessQueueImpl.cpp 
b/cpp/source/rocketmq/ProcessQueueImpl.cpp
index 59df8578..dc6f32d1 100644
--- a/cpp/source/rocketmq/ProcessQueueImpl.cpp
+++ b/cpp/source/rocketmq/ProcessQueueImpl.cpp
@@ -41,11 +41,11 @@ ProcessQueueImpl::ProcessQueueImpl(rmq::MessageQueue 
message_queue, FilterExpres
       invisible_time_(MixAll::millisecondsOf(MixAll::DEFAULT_INVISIBLE_TIME_)),
       simple_name_(simpleNameOf(message_queue_)), 
consumer_(std::move(consumer)),
       client_manager_(std::move(client_instance)), 
cached_message_quantity_(0), cached_message_memory_(0) {
-  SPDLOG_DEBUG("Created ProcessQueue={}", simpleName());
+  SPDLOG_DEBUG("Created ProcessQueue={}", simple_name_);
 }
 
 ProcessQueueImpl::~ProcessQueueImpl() {
-  SPDLOG_INFO("ProcessQueue={} should have been re-balanced away, thus, is 
destructed", simpleName());
+  SPDLOG_INFO("ProcessQueue={} should have been re-balanced away, thus, is 
destructed", simple_name_);
 }
 
 void ProcessQueueImpl::callback(std::shared_ptr<AsyncReceiveMessageCallback> 
callback) {
@@ -120,7 +120,7 @@ void ProcessQueueImpl::popMessage(std::string& attempt_id) {
 
   std::weak_ptr<AsyncReceiveMessageCallback> cb{receive_callback_};
   auto callback =
-      [cb, &attempt_id](const std::error_code& ec, const ReceiveMessageResult& 
result) {
+      [cb, attempt_id](const std::error_code& ec, const ReceiveMessageResult& 
result) {
     std::shared_ptr<AsyncReceiveMessageCallback> receive_cb = cb.lock();
     if (receive_cb) {
       receive_cb->onCompletion(ec, attempt_id, result);
diff --git a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h 
b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
index b19f097b..32a7a475 100644
--- a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
+++ b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
@@ -29,11 +29,11 @@ class AsyncReceiveMessageCallback : public 
std::enable_shared_from_this<AsyncRec
 public:
   explicit AsyncReceiveMessageCallback(std::weak_ptr<ProcessQueue> 
process_queue);
 
-  void onCompletion(const std::error_code& ec, std::string& attempt_id, const 
ReceiveMessageResult& result);
+  void onCompletion(const std::error_code& ec, const std::string& attempt_id, 
const ReceiveMessageResult& result);
 
-  void receiveMessageLater(std::chrono::milliseconds delay, std::string& 
attempt_id);
+  void receiveMessageLater(std::chrono::milliseconds delay, const std::string& 
attempt_id);
 
-  void receiveMessageImmediately(std::string& attempt_id);
+  void receiveMessageImmediately(const std::string& attempt_id);
 
 private:
   /**
@@ -44,7 +44,7 @@ private:
 
   std::function<void(std::string)> receive_message_later_;
 
-  void checkThrottleThenReceive(std::string attempt_id);
+  void checkThrottleThenReceive(const std::string& attempt_id);
 
   static const char* RECEIVE_LATER_TASK_NAME;
 };

Reply via email to