This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new da39ef79 fix #521 sync state of consume-message-service according to start/shutdown lifecycle management (#618)
da39ef79 is described below

commit da39ef79838d56a1880083ce8e3fe7bd6660b94f
Author: Zhanhui Li <[email protected]>
AuthorDate: Sun Oct 8 20:42:44 2023 +0800

    fix #521 sync state of consume-message-service according to start/shutdown lifecycle management (#618)
    
    Signed-off-by: Li Zhanhui <[email protected]>
---
 cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp  | 16 ++++++---
 .../rocketmq/include/ConsumeMessageServiceImpl.h   | 41 +++++++++++++---------
 cpp/source/rocketmq/tests/BUILD.bazel              |  8 +++++
 .../rocketmq/tests/ConsumeMessageServiceTest.cpp   | 38 ++++++++++++++++++++
 4 files changed, 83 insertions(+), 20 deletions(-)

diff --git a/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp 
b/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
index 11e14ce3..2b5e7c70 100644
--- a/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
+++ b/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
@@ -16,14 +16,16 @@
  */
 #include "ConsumeMessageServiceImpl.h"
 
+#include <atomic>
+
 #include "ConsumeStats.h"
 #include "ConsumeTask.h"
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
 #include "PushConsumerImpl.h"
 #include "Tag.h"
 #include "ThreadPoolImpl.h"
 #include "rocketmq/ErrorCode.h"
+#include "rocketmq/Logger.h"
+#include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -41,16 +43,22 @@ void ConsumeMessageServiceImpl::start() {
   State expected = State::CREATED;
   if (state_.compare_exchange_strong(expected, State::STARTING, 
std::memory_order_relaxed)) {
     pool_->start();
+    state_.store(State::STARTED, std::memory_order_relaxed);
   }
 }
 
 void ConsumeMessageServiceImpl::shutdown() {
-  State expected = State::STOPPING;
-  if (state_.compare_exchange_strong(expected, State::STOPPED, 
std::memory_order_relaxed)) {
+  State expected = State::STARTED;
+  if (state_.compare_exchange_strong(expected, State::STOPPING, 
std::memory_order_relaxed)) {
     pool_->shutdown();
+    state_.store(State::STOPPED, std::memory_order_relaxed);
   }
 }
 
+State ConsumeMessageServiceImpl::state() const {
+  return state_.load(std::memory_order_relaxed);
+}
+
 void ConsumeMessageServiceImpl::dispatch(std::shared_ptr<ProcessQueue> 
process_queue,
                                          std::vector<MessageConstSharedPtr> 
messages) {
   auto consumer = consumer_.lock();
diff --git a/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h 
b/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h
index 91979d22..f2f54bbd 100644
--- a/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h
+++ b/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h
@@ -30,49 +30,58 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 class PushConsumerImpl;
 
-class ConsumeMessageServiceImpl : public ConsumeMessageService,
-                                  public 
std::enable_shared_from_this<ConsumeMessageServiceImpl> {
+class ConsumeMessageServiceImpl
+    : public ConsumeMessageService,
+      public std::enable_shared_from_this<ConsumeMessageServiceImpl> {
 public:
   ConsumeMessageServiceImpl(std::weak_ptr<PushConsumerImpl> consumer,
-                            int thread_count,
-                            MessageListener message_listener);
+                            int thread_count, MessageListener 
message_listener);
 
   ~ConsumeMessageServiceImpl() override = default;
 
   /**
    * Make it noncopyable.
    */
-  ConsumeMessageServiceImpl(const ConsumeMessageServiceImpl& other) = delete;
-  ConsumeMessageServiceImpl& operator=(const ConsumeMessageServiceImpl& other) 
= delete;
+  ConsumeMessageServiceImpl(const ConsumeMessageServiceImpl &other) = delete;
+  ConsumeMessageServiceImpl &
+  operator=(const ConsumeMessageServiceImpl &other) = delete;
 
   void start() override;
 
   void shutdown() override;
 
-  MessageListener& listener() override {
-    return message_listener_;
-  }
+  MessageListener &listener() override { return message_listener_; }
 
-  bool preHandle(const Message& message) override;
+  bool preHandle(const Message &message) override;
 
-  bool postHandle(const Message& message, ConsumeResult result) override;
+  bool postHandle(const Message &message, ConsumeResult result) override;
 
   void submit(std::shared_ptr<ConsumeTask> task) override;
 
-  void dispatch(std::shared_ptr<ProcessQueue> process_queue, 
std::vector<MessageConstSharedPtr> messages) override;
+  void dispatch(std::shared_ptr<ProcessQueue> process_queue,
+                std::vector<MessageConstSharedPtr> messages) override;
 
-  void ack(const Message& message, std::function<void(const std::error_code&)> 
cb) override;
+  void ack(const Message &message,
+           std::function<void(const std::error_code &)> cb) override;
 
-  void nack(const Message& message, std::function<void(const 
std::error_code&)> cb) override;
+  void nack(const Message &message,
+            std::function<void(const std::error_code &)> cb) override;
 
-  void forward(const Message& message, std::function<void(const 
std::error_code&)> cb) override;
+  void forward(const Message &message,
+               std::function<void(const std::error_code &)> cb) override;
 
-  void schedule(std::shared_ptr<ConsumeTask> task, std::chrono::milliseconds 
delay) override;
+  void schedule(std::shared_ptr<ConsumeTask> task,
+                std::chrono::milliseconds delay) override;
 
   std::size_t maxDeliveryAttempt() override;
 
   std::weak_ptr<PushConsumerImpl> consumer() override;
 
+  /**
+   * Current state of the consume message service.
+   */
+  State state() const;
+
 protected:
   std::atomic<State> state_;
 
diff --git a/cpp/source/rocketmq/tests/BUILD.bazel 
b/cpp/source/rocketmq/tests/BUILD.bazel
index 99f2fb2a..a8d10e92 100644
--- a/cpp/source/rocketmq/tests/BUILD.bazel
+++ b/cpp/source/rocketmq/tests/BUILD.bazel
@@ -51,4 +51,12 @@ cc_test(
         "StaticNameServerResolverTest.cpp",
     ],
     deps = base_deps,
+)
+
+cc_test(
+    name = "consume_message_service_test",
+    srcs = [
+        "ConsumeMessageServiceTest.cpp",
+    ],
+    deps = base_deps,
 )
\ No newline at end of file
diff --git a/cpp/source/rocketmq/tests/ConsumeMessageServiceTest.cpp 
b/cpp/source/rocketmq/tests/ConsumeMessageServiceTest.cpp
new file mode 100644
index 00000000..0b858271
--- /dev/null
+++ b/cpp/source/rocketmq/tests/ConsumeMessageServiceTest.cpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+
+#include "ConsumeMessageServiceImpl.h"
+#include "PushConsumerImpl.h"
+#include "gtest/gtest.h"
+#include "rocketmq/ConsumeResult.h"
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+TEST(ConsumeMessageServiceTest, testLifecycle) {
+  auto listener = [](const Message&) { return ConsumeResult::SUCCESS; };
+  std::weak_ptr<PushConsumerImpl> consumer;
+  auto svc = std::make_shared<ConsumeMessageServiceImpl>(consumer, 2, 
listener);
+  svc->start();
+  ASSERT_EQ(State::STARTED, svc->state());
+
+  svc->shutdown();
+  ASSERT_EQ(State::STOPPED, svc->state());
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file

Reply via email to