wwbmmm commented on code in PR #2358:
URL: https://github.com/apache/brpc/pull/2358#discussion_r1336896700


##########
docs/cn/bthread_tagged_task_group.md:
##########
@@ -0,0 +1,34 @@
+
+# Bthread tagged task group
+
+在很多应用开发过程中都会有线程资源隔离的需求,比如服务分为控制层和数据层,数据层的请求压力大,不需要控制层受到影响;再比如,服务有多个磁盘,希望磁盘之间没有什么影响资源上的影响等。bthread的任务组打标签就是实现bthread的worker线程池按照tag分组,让不同分组之间达到没有互相影响的目的。客户端需要配合服务端使用,客户端配置channel的connection_group选项将请求分类,这些分类请求将自然的分配到服务端不同的线程池tag分组里面去。
+
+# 使用方式
+
+在example/bthread_tag_echo_c++里面有一个实例代码,分别启动服务端和客户端,服务端将worker划分成3个tag(分组),客户端可以设置不同的connection_group来访问不同的分组,例子里面设置为Test1、Test2、Test3。
+
+```c++
+服务端启动
+./echo_server -task_group_ntags 3 -bthread_concurrency 20 
-bthread_min_concurrency 12 -event_dispatcher_num 2
+
+客户端启动
+./echo_client -dummy_port 8888  -use_bthread true -connection_group="Test1"
+./echo_client -dummy_port 8888  -use_bthread true -connection_group="Test2"
+./echo_client -dummy_port 8888  -use_bthread true -connection_group="Test3"
+```
+
+一般情况应用创建的bthread并需要设置bthread_attr_t的tag字段,创建的bthread会在当前tag上下文中执行,如果希望创建的bthread不在当前tag上下文中执行,可以设置bthread_attr_t的tag字段为希望的值。
+
+# 监控
+
+目前监控上按照tag划分的指标有,线程的数量、线程的使用量、bthread_count、连接信息
+
+线程使用量:![img](../images/bthread_tag_worker_usage.png)
+
+worker线程动态调整,使用该功能需要将bthread_min_concurrency配置成非0。![img](../images/bthread_tag_add_worker.png)
+
+connections:![img](../images/bthread_tag_connections.png)

Review Comment:
   这个图里哪里体现tag了



##########
example/bthread_tag_echo_c++/server.cpp:
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A server to receive EchoRequest and send back EchoResponse.
+
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <brpc/server.h>
+#include "echo.pb.h"
+
+DEFINE_bool(echo_attachment, true, "Echo attachment as well");
+DEFINE_int32(port, 8002, "TCP Port of this server");
+DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
+             "read/write operations during the last `idle_timeout_s'");
+DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
+DEFINE_int32(internal_port, -1, "Only allow builtin services at this port");

Review Comment:
   没有看到bthread tag相关的参数



##########
src/bthread/task_control.cpp:
##########
@@ -202,8 +247,11 @@ int TaskControl::add_workers(int num) {
         // Worker will add itself to _idle_workers, so we have to add
         // _concurrency before create a worker.
         _concurrency.fetch_add(1);
+        auto tag = (_add_workers_with_tag != BTHREAD_TAG_INVALID) ? 
_add_workers_with_tag

Review Comment:
   这个add_workers(int num)的行为应该是为每个tag都增加num个worker?



##########
docs/cn/bthread_tagged_task_group.md:
##########
@@ -0,0 +1,34 @@
+
+# Bthread tagged task group
+
+在很多应用开发过程中都会有线程资源隔离的需求,比如服务分为控制层和数据层,数据层的请求压力大,不需要控制层受到影响;再比如,服务有多个磁盘,希望磁盘之间没有什么影响资源上的影响等。bthread的任务组打标签就是实现bthread的worker线程池按照tag分组,让不同分组之间达到没有互相影响的目的。客户端需要配合服务端使用,客户端配置channel的connection_group选项将请求分类,这些分类请求将自然的分配到服务端不同的线程池tag分组里面去。
+
+# 使用方式
+
+在example/bthread_tag_echo_c++里面有一个实例代码,分别启动服务端和客户端,服务端将worker划分成3个tag(分组),客户端可以设置不同的connection_group来访问不同的分组,例子里面设置为Test1、Test2、Test3。
+
+```c++
+服务端启动
+./echo_server -task_group_ntags 3 -bthread_concurrency 20 
-bthread_min_concurrency 12 -event_dispatcher_num 2
+
+客户端启动
+./echo_client -dummy_port 8888  -use_bthread true -connection_group="Test1"
+./echo_client -dummy_port 8888  -use_bthread true -connection_group="Test2"
+./echo_client -dummy_port 8888  -use_bthread true -connection_group="Test3"
+```
+
+一般情况应用创建的bthread并需要设置bthread_attr_t的tag字段,创建的bthread会在当前tag上下文中执行,如果希望创建的bthread不在当前tag上下文中执行,可以设置bthread_attr_t的tag字段为希望的值。

Review Comment:
   并需要->不需要



##########
docs/cn/bthread_tagged_task_group.md:
##########
@@ -0,0 +1,34 @@
+
+# Bthread tagged task group
+
+在很多应用开发过程中都会有线程资源隔离的需求,比如服务分为控制层和数据层,数据层的请求压力大,不需要控制层受到影响;再比如,服务有多个磁盘,希望磁盘之间没有什么影响资源上的影响等。bthread的任务组打标签就是实现bthread的worker线程池按照tag分组,让不同分组之间达到没有互相影响的目的。客户端需要配合服务端使用,客户端配置channel的connection_group选项将请求分类,这些分类请求将自然的分配到服务端不同的线程池tag分组里面去。
+
+# 使用方式
+
+在example/bthread_tag_echo_c++里面有一个实例代码,分别启动服务端和客户端,服务端将worker划分成3个tag(分组),客户端可以设置不同的connection_group来访问不同的分组,例子里面设置为Test1、Test2、Test3。
+
+```c++
+服务端启动
+./echo_server -task_group_ntags 3 -bthread_concurrency 20 
-bthread_min_concurrency 12 -event_dispatcher_num 2
+
+客户端启动
+./echo_client -dummy_port 8888  -use_bthread true -connection_group="Test1"
+./echo_client -dummy_port 8888  -use_bthread true -connection_group="Test2"
+./echo_client -dummy_port 8888  -use_bthread true -connection_group="Test3"
+```
+
+一般情况应用创建的bthread并需要设置bthread_attr_t的tag字段,创建的bthread会在当前tag上下文中执行,如果希望创建的bthread不在当前tag上下文中执行,可以设置bthread_attr_t的tag字段为希望的值。
+
+# 监控
+
+目前监控上按照tag划分的指标有,线程的数量、线程的使用量、bthread_count、连接信息
+
+线程使用量:![img](../images/bthread_tag_worker_usage.png)
+
+worker线程动态调整,使用该功能需要将bthread_min_concurrency配置成非0。![img](../images/bthread_tag_add_worker.png)
+
+connections:![img](../images/bthread_tag_connections.png)

Review Comment:
   这个图里哪里体现tag了



##########
example/bthread_tag_echo_c++/server.cpp:
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A server to receive EchoRequest and send back EchoResponse.
+
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <brpc/server.h>
+#include "echo.pb.h"
+
+DEFINE_bool(echo_attachment, true, "Echo attachment as well");
+DEFINE_int32(port, 8002, "TCP Port of this server");
+DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
+             "read/write operations during the last `idle_timeout_s'");
+DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
+DEFINE_int32(internal_port, -1, "Only allow builtin services at this port");

Review Comment:
   没有看到bthread tag相关的参数



##########
src/brpc/acceptor.cpp:
##########
@@ -40,7 +40,8 @@ Acceptor::Acceptor(bthread_keytable_pool_t* pool)
     , _empty_cond(&_map_mutex)
     , _force_ssl(false)
     , _ssl_ctx(NULL) 
-    , _use_rdma(false) {
+    , _use_rdma(false)
+    , _tag_state(SocketTagOptions::TAG_RECV) {

Review Comment:
   _tag_state的值还有哪个地方会修改吗?没有的话直接用常量是不是就可以了



##########
src/bthread/unstable.h:
##########
@@ -82,7 +82,7 @@ extern int bthread_connect(int sockfd, const struct sockaddr* 
serv_addr,
 // Add a startup function that each pthread worker will run at the beginning
 // To run code at the end, use butil::thread_atexit()
 // Returns 0 on success, error code otherwise.
-extern int bthread_set_worker_startfn(void (*start_fn)());
+extern int bthread_set_worker_startfn(void (*start_fn)(bthread_tag_t));

Review Comment:
   这个接口和以前不兼容了
   建议不要改这个接口,新增一个bthread_set_tagged_worker_startfn



##########
src/bthread/task_control.cpp:
##########
@@ -202,8 +247,11 @@ int TaskControl::add_workers(int num) {
         // Worker will add itself to _idle_workers, so we have to add
         // _concurrency before create a worker.
         _concurrency.fetch_add(1);
+        auto tag = (_add_workers_with_tag != BTHREAD_TAG_INVALID) ? 
_add_workers_with_tag

Review Comment:
   这个add_workers(int num)的行为应该是为每个tag都增加num个worker?



##########
example/bthread_tag_echo_c++/client.cpp:
##########
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A client sending requests to server by multiple threads.
+
+#include <gflags/gflags.h>
+#include <bthread/bthread.h>
+#include <butil/logging.h>
+#include <brpc/server.h>
+#include <brpc/channel.h>
+#include "echo.pb.h"
+#include <bvar/bvar.h>
+
+DEFINE_int32(thread_num, 50, "Number of threads to send requests");
+DEFINE_bool(use_bthread, false, "Use bthread to send requests");
+DEFINE_int32(attachment_size, 0, "Carry so many byte attachment along with 
requests");
+DEFINE_int32(request_size, 16, "Bytes of each request");
+DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in 
src/brpc/options.proto");
+DEFINE_string(connection_type, "", "Connection type. Available values: single, 
pooled, short");
+DEFINE_string(server, "0.0.0.0:8002", "IP Address of server");
+DEFINE_string(load_balancer, "", "The algorithm for load balancing");
+DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
+DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); 
+DEFINE_bool(dont_fail, false, "Print fatal when some call failed");
+DEFINE_bool(enable_ssl, false, "Use SSL connection");
+DEFINE_int32(dummy_port, -1, "Launch dummy server at this port");
+DEFINE_string(connection_group, "", "Connection group for channel");
+DEFINE_int32(bthread_tag, BTHREAD_TAG_DEFAULT, "bthread used tag");

Review Comment:
   如果这个代码大部分和echo_c++相同,是不是可以直接修改echo_c++中的代码



##########
docs/cn/bthread_tagged_task_group.md:
##########
@@ -0,0 +1,34 @@
+
+# Bthread tagged task group
+
+在很多应用开发过程中都会有线程资源隔离的需求,比如服务分为控制层和数据层,数据层的请求压力大,不需要控制层受到影响;再比如,服务有多个磁盘,希望磁盘之间没有什么影响资源上的影响等。bthread的任务组打标签就是实现bthread的worker线程池按照tag分组,让不同分组之间达到没有互相影响的目的。客户端需要配合服务端使用,客户端配置channel的connection_group选项将请求分类,这些分类请求将自然的分配到服务端不同的线程池tag分组里面去。
+
+# 使用方式
+
+在example/bthread_tag_echo_c++里面有一个实例代码,分别启动服务端和客户端,服务端将worker划分成3个tag(分组),客户端可以设置不同的connection_group来访问不同的分组,例子里面设置为Test1、Test2、Test3。
+
+```c++
+服务端启动
+./echo_server -task_group_ntags 3 -bthread_concurrency 20 
-bthread_min_concurrency 12 -event_dispatcher_num 2
+
+客户端启动
+./echo_client -dummy_port 8888  -use_bthread true -connection_group="Test1"
+./echo_client -dummy_port 8888  -use_bthread true -connection_group="Test2"
+./echo_client -dummy_port 8888  -use_bthread true -connection_group="Test3"
+```
+
+一般情况应用创建的bthread并需要设置bthread_attr_t的tag字段,创建的bthread会在当前tag上下文中执行,如果希望创建的bthread不在当前tag上下文中执行,可以设置bthread_attr_t的tag字段为希望的值。

Review Comment:
   并需要->不需要



##########
src/bthread/bthread.cpp:
##########
@@ -104,6 +107,9 @@ static bool validate_bthread_min_concurrency(const char*, 
int32_t val) {
     if (val < BTHREAD_MIN_CONCURRENCY || val > FLAGS_bthread_concurrency) {
         return false;
     }
+    if (val < FLAGS_task_group_ntags * BTHREAD_MIN_CONCURRENCY) {

Review Comment:
   这个concurrency的值设置的是每个tag的worker数,还是所有tag的worker数呢?



##########
src/brpc/acceptor.cpp:
##########
@@ -40,7 +40,8 @@ Acceptor::Acceptor(bthread_keytable_pool_t* pool)
     , _empty_cond(&_map_mutex)
     , _force_ssl(false)
     , _ssl_ctx(NULL) 
-    , _use_rdma(false) {
+    , _use_rdma(false)
+    , _tag_state(SocketTagOptions::TAG_RECV) {

Review Comment:
   _tag_state的值还有哪个地方会修改吗?没有的话直接用常量是不是就可以了



##########
src/bthread/bthread.cpp:
##########
@@ -187,8 +202,14 @@ int bthread_start_background(bthread_t* __restrict tid,
                              void* __restrict arg) {
     bthread::TaskGroup* g = bthread::tls_task_group;
     if (g) {
-        // start from worker
-        return g->start_background<false>(tid, attr, fn, arg);
+        // if attribute is null use thread local task group
+        if (attr == nullptr) {
+            return g->start_background<false>(tid, attr, fn, arg);
+        }
+        // if attribute tag is null or default use thread local task group
+        if (attr->tag == BTHREAD_TAG_DEFAULT || attr->tag == g->tag()) {

Review Comment:
   同上



##########
src/bthread/bthread.cpp:
##########
@@ -175,8 +184,14 @@ int bthread_start_urgent(bthread_t* __restrict tid,
                          void* __restrict arg) {
     bthread::TaskGroup* g = bthread::tls_task_group;
     if (g) {
-        // start from worker
-        return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
+        // if attribute is null use thread local task group
+        if (attr == nullptr) {
+            return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, 
arg);
+        }
+        // if attribute tag is null or default use thread local task group
+        if (attr->tag == BTHREAD_TAG_DEFAULT || attr->tag == g->tag()) {

Review Comment:
   这个判断条件和上面的可以合并吧



##########
src/brpc/socket_map_key.h:
##########
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   这个文件独立出来是有什么考虑么



##########
src/bthread/butex.cpp:
##########
@@ -273,15 +273,16 @@ void butex_destroy(void* butex) {
 }
 
 inline TaskGroup* get_task_group(TaskControl* c, bool nosignal = false) {
-    TaskGroup* g;
+    TaskGroup* g = tls_task_group;
     if (nosignal) {
-        g = tls_task_group_nosignal;
-        if (NULL == g) {
-            g = c->choose_one_group();
+        if (NULL == tls_task_group_nosignal) {
+            g = g ? g : c->choose_one_group();

Review Comment:
   这里的行为和原来不一样了,改动的原因是什么?



##########
src/bthread/butex.cpp:
##########
@@ -273,15 +273,16 @@ void butex_destroy(void* butex) {
 }
 
 inline TaskGroup* get_task_group(TaskControl* c, bool nosignal = false) {
-    TaskGroup* g;
+    TaskGroup* g = tls_task_group;
     if (nosignal) {
-        g = tls_task_group_nosignal;
-        if (NULL == g) {
-            g = c->choose_one_group();
+        if (NULL == tls_task_group_nosignal) {
+            g = g ? g : c->choose_one_group();

Review Comment:
   这里的行为和原来不一样了,改动的原因是什么?



##########
src/bthread/bthread.cpp:
##########
@@ -175,8 +184,14 @@ int bthread_start_urgent(bthread_t* __restrict tid,
                          void* __restrict arg) {
     bthread::TaskGroup* g = bthread::tls_task_group;
     if (g) {
-        // start from worker
-        return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
+        // if attribute is null use thread local task group
+        if (attr == nullptr) {
+            return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, 
arg);
+        }
+        // if attribute tag is null or default use thread local task group
+        if (attr->tag == BTHREAD_TAG_DEFAULT || attr->tag == g->tag()) {

Review Comment:
   这个判断条件和上面的可以合并吧



##########
src/bthread/bthread.cpp:
##########
@@ -104,6 +107,9 @@ static bool validate_bthread_min_concurrency(const char*, 
int32_t val) {
     if (val < BTHREAD_MIN_CONCURRENCY || val > FLAGS_bthread_concurrency) {
         return false;
     }
+    if (val < FLAGS_task_group_ntags * BTHREAD_MIN_CONCURRENCY) {

Review Comment:
   这个concurrency的值设置的是每个tag的worker数,还是所有tag的worker数呢?



##########
example/bthread_tag_echo_c++/client.cpp:
##########
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A client sending requests to server by multiple threads.
+
+#include <gflags/gflags.h>
+#include <bthread/bthread.h>
+#include <butil/logging.h>
+#include <brpc/server.h>
+#include <brpc/channel.h>
+#include "echo.pb.h"
+#include <bvar/bvar.h>
+
+DEFINE_int32(thread_num, 50, "Number of threads to send requests");
+DEFINE_bool(use_bthread, false, "Use bthread to send requests");
+DEFINE_int32(attachment_size, 0, "Carry so many byte attachment along with 
requests");
+DEFINE_int32(request_size, 16, "Bytes of each request");
+DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in 
src/brpc/options.proto");
+DEFINE_string(connection_type, "", "Connection type. Available values: single, 
pooled, short");
+DEFINE_string(server, "0.0.0.0:8002", "IP Address of server");
+DEFINE_string(load_balancer, "", "The algorithm for load balancing");
+DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
+DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); 
+DEFINE_bool(dont_fail, false, "Print fatal when some call failed");
+DEFINE_bool(enable_ssl, false, "Use SSL connection");
+DEFINE_int32(dummy_port, -1, "Launch dummy server at this port");
+DEFINE_string(connection_group, "", "Connection group for channel");
+DEFINE_int32(bthread_tag, BTHREAD_TAG_DEFAULT, "bthread used tag");

Review Comment:
   如果这个代码大部分和echo_c++相同,是不是可以直接修改echo_c++中的代码



##########
src/bthread/bthread.cpp:
##########
@@ -187,8 +202,14 @@ int bthread_start_background(bthread_t* __restrict tid,
                              void* __restrict arg) {
     bthread::TaskGroup* g = bthread::tls_task_group;
     if (g) {
-        // start from worker
-        return g->start_background<false>(tid, attr, fn, arg);
+        // if attribute is null use thread local task group
+        if (attr == nullptr) {
+            return g->start_background<false>(tid, attr, fn, arg);
+        }
+        // if attribute tag is null or default use thread local task group
+        if (attr->tag == BTHREAD_TAG_DEFAULT || attr->tag == g->tag()) {

Review Comment:
   同上



##########
src/bthread/unstable.h:
##########
@@ -82,7 +82,7 @@ extern int bthread_connect(int sockfd, const struct sockaddr* 
serv_addr,
 // Add a startup function that each pthread worker will run at the beginning
 // To run code at the end, use butil::thread_atexit()
 // Returns 0 on success, error code otherwise.
-extern int bthread_set_worker_startfn(void (*start_fn)());
+extern int bthread_set_worker_startfn(void (*start_fn)(bthread_tag_t));

Review Comment:
   这个接口和以前不兼容了
   建议不要改这个接口,新增一个bthread_set_tagged_worker_startfn



##########
src/brpc/socket_map_key.h:
##########
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   这个文件独立出来是有什么考虑么



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to