wwbmmm commented on code in PR #2358: URL: https://github.com/apache/brpc/pull/2358#discussion_r1364810161
########## example/bthread_tag_echo_c++/cert.pem: ########## @@ -0,0 +1,26 @@ +-----BEGIN CERTIFICATE----- +MIIEUTCCAzmgAwIBAgIBADANBgkqhkiG9w0BAQQFADB9MQswCQYDVQQGEwJDTjER Review Comment: 这个不需要吧 ########## example/bthread_tag_echo_c++/CMakeLists.txt: ########## @@ -0,0 +1,150 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +cmake_minimum_required(VERSION 2.8.10) +project(multi_threaded_echo_c++ C CXX) Review Comment: 名字改一下 ########## src/brpc/acceptor.h: ########## @@ -112,6 +112,9 @@ friend class Server; // Whether to use rdma or not bool _use_rdma; + + // Acceptor belongs to this tag + bthread_tag_t _tag; Review Comment: 命名: _bthread_tag ########## src/brpc/socket.h: ########## @@ -750,6 +752,7 @@ friend void DereferenceSocket(Socket*); // [ Set in ResetFileDescriptor ] butil::atomic<int> _fd; // -1 when not connected. + bthread_tag_t _tag; // tag of this socket Review Comment: 命名: _bthread_tag ########## example/bthread_tag_echo_c++/client.cpp: ########## @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A client sending requests to server by multiple threads. + +#include <gflags/gflags.h> +#include <bthread/bthread.h> +#include <butil/logging.h> +#include <brpc/server.h> +#include <brpc/channel.h> +#include "echo.pb.h" +#include <bvar/bvar.h> + +DEFINE_int32(thread_num, 50, "Number of threads to send requests"); +DEFINE_bool(use_bthread, false, "Use bthread to send requests"); +DEFINE_int32(attachment_size, 0, "Carry so many byte attachment along with requests"); +DEFINE_int32(request_size, 16, "Bytes of each request"); +DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto"); +DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short"); +DEFINE_string(server, "0.0.0.0:8002", "IP Address of server"); +DEFINE_string(load_balancer, "", "The algorithm for load balancing"); +DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); +DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); +DEFINE_bool(dont_fail, false, "Print fatal when some call failed"); +DEFINE_bool(enable_ssl, false, "Use SSL connection"); +DEFINE_int32(dummy_port, -1, "Launch dummy server at this port"); +DEFINE_string(connection_group, "", "Connection group for channel"); +DEFINE_int32(bthread_tag, BTHREAD_TAG_DEFAULT, "bthread used tag"); + +std::string g_request; +std::string g_attachment; + +bvar::LatencyRecorder g_latency_recorder("client"); +bvar::Adder<int> g_error_count("client_error_count"); + +static void* sender(void* arg) { + // Normally, you should not call a Channel directly, but instead construct + // a stub Service wrapping it. stub can be shared by all threads as well. + example::EchoService_Stub stub(static_cast<google::protobuf::RpcChannel*>(arg)); + + int log_id = 0; + while (!brpc::IsAskedToQuit()) { + // We will receive response synchronously, safe to put variables + // on stack. + example::EchoRequest request; + example::EchoResponse response; + brpc::Controller cntl; + + request.set_message(g_request); + cntl.set_log_id(log_id++); // set by user + // Set attachment which is wired to network directly instead of + // being serialized into protobuf messages. + cntl.request_attachment().append(g_attachment); + + // Because `done'(last parameter) is NULL, this function waits until + // the response comes back or error occurs(including timedout). + stub.Echo(&cntl, &request, &response, NULL); + if (!cntl.Failed()) { + g_latency_recorder << cntl.latency_us(); + } else { + g_error_count << 1; + CHECK(brpc::IsAskedToQuit() || !FLAGS_dont_fail) + << "error=" << cntl.ErrorText() << " latency=" << cntl.latency_us(); + // We can't connect to the server, sleep a while. Notice that this + // is a specific sleeping to prevent this thread from spinning too + // fast. You should continue the business logic in a production + // server rather than sleeping. + bthread_usleep(50000); + } + } + return NULL; +} + +int main(int argc, char* argv[]) { + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // A Channel represents a communication line to a Server. Notice that + // Channel is thread-safe and can be shared by all threads in your program. + brpc::Channel channel; + + // Initialize the channel, NULL means using default options. + brpc::ChannelOptions options; + if (FLAGS_enable_ssl) { + options.mutable_ssl_options(); + } + options.protocol = FLAGS_protocol; + options.connection_type = FLAGS_connection_type; + options.connect_timeout_ms = std::min(FLAGS_timeout_ms / 2, 100); + options.timeout_ms = FLAGS_timeout_ms; + options.max_retry = FLAGS_max_retry; + options.connection_group = FLAGS_connection_group; + if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + return -1; + } + + if (FLAGS_attachment_size > 0) { + g_attachment.resize(FLAGS_attachment_size, 'a'); + } + if (FLAGS_request_size <= 0) { + LOG(ERROR) << "Bad request_size=" << FLAGS_request_size; + return -1; + } + g_request.resize(FLAGS_request_size, 'r'); + + if (FLAGS_dummy_port >= 0) { + brpc::StartDummyServerAt(FLAGS_dummy_port); + } + + std::vector<bthread_t> bids; + std::vector<pthread_t> pids; + if (!FLAGS_use_bthread) { + pids.resize(FLAGS_thread_num); + for (int i = 0; i < FLAGS_thread_num; ++i) { + if (pthread_create(&pids[i], NULL, sender, &channel) != 0) { + LOG(ERROR) << "Fail to create pthread"; + return -1; + } + } + } else { + bids.resize(FLAGS_thread_num); + for (int i = 0; i < FLAGS_thread_num; ++i) { + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + attr.tag = FLAGS_bthread_tag; Review Comment: client不需要再指定这个了吧 ########## src/brpc/server.h: ########## @@ -262,6 +262,10 @@ struct ServerOptions { // Default: "" std::string server_info_name; + // Server will run in this tagged bthread worker group + // Default: BTHREAD_TAG_DEFAULT + bthread_tag_t tag; Review Comment: 命名: bthread_tag ########## src/bthread/task_control.cpp: ########## @@ -216,15 +271,37 @@ int TaskControl::add_workers(int num) { return _concurrency.load(butil::memory_order_relaxed) - old_concurency; } +int TaskControl::add_workers_with_tag(int num, bthread_tag_t tag) { + _add_workers_with_tag = tag; + auto rc = add_workers(num); + _add_workers_with_tag = BTHREAD_TAG_INVALID; + return rc; +} + TaskGroup* TaskControl::choose_one_group() { - const size_t ngroup = _ngroup.load(butil::memory_order_acquire); + DCHECK(tls_task_group == nullptr || tls_task_group->tag() == BTHREAD_TAG_DEFAULT) + << "There will be a performance penalty for choosing other tagged group"; + auto groups = tag_group(BTHREAD_TAG_DEFAULT); + const auto ngroup = tag_ngroup(BTHREAD_TAG_DEFAULT).load(butil::memory_order_acquire); if (ngroup != 0) { - return _groups[butil::fast_rand_less_than(ngroup)]; + return groups[butil::fast_rand_less_than(ngroup)]; } CHECK(false) << "Impossible: ngroup is 0"; return NULL; } +TaskGroup* TaskControl::choose_one_group_with_tag(bthread_tag_t tag) { Review Comment: 是不是可以把choose_one_group直接改成choose_one_group(bthread_tag_t tag = BTHREAD_TAG_DEFAULT) ########## src/brpc/socket.h: ########## @@ -218,6 +218,8 @@ struct SocketOptions { // Socket keepalive related options. // Refer to `SocketKeepaliveOptions' for details. std::shared_ptr<SocketKeepaliveOptions> keepalive_options; + // Tag of this socket + bthread_tag_t tag; Review Comment: 命名: bthread_tag ########## src/bthread/task_control.h: ########## @@ -67,32 +71,55 @@ class TaskControl { void print_rq_sizes(std::ostream& os); double get_cumulated_worker_time(); + double get_cumulated_worker_time_with_tag(bthread_tag_t tag); int64_t get_cumulated_switch_count(); int64_t get_cumulated_signal_count(); // [Not thread safe] Add more worker threads. // Return the number of workers actually added, which may be less than |num| int add_workers(int num); + int add_workers_with_tag(int num, bthread_tag_t tag); + // Choose one TaskGroup (randomly right now). // If this method is called after init(), it never returns NULL. TaskGroup* choose_one_group(); + // Choose one TaskGroup with tag + // If no tag found or tag index greater or equal ngroup will return NULL. + TaskGroup* choose_one_group_with_tag(bthread_tag_t tag); + private: + typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups; + static const int PARKING_LOT_NUM = 4; + typedef std::array<ParkingLot, PARKING_LOT_NUM> TaggedParkingLot; // Add/Remove a TaskGroup. // Returns 0 on success, -1 otherwise. - int _add_group(TaskGroup*); + int _add_group(TaskGroup*, bthread_tag_t tag); int _destroy_group(TaskGroup*); + // Tag group + TaggedGroups& tag_group(bthread_tag_t tag) { return _tagged_groups[tag]; } + + // Tag ngroup + butil::atomic<size_t>& tag_ngroup(int tag) { return _tagged_ngroup[tag]; } + + // Tag parking slot + TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _pl[tag]; } + static void delete_task_group(void* arg); static void* worker_thread(void* task_control); bvar::LatencyRecorder& exposed_pending_time(); bvar::LatencyRecorder* create_exposed_pending_time(); + bvar::Adder<int64_t>& tag_nworkers(bthread_tag_t tag); + bvar::Adder<int64_t>& tag_nbthreads(bthread_tag_t tag); butil::atomic<size_t> _ngroup; Review Comment: _ngroup和_groups还有用吗 ########## src/brpc/socket.cpp: ########## @@ -927,7 +929,6 @@ int Socket::isolated_times() const { int Socket::SetFailed(int error_code, const char* error_fmt, ...) { if (error_code == 0) { - CHECK(false) << "error_code is 0"; Review Comment: 这个为啥删了 ########## src/bthread/task_control.cpp: ########## @@ -216,15 +271,37 @@ int TaskControl::add_workers(int num) { return _concurrency.load(butil::memory_order_relaxed) - old_concurency; } +int TaskControl::add_workers_with_tag(int num, bthread_tag_t tag) { + _add_workers_with_tag = tag; + auto rc = add_workers(num); Review Comment: 是不是可以直接把add_workers方法改成 add_workers(int num, bthread_tag_t tag) ########## example/bthread_tag_echo_c++/cert.pem: ########## @@ -0,0 +1,26 @@ +-----BEGIN CERTIFICATE----- +MIIEUTCCAzmgAwIBAgIBADANBgkqhkiG9w0BAQQFADB9MQswCQYDVQQGEwJDTjER Review Comment: 这个不需要吧 ########## example/bthread_tag_echo_c++/CMakeLists.txt: ########## @@ -0,0 +1,150 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +cmake_minimum_required(VERSION 2.8.10) +project(multi_threaded_echo_c++ C CXX) Review Comment: 名字改一下 ########## src/brpc/server.h: ########## @@ -262,6 +262,10 @@ struct ServerOptions { // Default: "" std::string server_info_name; + // Server will run in this tagged bthread worker group + // Default: BTHREAD_TAG_DEFAULT + bthread_tag_t tag; Review Comment: 命名: bthread_tag ########## src/brpc/acceptor.h: ########## @@ -112,6 +112,9 @@ friend class Server; // Whether to use rdma or not bool _use_rdma; + + // Acceptor belongs to this tag + bthread_tag_t _tag; Review Comment: 命名: _bthread_tag ########## example/bthread_tag_echo_c++/client.cpp: ########## @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// A client sending requests to server by multiple threads. + +#include <gflags/gflags.h> +#include <bthread/bthread.h> +#include <butil/logging.h> +#include <brpc/server.h> +#include <brpc/channel.h> +#include "echo.pb.h" +#include <bvar/bvar.h> + +DEFINE_int32(thread_num, 50, "Number of threads to send requests"); +DEFINE_bool(use_bthread, false, "Use bthread to send requests"); +DEFINE_int32(attachment_size, 0, "Carry so many byte attachment along with requests"); +DEFINE_int32(request_size, 16, "Bytes of each request"); +DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto"); +DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short"); +DEFINE_string(server, "0.0.0.0:8002", "IP Address of server"); +DEFINE_string(load_balancer, "", "The algorithm for load balancing"); +DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); +DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); +DEFINE_bool(dont_fail, false, "Print fatal when some call failed"); +DEFINE_bool(enable_ssl, false, "Use SSL connection"); +DEFINE_int32(dummy_port, -1, "Launch dummy server at this port"); +DEFINE_string(connection_group, "", "Connection group for channel"); +DEFINE_int32(bthread_tag, BTHREAD_TAG_DEFAULT, "bthread used tag"); + +std::string g_request; +std::string g_attachment; + +bvar::LatencyRecorder g_latency_recorder("client"); +bvar::Adder<int> g_error_count("client_error_count"); + +static void* sender(void* arg) { + // Normally, you should not call a Channel directly, but instead construct + // a stub Service wrapping it. stub can be shared by all threads as well. + example::EchoService_Stub stub(static_cast<google::protobuf::RpcChannel*>(arg)); + + int log_id = 0; + while (!brpc::IsAskedToQuit()) { + // We will receive response synchronously, safe to put variables + // on stack. + example::EchoRequest request; + example::EchoResponse response; + brpc::Controller cntl; + + request.set_message(g_request); + cntl.set_log_id(log_id++); // set by user + // Set attachment which is wired to network directly instead of + // being serialized into protobuf messages. + cntl.request_attachment().append(g_attachment); + + // Because `done'(last parameter) is NULL, this function waits until + // the response comes back or error occurs(including timedout). + stub.Echo(&cntl, &request, &response, NULL); + if (!cntl.Failed()) { + g_latency_recorder << cntl.latency_us(); + } else { + g_error_count << 1; + CHECK(brpc::IsAskedToQuit() || !FLAGS_dont_fail) + << "error=" << cntl.ErrorText() << " latency=" << cntl.latency_us(); + // We can't connect to the server, sleep a while. Notice that this + // is a specific sleeping to prevent this thread from spinning too + // fast. You should continue the business logic in a production + // server rather than sleeping. + bthread_usleep(50000); + } + } + return NULL; +} + +int main(int argc, char* argv[]) { + // Parse gflags. We recommend you to use gflags as well. + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // A Channel represents a communication line to a Server. Notice that + // Channel is thread-safe and can be shared by all threads in your program. + brpc::Channel channel; + + // Initialize the channel, NULL means using default options. + brpc::ChannelOptions options; + if (FLAGS_enable_ssl) { + options.mutable_ssl_options(); + } + options.protocol = FLAGS_protocol; + options.connection_type = FLAGS_connection_type; + options.connect_timeout_ms = std::min(FLAGS_timeout_ms / 2, 100); + options.timeout_ms = FLAGS_timeout_ms; + options.max_retry = FLAGS_max_retry; + options.connection_group = FLAGS_connection_group; + if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + return -1; + } + + if (FLAGS_attachment_size > 0) { + g_attachment.resize(FLAGS_attachment_size, 'a'); + } + if (FLAGS_request_size <= 0) { + LOG(ERROR) << "Bad request_size=" << FLAGS_request_size; + return -1; + } + g_request.resize(FLAGS_request_size, 'r'); + + if (FLAGS_dummy_port >= 0) { + brpc::StartDummyServerAt(FLAGS_dummy_port); + } + + std::vector<bthread_t> bids; + std::vector<pthread_t> pids; + if (!FLAGS_use_bthread) { + pids.resize(FLAGS_thread_num); + for (int i = 0; i < FLAGS_thread_num; ++i) { + if (pthread_create(&pids[i], NULL, sender, &channel) != 0) { + LOG(ERROR) << "Fail to create pthread"; + return -1; + } + } + } else { + bids.resize(FLAGS_thread_num); + for (int i = 0; i < FLAGS_thread_num; ++i) { + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + attr.tag = FLAGS_bthread_tag; Review Comment: client不需要再指定这个了吧 ########## src/brpc/socket.cpp: ########## @@ -927,7 +929,6 @@ int Socket::isolated_times() const { int Socket::SetFailed(int error_code, const char* error_fmt, ...) { if (error_code == 0) { - CHECK(false) << "error_code is 0"; Review Comment: 这个为啥删了 ########## src/brpc/socket.h: ########## @@ -218,6 +218,8 @@ struct SocketOptions { // Socket keepalive related options. // Refer to `SocketKeepaliveOptions' for details. std::shared_ptr<SocketKeepaliveOptions> keepalive_options; + // Tag of this socket + bthread_tag_t tag; Review Comment: 命名: bthread_tag ########## src/bthread/task_control.cpp: ########## @@ -216,15 +271,37 @@ int TaskControl::add_workers(int num) { return _concurrency.load(butil::memory_order_relaxed) - old_concurency; } +int TaskControl::add_workers_with_tag(int num, bthread_tag_t tag) { + _add_workers_with_tag = tag; + auto rc = add_workers(num); Review Comment: 是不是可以直接把add_workers方法改成 add_workers(int num, bthread_tag_t tag) ########## src/bthread/task_control.cpp: ########## @@ -216,15 +271,37 @@ int TaskControl::add_workers(int num) { return _concurrency.load(butil::memory_order_relaxed) - old_concurency; } +int TaskControl::add_workers_with_tag(int num, bthread_tag_t tag) { + _add_workers_with_tag = tag; + auto rc = add_workers(num); + _add_workers_with_tag = BTHREAD_TAG_INVALID; + return rc; +} + TaskGroup* TaskControl::choose_one_group() { - const size_t ngroup = _ngroup.load(butil::memory_order_acquire); + DCHECK(tls_task_group == nullptr || tls_task_group->tag() == BTHREAD_TAG_DEFAULT) + << "There will be a performance penalty for choosing other tagged group"; + auto groups = tag_group(BTHREAD_TAG_DEFAULT); + const auto ngroup = tag_ngroup(BTHREAD_TAG_DEFAULT).load(butil::memory_order_acquire); if (ngroup != 0) { - return _groups[butil::fast_rand_less_than(ngroup)]; + return groups[butil::fast_rand_less_than(ngroup)]; } CHECK(false) << "Impossible: ngroup is 0"; return NULL; } +TaskGroup* TaskControl::choose_one_group_with_tag(bthread_tag_t tag) { Review Comment: 是不是可以把choose_one_group直接改成choose_one_group(bthread_tag_t tag = BTHREAD_TAG_DEFAULT) ########## src/brpc/socket.h: ########## @@ -750,6 +752,7 @@ friend void DereferenceSocket(Socket*); // [ Set in ResetFileDescriptor ] butil::atomic<int> _fd; // -1 when not connected. + bthread_tag_t _tag; // tag of this socket Review Comment: 命名: _bthread_tag ########## src/bthread/bthread.cpp: ########## @@ -306,8 +345,14 @@ int bthread_setconcurrency(int num) { } if (num > bthread::FLAGS_bthread_concurrency) { // Create more workers if needed. - bthread::FLAGS_bthread_concurrency += - c->add_workers(num - bthread::FLAGS_bthread_concurrency); + auto tag = bthread::FLAGS_bthread_tag_to_set; + auto add = num - bthread::FLAGS_bthread_concurrency; + if (tag == BTHREAD_TAG_INVALID) { + add = bthread::add_workers_for_each_tag(add); Review Comment: 如果原来每个tag的worker数量不一样,然后add_workers一样的数量,最终数量还是不一样,预期应该是最终数量都调整到num吧? 感觉应该是封装一个bthread_setconcurrency(int num, bthread_tag_t tag)方法,然后针对每个tag分别调用这个方法 ########## src/bthread/bthread.cpp: ########## @@ -111,38 +135,49 @@ static bool validate_bthread_min_concurrency(const char*, int32_t val) { BAIDU_SCOPED_LOCK(g_task_control_mutex); int concurrency = c->concurrency(); if (val > concurrency) { - int added = c->add_workers(val - concurrency); + int added = bthread::add_workers_for_each_tag(val - concurrency); return added == (val - concurrency); } else { return true; } } +static bool validate_bthread_tag_to_set(const char*, int32_t val) { + return val >= BTHREAD_TAG_INVALID && val < FLAGS_task_group_ntags; +} + __thread TaskGroup* tls_task_group_nosignal = NULL; BUTIL_FORCE_INLINE int start_from_non_worker(bthread_t* __restrict tid, const bthread_attr_t* __restrict attr, - void * (*fn)(void*), + void* (*fn)(void*), void* __restrict arg) { TaskControl* c = get_or_new_task_control(); if (NULL == c) { return ENOMEM; } + TaskGroup* g = NULL; if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) { // Remember the TaskGroup to insert NOSIGNAL tasks for 2 reasons: // 1. NOSIGNAL is often for creating many bthreads in batch, // inserting into the same TaskGroup maximizes the batch. // 2. bthread_flush() needs to know which TaskGroup to flush. - TaskGroup* g = tls_task_group_nosignal; + g = tls_task_group_nosignal; if (NULL == g) { - g = c->choose_one_group(); + g = c->choose_one_group_with_tag(attr->tag); tls_task_group_nosignal = g; } return g->start_background<true>(tid, attr, fn, arg); } - return c->choose_one_group()->start_background<true>( - tid, attr, fn, arg); + g = c->choose_one_group_with_tag(attr ? attr->tag : BTHREAD_TAG_DEFAULT); + return g->start_background<true>(tid, attr, fn, arg); +} + +// if tag is default or equal to thread local use thread local task group +BUTIL_FORCE_INLINE bool can_run_thread_local(const bthread_attr_t* __restrict attr) { + return attr == nullptr || attr->tag == BTHREAD_TAG_DEFAULT || Review Comment: BTHREAD_TAG_DEFAULT值是0, 如果我现在在tag 1上运行,然后我想启动一个bthread,指定它在tag 0上运行,应该怎样指定attr呢? 如果指定attr.tag = 0,会被当成BTHREAD_TAG_DEFAULT,然后仍然在tag 1上运行 ########## src/bthread/bthread.cpp: ########## @@ -111,38 +135,49 @@ static bool validate_bthread_min_concurrency(const char*, int32_t val) { BAIDU_SCOPED_LOCK(g_task_control_mutex); int concurrency = c->concurrency(); if (val > concurrency) { - int added = c->add_workers(val - concurrency); + int added = bthread::add_workers_for_each_tag(val - concurrency); return added == (val - concurrency); } else { return true; } } +static bool validate_bthread_tag_to_set(const char*, int32_t val) { + return val >= BTHREAD_TAG_INVALID && val < FLAGS_task_group_ntags; +} + __thread TaskGroup* tls_task_group_nosignal = NULL; BUTIL_FORCE_INLINE int start_from_non_worker(bthread_t* __restrict tid, const bthread_attr_t* __restrict attr, - void * (*fn)(void*), + void* (*fn)(void*), void* __restrict arg) { TaskControl* c = get_or_new_task_control(); if (NULL == c) { return ENOMEM; } + TaskGroup* g = NULL; if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) { // Remember the TaskGroup to insert NOSIGNAL tasks for 2 reasons: // 1. NOSIGNAL is often for creating many bthreads in batch, // inserting into the same TaskGroup maximizes the batch. // 2. bthread_flush() needs to know which TaskGroup to flush. - TaskGroup* g = tls_task_group_nosignal; + g = tls_task_group_nosignal; if (NULL == g) { - g = c->choose_one_group(); + g = c->choose_one_group_with_tag(attr->tag); tls_task_group_nosignal = g; } return g->start_background<true>(tid, attr, fn, arg); } - return c->choose_one_group()->start_background<true>( - tid, attr, fn, arg); + g = c->choose_one_group_with_tag(attr ? attr->tag : BTHREAD_TAG_DEFAULT); + return g->start_background<true>(tid, attr, fn, arg); +} + +// if tag is default or equal to thread local use thread local task group +BUTIL_FORCE_INLINE bool can_run_thread_local(const bthread_attr_t* __restrict attr) { + return attr == nullptr || attr->tag == BTHREAD_TAG_DEFAULT || Review Comment: BTHREAD_TAG_DEFAULT值是0, 如果我现在在tag 1上运行,然后我想启动一个bthread,指定它在tag 0上运行,应该怎样指定attr呢? 如果指定attr.tag = 0,会被当成BTHREAD_TAG_DEFAULT,然后仍然在tag 1上运行 ########## src/bthread/task_control.h: ########## @@ -67,32 +71,55 @@ class TaskControl { void print_rq_sizes(std::ostream& os); double get_cumulated_worker_time(); + double get_cumulated_worker_time_with_tag(bthread_tag_t tag); int64_t get_cumulated_switch_count(); int64_t get_cumulated_signal_count(); // [Not thread safe] Add more worker threads. // Return the number of workers actually added, which may be less than |num| int add_workers(int num); + int add_workers_with_tag(int num, bthread_tag_t tag); + // Choose one TaskGroup (randomly right now). // If this method is called after init(), it never returns NULL. TaskGroup* choose_one_group(); + // Choose one TaskGroup with tag + // If no tag found or tag index greater or equal ngroup will return NULL. + TaskGroup* choose_one_group_with_tag(bthread_tag_t tag); + private: + typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups; + static const int PARKING_LOT_NUM = 4; + typedef std::array<ParkingLot, PARKING_LOT_NUM> TaggedParkingLot; // Add/Remove a TaskGroup. // Returns 0 on success, -1 otherwise. - int _add_group(TaskGroup*); + int _add_group(TaskGroup*, bthread_tag_t tag); int _destroy_group(TaskGroup*); + // Tag group + TaggedGroups& tag_group(bthread_tag_t tag) { return _tagged_groups[tag]; } + + // Tag ngroup + butil::atomic<size_t>& tag_ngroup(int tag) { return _tagged_ngroup[tag]; } + + // Tag parking slot + TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _pl[tag]; } + static void delete_task_group(void* arg); static void* worker_thread(void* task_control); bvar::LatencyRecorder& exposed_pending_time(); bvar::LatencyRecorder* create_exposed_pending_time(); + bvar::Adder<int64_t>& tag_nworkers(bthread_tag_t tag); + bvar::Adder<int64_t>& tag_nbthreads(bthread_tag_t tag); butil::atomic<size_t> _ngroup; Review Comment: _ngroup和_groups还有用吗 ########## src/bthread/bthread.cpp: ########## @@ -306,8 +345,14 @@ int bthread_setconcurrency(int num) { } if (num > bthread::FLAGS_bthread_concurrency) { // Create more workers if needed. - bthread::FLAGS_bthread_concurrency += - c->add_workers(num - bthread::FLAGS_bthread_concurrency); + auto tag = bthread::FLAGS_bthread_tag_to_set; + auto add = num - bthread::FLAGS_bthread_concurrency; + if (tag == BTHREAD_TAG_INVALID) { + add = bthread::add_workers_for_each_tag(add); Review Comment: 如果原来每个tag的worker数量不一样,然后add_workers一样的数量,最终数量还是不一样,预期应该是最终数量都调整到num吧? 感觉应该是封装一个bthread_setconcurrency(int num, bthread_tag_t tag)方法,然后针对每个tag分别调用这个方法 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org