This is an automated email from the ASF dual-hosted git repository.
guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 018ce775 Add flag for bthread priority queue, the default is false (#3078)
018ce775 is described below
commit 018ce775f2914f35b74436107f0d647538757e16
Author: Bright Chen <[email protected]>
AuthorDate: Mon Sep 1 14:09:51 2025 +0800
Add flag for bthread priority queue, the default is false (#3078)
* Add flag for bthread priority queue, the default is false
* Only EventDispatcher thread has flag BTHREAD_GLOBAL_PRIORITY
---
src/brpc/event_dispatcher.h | 2 +-
src/brpc/event_dispatcher_epoll.cpp | 12 +++++++-----
src/brpc/event_dispatcher_kqueue.cpp | 8 ++++++--
src/brpc/socket.cpp | 2 --
src/bthread/task_control.cpp | 3 +++
src/bthread/task_control.h | 2 ++
src/bthread/task_group.cpp | 2 +-
src/bthread/types.h | 4 ----
8 files changed, 20 insertions(+), 15 deletions(-)
diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h
index 51c404e2..fd91d3c5 100644
--- a/src/brpc/event_dispatcher.h
+++ b/src/brpc/event_dispatcher.h
@@ -99,7 +99,7 @@ public:
virtual ~EventDispatcher();
// Start this dispatcher in a bthread.
- // Use |*consumer_thread_attr| (if it's not NULL) as the attribute to
+ // Use |*thread_attr| (if it's not NULL) as the attribute to
// create bthreads running user callbacks.
// Returns 0 on success, -1 otherwise.
virtual int Start(const bthread_attr_t* thread_attr);
diff --git a/src/brpc/event_dispatcher_epoll.cpp b/src/brpc/event_dispatcher_epoll.cpp
index 005119f7..5a4a6370 100644
--- a/src/brpc/event_dispatcher_epoll.cpp
+++ b/src/brpc/event_dispatcher_epoll.cpp
@@ -26,7 +26,7 @@ EventDispatcher::EventDispatcher()
: _event_dispatcher_fd(-1)
, _stop(false)
, _tid(0)
- , _thread_attr(BTHREAD_ATTR_EPOLL) {
+ , _thread_attr(BTHREAD_ATTR_NORMAL) {
_event_dispatcher_fd = epoll_create(1024 * 1024);
if (_event_dispatcher_fd < 0) {
PLOG(FATAL) << "Fail to create epoll";
@@ -55,7 +55,7 @@ EventDispatcher::~EventDispatcher() {
}
}
-int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
+int EventDispatcher::Start(const bthread_attr_t* thread_attr) {
if (_event_dispatcher_fd < 0) {
LOG(FATAL) << "epoll was not created";
return -1;
@@ -69,13 +69,15 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
// Set _thread_attr before creating epoll thread to make sure
// everyting seems sane to the thread.
- if (consumer_thread_attr) {
- _thread_attr = *consumer_thread_attr | BTHREAD_GLOBAL_PRIORITY;
+ if (thread_attr) {
+ _thread_attr = *thread_attr;
}
//_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread
// that created by epoll_wait() never to quit.
- bthread_attr_t epoll_thread_attr = _thread_attr | BTHREAD_NEVER_QUIT;
+ // Only event dispatcher thread has flag BTHREAD_GLOBAL_PRIORITY.
+ bthread_attr_t epoll_thread_attr =
+ _thread_attr | BTHREAD_NEVER_QUIT | BTHREAD_GLOBAL_PRIORITY;
// Polling thread uses the same attr for consumer threads (NORMAL right
// now). Previously, we used small stack (32KB) which may be overflowed
diff --git a/src/brpc/event_dispatcher_kqueue.cpp b/src/brpc/event_dispatcher_kqueue.cpp
index a1790486..48b28147 100644
--- a/src/brpc/event_dispatcher_kqueue.cpp
+++ b/src/brpc/event_dispatcher_kqueue.cpp
@@ -69,11 +69,15 @@ int EventDispatcher::Start(const bthread_attr_t* thread_attr) {
// Set _thread_attr before creating kqueue thread to make sure
// everyting seems sane to the thread.
- _thread_attr = (thread_attr ? *thread_attr : BTHREAD_ATTR_NORMAL);
+ if (thread_attr) {
+ _thread_attr = *thread_attr;
+ }
//_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread
// that created by kevent() never to quit.
- bthread_attr_t kqueue_thread_attr = _thread_attr | BTHREAD_NEVER_QUIT;
+ // Only event dispatcher thread has flag BTHREAD_GLOBAL_PRIORITY.
+ bthread_attr_t kqueue_thread_attr =
+ _thread_attr | BTHREAD_NEVER_QUIT | BTHREAD_GLOBAL_PRIORITY;
// Polling thread uses the same attr for consumer threads (NORMAL right
// now). Previously, we used small stack (32KB) which may be overflowed
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index 075e03ac..6de247c3 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -2261,8 +2261,6 @@ int Socket::OnInputEvent(void* user_data, uint32_t events,
bthread_attr_t attr = thread_attr;
attr.keytable_pool = p->_keytable_pool;
attr.tag = bthread_self_tag();
- // Only event dispatcher thread has flag BTHREAD_GLOBAL_PRIORITY
- attr.flags = attr.flags & (~BTHREAD_GLOBAL_PRIORITY);
if (FLAGS_usercode_in_coroutine) {
ProcessEvent(p);
#if BRPC_WITH_RDMA
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index 1bdcf0c0..7d121349 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -45,6 +45,8 @@ DEFINE_bool(task_group_set_worker_name, true,
namespace bthread {
+DEFINE_bool(enable_bthread_priority_queue, false, "Whether to enable priority queue");
+
DECLARE_int32(bthread_concurrency);
DECLARE_int32(bthread_min_concurrency);
DECLARE_int32(bthread_parking_lot_of_each_tag);
@@ -187,6 +189,7 @@ TaskControl::TaskControl()
, _signal_per_second(&_cumulated_signal_count)
, _status(print_rq_sizes_in_the_tc, this)
, _nbthreads("bthread_count")
+ , _enable_priority_queue(FLAGS_enable_bthread_priority_queue)
, _priority_queues(FLAGS_task_group_ntags)
, _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
, _tagged_pl(FLAGS_task_group_ntags)
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 2426b00c..4d666025 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -156,6 +156,8 @@ private:
std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time;
std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*> _tagged_worker_usage_second;
std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
+
+ bool _enable_priority_queue;
std::vector<WorkStealingQueue<bthread_t>> _priority_queues;
size_t _pl_num_of_each_tag;
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 773a442b..67f029a0 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -515,7 +515,7 @@ int TaskGroup::start_foreground(TaskGroup** pg,
// NOSIGNAL affects current task, not the new task.
RemainedFn fn = NULL;
auto& cur_attr = g->_cur_meta->attr;
- if (cur_attr.flags & BTHREAD_GLOBAL_PRIORITY) {
+ if (g->_control->_enable_priority_queue && cur_attr.flags & BTHREAD_GLOBAL_PRIORITY) {
fn = priority_to_run;
} else if (g->current_task()->about_to_quit) {
fn = ready_to_run_in_worker_ignoresignal;
diff --git a/src/bthread/types.h b/src/bthread/types.h
index 30368f68..a09c2e38 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -138,10 +138,6 @@ static const bthread_attr_t BTHREAD_ATTR_NORMAL = {BTHREAD_STACKTYPE_NORMAL, 0,
static const bthread_attr_t BTHREAD_ATTR_LARGE = {BTHREAD_STACKTYPE_LARGE, 0,
NULL,
BTHREAD_TAG_INVALID};
-// epoll bthread
-static const bthread_attr_t BTHREAD_ATTR_EPOLL = {
- BTHREAD_STACKTYPE_NORMAL, BTHREAD_GLOBAL_PRIORITY, NULL, BTHREAD_TAG_INVALID};
-
// bthreads created with this attribute will print log when it's started,
// context-switched, finished.
static const bthread_attr_t BTHREAD_ATTR_DEBUG = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]