This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch cpp/visibility-timeout
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 5d111fa4eca0aa87b52762fd8586b883e76c4f15
Author: lizhimins <[email protected]>
AuthorDate: Fri Jul 11 10:14:35 2025 +0800

    [ISSUE #1035] [C++] Support continuous visibility timeout adjustment for SimpleConsumer
---
 cpp/examples/ExampleSimpleConsumer.cpp           | 34 +++++++++++++++---------
 cpp/include/rocketmq/SimpleConsumer.h            |  6 ++---
 cpp/source/client/ClientManagerImpl.cpp          |  7 ++---
 cpp/source/client/include/ClientManager.h        |  2 +-
 cpp/source/client/include/ClientManagerImpl.h    |  2 +-
 cpp/source/rocketmq/PushConsumerImpl.cpp         |  7 ++++-
 cpp/source/rocketmq/SimpleConsumer.cpp           | 13 +++++----
 cpp/source/rocketmq/SimpleConsumerImpl.cpp       | 15 ++++++++---
 cpp/source/rocketmq/include/SimpleConsumerImpl.h |  4 +--
 9 files changed, 58 insertions(+), 32 deletions(-)

diff --git a/cpp/examples/ExampleSimpleConsumer.cpp b/cpp/examples/ExampleSimpleConsumer.cpp
index 2d93d239..d89d0b13 100644
--- a/cpp/examples/ExampleSimpleConsumer.cpp
+++ b/cpp/examples/ExampleSimpleConsumer.cpp
@@ -21,6 +21,7 @@
 #include "rocketmq/ErrorCode.h"
 #include "rocketmq/Logger.h"
 #include "rocketmq/SimpleConsumer.h"
+#include "spdlog/spdlog.h"
 
 using namespace ROCKETMQ_NAMESPACE;
 
@@ -58,34 +59,43 @@ int main(int argc, char* argv[]) {
                              .subscribe(FLAGS_topic, tag)
                              .withAwaitDuration(std::chrono::seconds(10))
                              .build();
+  std::size_t total = 0;
 
   // Should use while (true) instead
   for (int j = 0; j < 30; j++) {
     std::vector<MessageConstSharedPtr> messages;
     std::error_code ec;
     simple_consumer.receive(4, std::chrono::seconds(15), ec, messages);
+
     if (ec) {
       std::cerr << "Failed to receive messages. Cause: " << ec.message() << 
std::endl;
+    } else {
+      std::cout << "Received " << messages.size() << " messages" << std::endl;
     }
 
-    std::cout << "Received " << messages.size() << " messages" << std::endl;
-    std::size_t i = 0;
-
     for (const auto& message : messages) {
-      std::cout << "Received a message[topic=" << message->topic()
-                << ", message-id=" << message->id()
-                << ", receipt-handle='" << message->extension().receipt_handle
-                << "']" << std::endl;
+      std::string receipt_handle = message->extension().receipt_handle;
+      SPDLOG_INFO("Receive message, topic={}, message-id={}, 
receipt-handle={}]", message->topic(), message->id(), receipt_handle);
 
-      if (++i % 2 == 0) {
+      if (total++ % 2 == 0) {
+        // Consume message successfully then ack it
         simple_consumer.ack(*message, ec);
         if (ec) {
-          std::cerr << "Failed to ack message. Cause: " << ec.message() << 
std::endl;
+          SPDLOG_ERROR("Failed to ack message. Cause: {}", ec.message());
+        } else {
+          SPDLOG_INFO("Ack message, topic={}, message-id={}, 
receipt-handle={}]", message->topic(), message->id(), receipt_handle);
         }
       } else {
-        simple_consumer.changeInvisibleDuration(*message, 
std::chrono::seconds(3), ec);
-        if (ec) {
-          std::cerr << "Failed to change invisible duration of message. Cause: 
" << ec.message() << std::endl;
+        // Extend the message consumption time by modifying the invisible 
duration API
+        for (int k = 0; k < 3; k++) {
+          simple_consumer.changeInvisibleDuration(
+              *message, receipt_handle, std::chrono::seconds(15), ec);
+          if (ec) {
+            SPDLOG_WARN("Failed to change invisible duration of message. 
Cause: ", ec.message());
+          } else {
+            SPDLOG_INFO("Change invisible duration, topic={}, message-id={}, 
times={}, receipt-handle={}]",
+                        message->topic(), message->id(), k, receipt_handle);
+          }
         }
       }
     }
diff --git a/cpp/include/rocketmq/SimpleConsumer.h b/cpp/include/rocketmq/SimpleConsumer.h
index cb489c91..dff1158a 100644
--- a/cpp/include/rocketmq/SimpleConsumer.h
+++ b/cpp/include/rocketmq/SimpleConsumer.h
@@ -35,7 +35,7 @@ using ReceiveCallback = std::function<void(const 
std::error_code&, const std::ve
 
 using AckCallback = std::function<void(const std::error_code&)>;
 
-using ChangeInvisibleDurationCallback = std::function<void(const 
std::error_code&)>;
+using ChangeInvisibleDurationCallback = std::function<void(const 
std::error_code&, std::string& receipt_handle)>;
 
 class SimpleConsumerImpl;
 
@@ -60,9 +60,9 @@ public:
 
   void asyncAck(const Message& message, AckCallback callback);
 
-  void changeInvisibleDuration(const Message& message, 
std::chrono::milliseconds duration, std::error_code& ec);
+  void changeInvisibleDuration(const Message& message, std::string& 
receipt_handle, std::chrono::milliseconds duration, std::error_code& ec);
 
-  void asyncChangeInvisibleDuration(const Message& message,
+  void asyncChangeInvisibleDuration(const Message& message, std::string& 
receipt_handle,
                                     std::chrono::milliseconds duration,
                                     ChangeInvisibleDurationCallback callback);
 
diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp
index 7c3498e9..adcd0708 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -1100,7 +1100,8 @@ void ClientManagerImpl::changeInvisibleDuration(
     const Metadata& metadata,
     const ChangeInvisibleDurationRequest& request,
     std::chrono::milliseconds timeout,
-    const std::function<void(const std::error_code&)>& completion_callback) {
+    const std::function<void(const std::error_code&, const 
ChangeInvisibleDurationResponse&)>& completion_callback) {
+
   RpcClientSharedPtr client = getRpcClient(target_host);
   assert(client);
   auto invocation_context = new 
InvocationContext<ChangeInvisibleDurationResponse>();
@@ -1118,7 +1119,7 @@ void ClientManagerImpl::changeInvisibleDuration(
       SPDLOG_WARN("Failed to write Nack request to wire. gRPC-code: {}, 
gRPC-message: {}",
                   invocation_context->status.error_code(), 
invocation_context->status.error_message());
       std::error_code ec = ErrorCode::RequestTimeout;
-      completion_callback(ec);
+      completion_callback(ec, invocation_context->response);
       return;
     }
 
@@ -1185,7 +1186,7 @@ void ClientManagerImpl::changeInvisibleDuration(
         break;
       }
     }
-    completion_callback(ec);
+    completion_callback(ec, invocation_context->response);
   };
   invocation_context->callback = callback;
   client->asyncChangeInvisibleDuration(request, invocation_context);
diff --git a/cpp/source/client/include/ClientManager.h b/cpp/source/client/include/ClientManager.h
index c67c6ea8..d6e20ace 100644
--- a/cpp/source/client/include/ClientManager.h
+++ b/cpp/source/client/include/ClientManager.h
@@ -70,7 +70,7 @@ public:
 
   virtual void changeInvisibleDuration(const std::string& target_host, const 
Metadata& metadata,
                                        const ChangeInvisibleDurationRequest&, 
std::chrono::milliseconds timeout,
-                                       const std::function<void(const 
std::error_code&)>&) = 0;
+                                       const std::function<void(const 
std::error_code&, const ChangeInvisibleDurationResponse&)>&) = 0;
 
   virtual void forwardMessageToDeadLetterQueue(
       const std::string& target_host, const Metadata& metadata, const 
ForwardMessageToDeadLetterQueueRequest& request,
diff --git a/cpp/source/client/include/ClientManagerImpl.h b/cpp/source/client/include/ClientManagerImpl.h
index cd862154..c6e60064 100644
--- a/cpp/source/client/include/ClientManagerImpl.h
+++ b/cpp/source/client/include/ClientManagerImpl.h
@@ -148,7 +148,7 @@ public:
                                const Metadata& metadata,
                                const ChangeInvisibleDurationRequest&,
                                std::chrono::milliseconds timeout,
-                               const std::function<void(const 
std::error_code&)>&) override;
+                               const std::function<void(const 
std::error_code&, const ChangeInvisibleDurationResponse&)>&) override;
 
   void forwardMessageToDeadLetterQueue(const std::string& target_host,
                                        const Metadata& metadata,
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp
index 4ba038b2..45f40541 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -413,8 +413,13 @@ void PushConsumerImpl::nack(const Message& message, const 
std::function<void(con
   request.set_message_id(message.id());
   request.mutable_invisible_duration()->CopyFrom(
       
google::protobuf::util::TimeUtil::MillisecondsToDuration(duration.count()));
+
+  auto cb =
+      [callback](const std::error_code& ec, const 
ChangeInvisibleDurationResponse& response) {
+        callback(ec);
+      };
   client_manager_->changeInvisibleDuration(target_host, metadata, request,
-                                           
absl::ToChronoMilliseconds(client_config_.request_timeout), callback);
+                                           
absl::ToChronoMilliseconds(client_config_.request_timeout), cb);
 }
 
 void PushConsumerImpl::forwardToDeadLetterQueue(const Message& message,
diff --git a/cpp/source/rocketmq/SimpleConsumer.cpp b/cpp/source/rocketmq/SimpleConsumer.cpp
index 8acf16ac..a6a834a6 100644
--- a/cpp/source/rocketmq/SimpleConsumer.cpp
+++ b/cpp/source/rocketmq/SimpleConsumer.cpp
@@ -89,22 +89,25 @@ void SimpleConsumer::asyncAck(const Message& message, 
AckCallback callback) {
   impl_->ackAsync(message, callback);
 }
 
-void SimpleConsumer::changeInvisibleDuration(const Message& message,
+void SimpleConsumer::changeInvisibleDuration(const Message& message, 
std::string& receipt_handle,
                                              std::chrono::milliseconds 
duration,
                                              std::error_code& ec) {
   auto mtx = std::make_shared<absl::Mutex>();
   auto cv = std::make_shared<absl::CondVar>();
   bool completed = false;
-  auto callback = [&, mtx, cv](const std::error_code& code) {
+
+  auto callback =
+      [&, mtx, cv](const std::error_code& code, std::string& 
server_receipt_handle) {
     {
       absl::MutexLock lk(mtx.get());
       completed = true;
+      receipt_handle = server_receipt_handle;
       ec = code;
     }
     cv->Signal();
   };
 
-  impl_->changeInvisibleDuration(message, duration, callback);
+  impl_->changeInvisibleDuration(message, receipt_handle, duration, callback);
 
   {
     absl::MutexLock lk(mtx.get());
@@ -114,10 +117,10 @@ void SimpleConsumer::changeInvisibleDuration(const 
Message& message,
   }
 }
 
-void SimpleConsumer::asyncChangeInvisibleDuration(const Message& message,
+void SimpleConsumer::asyncChangeInvisibleDuration(const Message& message, 
std::string& receipt_handle,
                                                   std::chrono::milliseconds 
duration,
                                                   
ChangeInvisibleDurationCallback callback) {
-  impl_->changeInvisibleDuration(message, duration, callback);
+  impl_->changeInvisibleDuration(message, receipt_handle, duration, callback);
 }
 
 SimpleConsumer SimpleConsumerBuilder::build() {
diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
index 2441bb2d..25803429 100644
--- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
@@ -409,9 +409,9 @@ void SimpleConsumerImpl::ackAsync(const Message& message, 
AckCallback callback)
                  absl::ToChronoMilliseconds(client_config_.request_timeout), 
callback);
 }
 
-void SimpleConsumerImpl::changeInvisibleDuration(const Message& message,
+void SimpleConsumerImpl::changeInvisibleDuration(const Message& message, 
std::string& receipt_handle,
                                                  std::chrono::milliseconds 
duration,
-                                                 
ChangeInvisibleDurationCallback callback) {
+                                                 const 
ChangeInvisibleDurationCallback callback) {
   Metadata metadata;
   Signature::sign(client_config_, metadata);
 
@@ -420,11 +420,18 @@ void SimpleConsumerImpl::changeInvisibleDuration(const 
Message& message,
   request.mutable_topic()->set_resource_namespace(resourceNamespace());
   request.mutable_topic()->set_name(message.topic());
   request.set_message_id(message.id());
-  request.set_receipt_handle(message.extension().receipt_handle);
+  request.set_receipt_handle(receipt_handle);
   auto d = 
google::protobuf::util::TimeUtil::MillisecondsToDuration(duration.count());
   request.mutable_invisible_duration()->CopyFrom(d);
 
-  manager()->changeInvisibleDuration(message.extension().target_endpoint, 
metadata, request, duration, callback);
+  auto cb =
+      [callback](const std::error_code& ec, const 
ChangeInvisibleDurationResponse& response) {
+    std::string server_receipt_handle = response.receipt_handle();
+    callback(ec, server_receipt_handle);
+  };
+
+  manager()->changeInvisibleDuration(
+      message.extension().target_endpoint, metadata, request, duration, cb);
 }
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
index 39e6523e..0bcb7fca 100644
--- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
+++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
@@ -53,9 +53,9 @@ public:
 
   void ackAsync(const Message& message, AckCallback callback);
 
-  void changeInvisibleDuration(const Message& message,
+  void changeInvisibleDuration(const Message& message, std::string& 
receipt_handle,
                                std::chrono::milliseconds duration,
-                               ChangeInvisibleDurationCallback callback);
+                               const ChangeInvisibleDurationCallback callback);
 
   void withReceiveMessageTimeout(std::chrono::milliseconds receive_timeout) {
     long_polling_duration_ = receive_timeout;

Reply via email to