This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new f949962e Support custom ParkingLot number (#3033)
f949962e is described below
commit f949962e141fecba92bbd399d1da75fe2ac6862d
Author: Bright Chen <[email protected]>
AuthorDate: Mon Aug 18 10:18:27 2025 +0800
Support custom ParkingLot number (#3033)
* Support custom ParkingLot number
* Validate bthread_parking_lot_of_each_tag flag
* Fast path, no need to futex_wait
---
src/bthread/bthread.cpp | 17 ++++++++++++++++-
src/bthread/parking_lot.h | 4 ++++
src/bthread/task_control.cpp | 21 ++++++++++-----------
src/bthread/task_control.h | 8 ++++----
src/bthread/types.h | 3 +++
5 files changed, 37 insertions(+), 16 deletions(-)
diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp
index 35d87477..a5f178e3 100644
--- a/src/bthread/bthread.cpp
+++ b/src/bthread/bthread.cpp
@@ -59,6 +59,21 @@ DEFINE_int32(bthread_concurrency_by_tag, 8 + BTHREAD_EPOLL_THREAD_NUM,
"Number of pthread workers of FLAGS_bthread_current_tag");
BUTIL_VALIDATE_GFLAG(bthread_concurrency_by_tag, validate_bthread_concurrency_by_tag);
+DEFINE_int32(bthread_parking_lot_of_each_tag, 4, "Number of parking lots of each tag");
+BUTIL_VALIDATE_GFLAG(bthread_parking_lot_of_each_tag, [](const char*, int32_t val) {
+ if (val < BTHREAD_MIN_PARKINGLOT) {
+ LOG(ERROR) << "bthread_parking_lot_of_each_tag must be greater than or equal to "
+ << BTHREAD_MIN_PARKINGLOT;
+ return false;
+ }
+ if (val > BTHREAD_MAX_PARKINGLOT) {
+ LOG(ERROR) << "bthread_parking_lot_of_each_tag must be less than or equal to "
+ << BTHREAD_MAX_PARKINGLOT;
+ return false;
+ }
+ return true;
+});
+
static bool never_set_bthread_concurrency = true;
BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic<TaskControl*>), atomic_size_match);
@@ -216,7 +231,7 @@ static bool validate_bthread_current_tag(const char*, int32_t val) {
return false;
}
BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
- auto c = bthread::get_task_control();
+ auto c = get_task_control();
if (c == NULL) {
FLAGS_bthread_concurrency_by_tag = 8 + BTHREAD_EPOLL_THREAD_NUM;
return true;
diff --git a/src/bthread/parking_lot.h b/src/bthread/parking_lot.h
index 620e3c89..315e9956 100644
--- a/src/bthread/parking_lot.h
+++ b/src/bthread/parking_lot.h
@@ -60,6 +60,10 @@ public:
// Wait for tasks.
// If the `expected_state' does not match, wait() may finish directly.
void wait(const State& expected_state) {
+ if (get_state().val != expected_state.val) {
+ // Fast path, no need to futex_wait.
+ return;
+ }
_waiter_num.fetch_add(1, butil::memory_order_relaxed);
futex_wait_private(&_pending_signal, expected_state.val, NULL);
_waiter_num.fetch_sub(1, butil::memory_order_relaxed);
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index 0cc3754a..05ceec09 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -46,6 +46,7 @@ namespace bthread {
DECLARE_int32(bthread_concurrency);
DECLARE_int32(bthread_min_concurrency);
+DECLARE_int32(bthread_parking_lot_of_each_tag);
extern pthread_mutex_t g_task_control_mutex;
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
@@ -186,7 +187,8 @@ TaskControl::TaskControl()
, _status(print_rq_sizes_in_the_tc, this)
, _nbthreads("bthread_count")
, _priority_queues(FLAGS_task_group_ntags)
- , _pl(FLAGS_task_group_ntags)
+ , _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
+ , _tagged_pl(FLAGS_task_group_ntags)
{}
int TaskControl::init(int concurrency) {
@@ -326,7 +328,7 @@ void TaskControl::stop_and_join() {
[](butil::atomic<size_t>& index) { index.store(0, butil::memory_order_relaxed); });
}
for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
- for (auto& pl : _pl[i]) {
+ for (auto& pl : _tagged_pl[i]) {
pl.stop();
}
}
@@ -367,7 +369,7 @@ int TaskControl::_add_group(TaskGroup* g, bthread_tag_t tag) {
return -1;
}
g->set_tag(tag);
- g->set_pl(&_pl[tag][butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM]);
+ g->set_pl(&_tagged_pl[tag][butil::fmix64(pthread_numeric_id()) % _pl_num_of_each_tag]);
size_t ngroup = _tagged_ngroup[tag].load(butil::memory_order_relaxed);
if (ngroup < (size_t)BTHREAD_MAX_CONCURRENCY) {
_tagged_groups[tag][ngroup] = g;
@@ -482,14 +484,11 @@ void TaskControl::signal_task(int num_task, bthread_tag_t tag) {
num_task = 2;
}
auto& pl = tag_pl(tag);
- int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
- num_task -= pl[start_index].signal(1);
- if (num_task > 0) {
- for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
- if (++start_index >= PARKING_LOT_NUM) {
- start_index = 0;
- }
- num_task -= pl[start_index].signal(1);
+ size_t start_index = butil::fmix64(pthread_numeric_id()) % _pl_num_of_each_tag;
+ for (size_t i = 0; i < _pl_num_of_each_tag && num_task > 0; ++i) {
+ num_task -= pl[start_index].signal(1);
+ if (++start_index >= _pl_num_of_each_tag) {
+ start_index = 0;
}
}
if (num_task > 0 &&
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 11587b29..2426b00c 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -103,8 +103,7 @@ public:
private:
typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
- static const int PARKING_LOT_NUM = 4;
- typedef std::array<ParkingLot, PARKING_LOT_NUM> TaggedParkingLot;
+ typedef std::array<ParkingLot, BTHREAD_MAX_PARKINGLOT> TaggedParkingLot;
// Add/Remove a TaskGroup.
// Returns 0 on success, -1 otherwise.
int _add_group(TaskGroup*, bthread_tag_t tag);
@@ -117,7 +116,7 @@ private:
butil::atomic<size_t>& tag_ngroup(int tag) { return _tagged_ngroup[tag]; }
// Tag parking slot
- TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _pl[tag]; }
+ TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _tagged_pl[tag]; }
static void delete_task_group(void* arg);
@@ -159,7 +158,8 @@ private:
std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
std::vector<WorkStealingQueue<bthread_t>> _priority_queues;
- std::vector<TaggedParkingLot> _pl;
+ size_t _pl_num_of_each_tag;
+ std::vector<TaggedParkingLot> _tagged_pl;
#ifdef BRPC_BTHREAD_TRACER
TaskTracer _task_tracer;
diff --git a/src/bthread/types.h b/src/bthread/types.h
index c0f23f1c..30368f68 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -154,6 +154,9 @@ static const bthread_t BTHREAD_ATOMIC_INIT = 0;
// Min/Max number of work pthreads.
static const int BTHREAD_MIN_CONCURRENCY = 3 + BTHREAD_EPOLL_THREAD_NUM;
static const int BTHREAD_MAX_CONCURRENCY = 1024;
+// Min/max number of ParkingLot.
+static const int BTHREAD_MIN_PARKINGLOT = 4;
+static const int BTHREAD_MAX_PARKINGLOT = 1024;
typedef struct {
void* impl;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]