This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new a9bb6ffb [ISSUE #1035] [C++] Supports configuring the number of client threads (#1155)
a9bb6ffb is described below

commit a9bb6ffb3ee85d4d0b9a23a2f22a80d4000cd71c
Author: lizhimins <[email protected]>
AuthorDate: Wed Dec 24 10:04:05 2025 +0800

    [ISSUE #1035] [C++] Supports configuring the number of client threads (#1155)
---
 cpp/include/rocketmq/Configuration.h          | 7 +++++++
 cpp/include/rocketmq/Producer.h               | 2 +-
 cpp/source/base/Configuration.cpp             | 5 +++++
 cpp/source/base/ThreadPoolImpl.cpp            | 3 +--
 cpp/source/client/ClientManagerImpl.cpp       | 7 +++----
 cpp/source/client/include/ClientConfig.h      | 1 +
 cpp/source/client/include/ClientManagerImpl.h | 2 +-
 cpp/source/rocketmq/ClientImpl.cpp            | 9 ++++++++-
 cpp/source/rocketmq/Producer.cpp              | 5 +++--
 cpp/source/rocketmq/PushConsumer.cpp          | 3 ++-
 cpp/source/rocketmq/SimpleConsumer.cpp        | 1 +
 cpp/source/rocketmq/include/ClientImpl.h      | 4 ++++
 cpp/source/scheduler/SchedulerImpl.cpp        | 5 +----
 13 files changed, 38 insertions(+), 16 deletions(-)

diff --git a/cpp/include/rocketmq/Configuration.h b/cpp/include/rocketmq/Configuration.h
index a653c87a..b748ae24 100644
--- a/cpp/include/rocketmq/Configuration.h
+++ b/cpp/include/rocketmq/Configuration.h
@@ -51,6 +51,10 @@ public:
     return tls_;
   }
 
+  std::uint32_t callbackThreads() const {
+    return callback_threads_;
+  }
+
 protected:
   friend class ConfigurationBuilder;
 
@@ -62,6 +66,7 @@ private:
   CredentialsProviderPtr    credentials_provider_;
   std::chrono::milliseconds 
request_timeout_{ConfigurationDefaults::RequestTimeout};
   bool tls_ = true;
+  std::uint32_t callback_threads_{2};
 };
 
 class ConfigurationBuilder {
@@ -76,6 +81,8 @@ public:
 
   ConfigurationBuilder& withSsl(bool with_ssl);
 
+  ConfigurationBuilder& withCallbackThreads(std::uint32_t callback_threads);
+
   Configuration build();
 
 private:
diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h
index be1026f8..e1c093c3 100644
--- a/cpp/include/rocketmq/Producer.h
+++ b/cpp/include/rocketmq/Producer.h
@@ -94,7 +94,7 @@ class ProducerBuilder {
 public:
   ProducerBuilder();
 
-  ProducerBuilder& withConfiguration(Configuration configuration);
+  ProducerBuilder& withConfiguration(const Configuration& configuration);
 
   ProducerBuilder& withTopics(const std::vector<std::string>& topics);
 
diff --git a/cpp/source/base/Configuration.cpp b/cpp/source/base/Configuration.cpp
index 13330261..a790f536 100644
--- a/cpp/source/base/Configuration.cpp
+++ b/cpp/source/base/Configuration.cpp
@@ -48,6 +48,11 @@ ConfigurationBuilder& ConfigurationBuilder::withSsl(bool with_ssl) {
   return *this;
 }
 
+ConfigurationBuilder& ConfigurationBuilder::withCallbackThreads(std::uint32_t 
callback_threads){
+  configuration_.callback_threads_ = callback_threads;
+  return *this;
+}
+
 Configuration ConfigurationBuilder::build() {
   return std::move(configuration_);
 }
diff --git a/cpp/source/base/ThreadPoolImpl.cpp b/cpp/source/base/ThreadPoolImpl.cpp
index f07ba8d7..f19ee1d9 100644
--- a/cpp/source/base/ThreadPoolImpl.cpp
+++ b/cpp/source/base/ThreadPoolImpl.cpp
@@ -22,8 +22,6 @@
 #include "rocketmq/RocketMQ.h"
 #include "rocketmq/State.h"
 #include "spdlog/spdlog.h"
-#include <atomic>
-#include <cstdint>
 #include <exception>
 #include <system_error>
 
@@ -33,6 +31,7 @@ ThreadPoolImpl::ThreadPoolImpl(std::uint16_t workers)
     : work_guard_(
           
absl::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(context_.get_executor())),
       workers_(workers) {
+    SPDLOG_INFO("ThreadPoolImpl created worker threads {}", workers);
 }
 
 void ThreadPoolImpl::start() {
diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp
index adcd0708..9c6b7a0e 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -32,7 +32,6 @@
 #include "ReceiveMessageContext.h"
 #include "RpcClient.h"
 #include "RpcClientImpl.h"
-#include "Scheduler.h"
 #include "SchedulerImpl.h"
 #include "UtilAll.h"
 #include "google/protobuf/util/time_util.h"
@@ -42,11 +41,11 @@
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool 
with_ssl)
-    : scheduler_(std::make_shared<SchedulerImpl>()),
+ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool 
with_ssl, int thread_count)
+    : scheduler_(std::make_shared<SchedulerImpl>(2)),
       resource_namespace_(std::move(resource_namespace)),
       state_(State::CREATED),
-      
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
+      callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(thread_count)),
       with_ssl_(with_ssl) {
 
   certificate_verifier_ = 
grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>();
diff --git a/cpp/source/client/include/ClientConfig.h b/cpp/source/client/include/ClientConfig.h
index 542cd116..dddcc4be 100644
--- a/cpp/source/client/include/ClientConfig.h
+++ b/cpp/source/client/include/ClientConfig.h
@@ -62,6 +62,7 @@ struct ClientConfig {
   SubscriberConfig subscriber;
   Metric metric;
   bool withSsl;
+  std::uint32_t callback_threads{2};
   std::unique_ptr<opencensus::trace::Sampler> sampler_;
 };
 
diff --git a/cpp/source/client/include/ClientManagerImpl.h b/cpp/source/client/include/ClientManagerImpl.h
index c6e60064..08b5afa8 100644
--- a/cpp/source/client/include/ClientManagerImpl.h
+++ b/cpp/source/client/include/ClientManagerImpl.h
@@ -48,7 +48,7 @@ public:
    * effectively.
    * @param resource_namespace Abstract resource namespace, in which this 
client manager lives.
    */
-  explicit ClientManagerImpl(std::string resource_namespace, bool with_ssl = 
true);
+  explicit ClientManagerImpl(std::string resource_namespace, bool with_ssl = 
true, int thread_count = 1);
 
   ~ClientManagerImpl() override;
 
diff --git a/cpp/source/rocketmq/ClientImpl.cpp b/cpp/source/rocketmq/ClientImpl.cpp
index a5cfae8d..19b4e460 100644
--- a/cpp/source/rocketmq/ClientImpl.cpp
+++ b/cpp/source/rocketmq/ClientImpl.cpp
@@ -108,7 +108,9 @@ void ClientImpl::start() {
   client_config_.client_id = clientId();
   if (!client_manager_) {
     client_manager_ = std::make_shared<ClientManagerImpl>(
-        client_config_.resource_namespace, client_config_.withSsl);
+        client_config_.resource_namespace,
+        client_config_.withSsl,
+        client_config_.callback_threads);
     client_manager_->start();
   }
 
@@ -118,6 +120,11 @@ void ClientImpl::start() {
     return;
   }
 
+  // A gRPC I/O thread pool is created upon establishing a connection.
+  //   - https://github.com/grpc/grpc/issues/28642
+  //   - https://github.com/grpc/grpc/pull/31662
+  // The source code initializes the number of I/O threads as follows:
+  //   int num_io_threads = grpc_core::Clamp(gpr_cpu_num_cores() / 2, 2u, 16u);
   while (true) {
     createSession(endpoint, false);
     {
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 916c47a4..ebccf60e 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -76,14 +76,15 @@ ProducerBuilder Producer::newBuilder() {
   return {};
 }
 
-ProducerBuilder::ProducerBuilder() : impl_(std::make_shared<ProducerImpl>()){};
+ProducerBuilder::ProducerBuilder() : impl_(std::make_shared<ProducerImpl>()){}
 
-ProducerBuilder& ProducerBuilder::withConfiguration(Configuration 
configuration) {
+ProducerBuilder& ProducerBuilder::withConfiguration(const Configuration& 
configuration) {
   auto name_server_resolver = 
std::make_shared<StaticNameServerResolver>(configuration.endpoints());
   impl_->withNameServerResolver(std::move(name_server_resolver));
   impl_->withResourceNamespace(configuration.resourceNamespace());
   impl_->withCredentialsProvider(configuration.credentialsProvider());
   impl_->withRequestTimeout(configuration.requestTimeout());
+  impl_->withCallbackThreads(configuration.callbackThreads());
   impl_->withSsl(configuration.withSsl());
   return *this;
 }
diff --git a/cpp/source/rocketmq/PushConsumer.cpp b/cpp/source/rocketmq/PushConsumer.cpp
index a2f22803..a6dd3d35 100644
--- a/cpp/source/rocketmq/PushConsumer.cpp
+++ b/cpp/source/rocketmq/PushConsumer.cpp
@@ -44,10 +44,11 @@ PushConsumer PushConsumerBuilder::build() {
   impl->consumeThreadPoolSize(consume_thread_);
   
impl->withNameServerResolver(std::make_shared<StaticNameServerResolver>(configuration_.endpoints()));
   impl->withResourceNamespace(configuration_.resourceNamespace());
-  impl->withSsl(configuration_.withSsl());
   impl->withCredentialsProvider(configuration_.credentialsProvider());
   impl->withRequestTimeout(configuration_.requestTimeout());
   impl->withFifoConsumeAccelerator(fifo_consume_accelerator_);
+  impl->withCallbackThreads(configuration_.callbackThreads());
+  impl->withSsl(configuration_.withSsl());
   impl->start();
   return PushConsumer(impl);
 }
diff --git a/cpp/source/rocketmq/SimpleConsumer.cpp b/cpp/source/rocketmq/SimpleConsumer.cpp
index f321247f..152c0537 100644
--- a/cpp/source/rocketmq/SimpleConsumer.cpp
+++ b/cpp/source/rocketmq/SimpleConsumer.cpp
@@ -133,6 +133,7 @@ SimpleConsumer SimpleConsumerBuilder::build() {
   
simple_consumer.impl_->withResourceNamespace(configuration_.resourceNamespace());
   
simple_consumer.impl_->withCredentialsProvider(configuration_.credentialsProvider());
   simple_consumer.impl_->withReceiveMessageTimeout(await_duration_);
+  simple_consumer.impl_->withCallbackThreads(configuration_.callbackThreads());
   simple_consumer.impl_->withSsl(configuration_.withSsl());
 
   for (const auto& entry : subscriptions_) {
diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h
index 96865399..01967384 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -102,6 +102,10 @@ public:
     client_config_.withSsl = with_ssl;
   }
 
+  void withCallbackThreads(std::uint32_t callback_threads) {
+    client_config_.callback_threads = callback_threads;
+  }
+
   void withFifoConsumeAccelerator(bool fifo_consume_accelerator) {
     client_config_.subscriber.fifo_consume_accelerator = 
fifo_consume_accelerator;
   }
diff --git a/cpp/source/scheduler/SchedulerImpl.cpp b/cpp/source/scheduler/SchedulerImpl.cpp
index 1c036df2..cb05718b 100644
--- a/cpp/source/scheduler/SchedulerImpl.cpp
+++ b/cpp/source/scheduler/SchedulerImpl.cpp
@@ -16,10 +16,7 @@
  */
 #include "SchedulerImpl.h"
 
-#include <atomic>
 #include <cassert>
-#include <cstdint>
-#include <cstdlib>
 #include <exception>
 #include <functional>
 #include <memory>
@@ -27,7 +24,6 @@
 #include <thread>
 
 #include "absl/memory/memory.h"
-#include "asio/error_code.hpp"
 #include "asio/executor_work_guard.hpp"
 #include "asio/io_context.hpp"
 #include "asio/steady_timer.hpp"
@@ -39,6 +35,7 @@ SchedulerImpl::SchedulerImpl(std::uint32_t worker_num)
     : work_guard_(
           
absl::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(context_.get_executor())),
       worker_num_(worker_num) {
+    SPDLOG_INFO("SchedulerImpl created worker thread {}", worker_num);
 }
 
 SchedulerImpl::SchedulerImpl() : 
SchedulerImpl(std::thread::hardware_concurrency()) {

Reply via email to