This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new e1bf467b set tags workers unlimitedly (#2801)
e1bf467b is described below
commit e1bf467b20e1ca12023ed11b0aad567f6653a735
Author: Jade <[email protected]>
AuthorDate: Thu Oct 31 10:13:43 2024 +0800
set tags workers unlimitedly (#2801)
* set tags workers unlimitedly
* fix set concurrency test
---------
Co-authored-by: jiazheng.jia <[email protected]>
---
docs/cn/bthread_tagged_task_group.md | 6 +++---
example/bthread_tag_echo_c++/server.cpp | 4 ++--
src/brpc/server.cpp | 6 +-----
src/bthread/bthread.cpp | 20 +++++++++++---------
test/bthread_setconcurrency_unittest.cpp | 6 ++++--
5 files changed, 21 insertions(+), 21 deletions(-)
diff --git a/docs/cn/bthread_tagged_task_group.md
b/docs/cn/bthread_tagged_task_group.md
index bfdafd76..027bd4eb 100644
--- a/docs/cn/bthread_tagged_task_group.md
+++ b/docs/cn/bthread_tagged_task_group.md
@@ -11,19 +11,19 @@
```c++
服务端启动
-./echo_server -task_group_ntags 3 -tag1 0 -tag2 1 -bthread_concurrency 20
-bthread_min_concurrency 12 -event_dispatcher_num 1
+./echo_server -task_group_ntags 3 -tag1 0 -tag2 1 -bthread_concurrency 20
-bthread_min_concurrency 8 -event_dispatcher_num 1
客户端启动
./echo_client -dummy_port 8888 -server "0.0.0.0:8002" -use_bthread true
./echo_client -dummy_port 8889 -server "0.0.0.0:8003" -use_bthread true
```
-FLAGS_bthread_concurrency为所有分组的线程数的上限,FLAGS_bthread_min_concurrency为所有分组的线程数的下限,FLAGS_event_dispatcher_num为单个分组中事件驱动器的数量。FLAGS_bthread_current_tag为将要修改的分组的tag值,FLAGS_bthread_concurrency_by_tag设置这个分组的线程数。
+FLAGS_bthread_concurrency为所有线程的数,FLAGS_bthread_min_concurrency为所有分组的线程数的下限,FLAGS_event_dispatcher_num为单个分组中事件驱动器的数量。FLAGS_bthread_current_tag为将要修改的分组的tag值,FLAGS_bthread_concurrency_by_tag设置这个分组的线程数。
一般情况应用创建的bthread不需要设置bthread_attr_t的tag字段,创建的bthread会在当前tag上下文中执行;如果希望创建的bthread不在当前tag上下文中执行,可以设置bthread_attr_t的tag字段为希望的值,这么做会对性能有些损失,关键路径上应该避免这么做。
Q:如何动态改变分组线程的数量?
-A:server的线程数最少为4个,后台任务线程数最少为2个,所以上面的例子中,FLAGS_bthread_concurrency最小值为4+4+2=10,再设置FLAGS_bthread_min_concurrency=FLAGS_bthread_concurrency,之后再把FLAGS_bthread_concurrency改大一些,之后再设置FLAGS_bthread_current_tag和FLAGS_bthread_concurrency_by_tag来改变某个分组的线程数。对于server,如果设置了ServerOption.bthread_tag,num_threads的含义是这个分组的线程数;如果没有设置(相当于没有启用分组,默认值为BTHREAD_TAG_INVALID),num_thread的含义是所有分组的线程数。
+A:你可以根据你的服务更自由的设计你的每个分组的线程数,启动的时候会根据你设置的 bthread_concurrency 来初始化线程池,如果你设置了
bthread_min_concurrency,那么会根据 bthread_min_concurrency 来设置线程池,对于 server
来说,num_threads 就是该 tag 对应的 worker 数量。可以通过设置 FLAGS_bthread_current_tag 和
FLAGS_bthread_concurrency_by_tag
来改变某个分组的线程数。如果没有设置(相当于没有启用分组,默认值为BTHREAD_TAG_INVALID),num_threads的含义是所有分组的
worker 总数。
Q:不同分组之间有什么关系吗?
diff --git a/example/bthread_tag_echo_c++/server.cpp
b/example/bthread_tag_echo_c++/server.cpp
index bc717e25..ed4ba4d6 100644
--- a/example/bthread_tag_echo_c++/server.cpp
+++ b/example/bthread_tag_echo_c++/server.cpp
@@ -29,8 +29,8 @@ DEFINE_int32(port2, 8003, "TCP Port of this server");
DEFINE_int32(tag1, 0, "Server1 tag");
DEFINE_int32(tag2, 1, "Server2 tag");
DEFINE_int32(tag3, 2, "Background task tag");
-DEFINE_int32(num_threads1, 4, "Thread number of server1");
-DEFINE_int32(num_threads2, 4, "Thread number of server2");
+DEFINE_int32(num_threads1, 6, "Thread number of server1");
+DEFINE_int32(num_threads2, 16, "Thread number of server2");
DEFINE_int32(idle_timeout_s, -1,
"Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index fa3ab7d7..0110761a 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -1044,11 +1044,7 @@ int Server::StartInternal(const butil::EndPoint&
endpoint,
if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) {
_options.num_threads = BTHREAD_MIN_CONCURRENCY;
}
- if (original_bthread_tag == BTHREAD_TAG_INVALID) {
- bthread_setconcurrency(_options.num_threads);
- } else {
- bthread_setconcurrency_by_tag(_options.num_threads,
_options.bthread_tag);
- }
+ bthread_setconcurrency_by_tag(_options.num_threads,
_options.bthread_tag);
}
for (MethodMap::iterator it = _method_map.begin();
diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp
index dcd29c43..f963c4a6 100644
--- a/src/bthread/bthread.cpp
+++ b/src/bthread/bthread.cpp
@@ -397,20 +397,22 @@ int bthread_setconcurrency_by_tag(int num, bthread_tag_t
tag) {
}
auto c = bthread::get_or_new_task_control();
BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
- auto ngroup = c->concurrency();
auto tag_ngroup = c->concurrency(tag);
auto add = num - tag_ngroup;
- if (ngroup + add > bthread::FLAGS_bthread_concurrency) {
- LOG(ERROR) << "Fail to set concurrency by tag " << tag
- << ", Total concurrency larger than bthread_concurrency";
- return EPERM;
- }
- auto added = 0;
+
if (add > 0) {
- added = c->add_workers(add, tag);
+ auto added = c->add_workers(add, tag);
+ bthread::FLAGS_bthread_concurrency += added;
return (add == added ? 0 : EPERM);
+
+ } else if (add < 0){
+ LOG(WARNING) << "Fail to set concurrency by tag: " << tag
+ << ", tag concurrency must larger than old oncurrency.
old concurrency: "
+ << tag_ngroup << ", new concurrency: " << num;
+ return EPERM;
+ } else {
+ return 0;
}
- return (num == tag_ngroup ? 0 : EPERM);
}
int bthread_about_to_quit() {
diff --git a/test/bthread_setconcurrency_unittest.cpp
b/test/bthread_setconcurrency_unittest.cpp
index 7c8faf40..aa1d674c 100644
--- a/test/bthread_setconcurrency_unittest.cpp
+++ b/test/bthread_setconcurrency_unittest.cpp
@@ -214,9 +214,11 @@ int concurrency_by_tag(int num) {
TEST(BthreadTest, concurrency_by_tag) {
ASSERT_EQ(concurrency_by_tag(1), false);
- auto con = bthread_getconcurrency_by_tag(0);
+ auto tag_con = bthread_getconcurrency_by_tag(0);
+ auto con = bthread_getconcurrency();
ASSERT_EQ(concurrency_by_tag(con), true);
- ASSERT_EQ(concurrency_by_tag(con + 1), false);
+ ASSERT_EQ(concurrency_by_tag(con + 1), true);
+ ASSERT_EQ(bthread_getconcurrency(), con+1);
bthread_setconcurrency(con + 1);
ASSERT_EQ(concurrency_by_tag(con + 1), true);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]