This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 05ec537a Bugfix: bthread_worker_usage could exceed bthread_worker_count (#3009)
05ec537a is described below
commit 05ec537ae67f9a1b2ea28f14e33d18d7155ce152
Author: Bright Chen <[email protected]>
AuthorDate: Wed Jul 23 10:15:36 2025 +0800
Bugfix: bthread_worker_usage could exceed bthread_worker_count (#3009)
* Bugfix: bthread_worker_usage would be greater than bthread_worker_count
* Use CPU atomic 128-bit aligned loads and stores
* Encapsulating AtomicInteger128 class
* Remove unused header files
---
src/brpc/load_balancer.h | 7 --
src/brpc/policy/randomized_load_balancer.cpp | 3 +-
src/brpc/policy/round_robin_load_balancer.cpp | 3 +-
.../policy/weighted_randomized_load_balancer.cpp | 3 +-
src/bthread/prime_offset.h | 39 ++++++
src/bthread/task_control.cpp | 12 +-
src/bthread/task_control.h | 2 +-
src/bthread/task_group.cpp | 113 +++++++++++------
src/bthread/task_group.h | 140 +++++++++++++++++----
9 files changed, 240 insertions(+), 82 deletions(-)
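
As the diff below shows, the fix packs a worker's last scheduling time and cumulated CPU time into one 16-byte value so a sampler thread observes both fields consistently. The following is a minimal standalone sketch of that technique, not the patch itself: it assumes GCC/Clang on x86-64, and the names Pair128/load_pair/store_pair are made up for illustration (the real class is AtomicInteger128 in src/bthread/task_group.h below).

    #include <cstdint>
    #include <cstdio>
    #ifdef __x86_64__
    #include <x86intrin.h>

    // Two related 64-bit counters kept in one 16-byte aligned unit.
    struct alignas(16) Pair128 {
        int64_t last_run_ns;
        int64_t cumulated_cputime_ns;
    };

    // One aligned 128-bit SSE load: both fields are observed together.
    static Pair128 load_pair(const Pair128& p) {
        __m128i v = _mm_load_si128(reinterpret_cast<const __m128i*>(&p));
        return {v[0], v[1]};
    }

    // One aligned 128-bit SSE store: both fields become visible together.
    static void store_pair(Pair128& p, Pair128 value) {
        __m128i v = _mm_load_si128(reinterpret_cast<__m128i*>(&value));
        _mm_store_si128(reinterpret_cast<__m128i*>(&p), v);
    }

    int main() {
        Pair128 stat{};
        store_pair(stat, {123, 456});
        Pair128 copy = load_pair(stat);
        printf("%lld %lld\n", (long long)copy.last_run_ns,
               (long long)copy.cumulated_cputime_ns);
        return 0;
    }
    #else
    int main() { return 0; }
    #endif // __x86_64__

On other architectures the patch falls back to a FastPthreadMutex, trading a lock for the same consistent view of the pair.
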
diff --git a/src/brpc/load_balancer.h b/src/brpc/load_balancer.h
index a32b298d..cda0517e 100644
--- a/src/brpc/load_balancer.h
+++ b/src/brpc/load_balancer.h
@@ -184,13 +184,6 @@ inline Extension<const LoadBalancer>* LoadBalancerExtension() {
return Extension<const LoadBalancer>::instance();
}
-inline uint32_t GenRandomStride() {
- uint32_t prime_offset[] = {
- #include "bthread/offset_inl.list"
- };
- return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))];
-}
-
} // namespace brpc
diff --git a/src/brpc/policy/randomized_load_balancer.cpp b/src/brpc/policy/randomized_load_balancer.cpp
index 5c4ba447..65cfdee9 100644
--- a/src/brpc/policy/randomized_load_balancer.cpp
+++ b/src/brpc/policy/randomized_load_balancer.cpp
@@ -18,6 +18,7 @@
#include "butil/macros.h"
#include "butil/fast_rand.h"
+#include "bthread/prime_offset.h"
#include "brpc/socket.h"
#include "brpc/policy/randomized_load_balancer.h"
#include "butil/strings/string_number_conversions.h"
@@ -118,7 +119,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
return 0;
}
if (stride == 0) {
- stride = GenRandomStride();
+ stride = bthread::prime_offset();
}
// If `Address' failed, use `offset+stride' to retry so that
// this failed server won't be visited again inside for
diff --git a/src/brpc/policy/round_robin_load_balancer.cpp b/src/brpc/policy/round_robin_load_balancer.cpp
index 1d16131a..fa69aa86 100644
--- a/src/brpc/policy/round_robin_load_balancer.cpp
+++ b/src/brpc/policy/round_robin_load_balancer.cpp
@@ -18,6 +18,7 @@
#include "butil/macros.h"
#include "butil/fast_rand.h"
+#include "bthread/prime_offset.h"
#include "brpc/socket.h"
#include "brpc/policy/round_robin_load_balancer.h"
@@ -108,7 +109,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
}
TLS tls = s.tls();
if (tls.stride == 0) {
- tls.stride = GenRandomStride();
+ tls.stride = bthread::prime_offset();
// use random at first time, for the case of
// use rr lb every time in new thread
tls.offset = butil::fast_rand_less_than(n);
diff --git a/src/brpc/policy/weighted_randomized_load_balancer.cpp b/src/brpc/policy/weighted_randomized_load_balancer.cpp
index 28cd7e3f..819c550c 100644
--- a/src/brpc/policy/weighted_randomized_load_balancer.cpp
+++ b/src/brpc/policy/weighted_randomized_load_balancer.cpp
@@ -19,6 +19,7 @@
#include <algorithm>
#include "butil/fast_rand.h"
+#include "bthread/prime_offset.h"
#include "brpc/socket.h"
#include "brpc/policy/weighted_randomized_load_balancer.h"
#include "butil/strings/string_number_conversions.h"
@@ -152,7 +153,7 @@ int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
if (random_traversed.size() == n) {
// Try to traverse the remaining servers to find an available server.
uint32_t offset = butil::fast_rand_less_than(n);
- uint32_t stride = GenRandomStride();
+ uint32_t stride = bthread::prime_offset();
for (size_t i = 0; i < n; ++i) {
offset = (offset + stride) % n;
SocketId id = s->server_list[offset].id;
diff --git a/src/bthread/prime_offset.h b/src/bthread/prime_offset.h
new file mode 100644
index 00000000..7137a68c
--- /dev/null
+++ b/src/bthread/prime_offset.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BTHREAD_PRIME_OFFSET_H
+#define BTHREAD_PRIME_OFFSET_H
+
+#include "butil/fast_rand.h"
+#include "butil/macros.h"
+
+namespace bthread {
+// Prime number offset for hash function.
+inline size_t prime_offset(size_t seed) {
+ uint32_t offsets[] = {
+ #include "bthread/offset_inl.list"
+ };
+ return offsets[seed % ARRAY_SIZE(offsets)];
+}
+
+inline size_t prime_offset() {
+ return prime_offset(butil::fast_rand());
+}
+}
+
+
+#endif // BTHREAD_PRIME_OFFSET_H
\ No newline at end of file
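
The three load-balancer hunks above now take their traversal stride from this helper. Below is a minimal usage sketch, assuming it is compiled inside the brpc source tree; the server count n is made up for illustration:

    #include <cstdint>
    #include <cstdio>
    #include "bthread/prime_offset.h"
    #include "butil/fast_rand.h"

    int main() {
        const uint32_t n = 5;                      // illustrative server count
        uint32_t offset = butil::fast_rand_less_than(n);
        uint32_t stride = bthread::prime_offset(); // prime stride from offset_inl.list
        // Stepping by the stride visits every index exactly once as long as the
        // stride is coprime with n, which holds for these large primes unless n
        // happens to be a multiple of one of them.
        for (uint32_t i = 0; i < n; ++i) {
            offset = (offset + stride) % n;
            printf("visit server %u\n", offset);
        }
        return 0;
    }
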
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index 66307d32..94be504f 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -152,7 +152,7 @@ static double get_cumulated_worker_time_from_this_with_tag(void* arg) {
auto a = static_cast<CumulatedWithTagArgs*>(arg);
auto c = a->c;
auto t = a->t;
- return c->get_cumulated_worker_time_with_tag(t);
+ return c->get_cumulated_worker_time(t);
}
static int64_t get_cumulated_switch_count_from_this(void *arg) {
@@ -526,22 +526,18 @@ double TaskControl::get_cumulated_worker_time() {
int64_t cputime_ns = 0;
BAIDU_SCOPED_LOCK(_modify_group_mutex);
for_each_task_group([&](TaskGroup* g) {
- if (g) {
- cputime_ns += g->_cumulated_cputime_ns;
- }
+ cputime_ns += g->cumulated_cputime_ns();
});
return cputime_ns / 1000000000.0;
}
-double TaskControl::get_cumulated_worker_time_with_tag(bthread_tag_t tag) {
+double TaskControl::get_cumulated_worker_time(bthread_tag_t tag) {
int64_t cputime_ns = 0;
BAIDU_SCOPED_LOCK(_modify_group_mutex);
const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_relaxed);
auto& groups = tag_group(tag);
for (size_t i = 0; i < ngroup; ++i) {
- if (groups[i]) {
- cputime_ns += groups[i]->_cumulated_cputime_ns;
- }
+ cputime_ns += groups[i]->cumulated_cputime_ns();
}
return cputime_ns / 1000000000.0;
}
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 2a2b76d6..11587b29 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -79,7 +79,7 @@ public:
void print_rq_sizes(std::ostream& os);
double get_cumulated_worker_time();
- double get_cumulated_worker_time_with_tag(bthread_tag_t tag);
+ double get_cumulated_worker_time(bthread_tag_t tag);
int64_t get_cumulated_switch_count();
int64_t get_cumulated_signal_count();
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 0d3e473e..d4cb81f6 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -37,6 +37,14 @@
#include "bthread/task_group.h"
#include "bthread/timer_thread.h"
+#ifdef __x86_64__
+#include <x86intrin.h>
+#endif // __x86_64__
+
+#ifdef __ARM_NEON
+#include <arm_neon.h>
+#endif // __ARM_NEON
+
namespace bthread {
static const bthread_attr_t BTHREAD_ATTR_TASKGROUP = {
@@ -69,10 +77,6 @@ BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, NULL);
const TaskStatistics EMPTY_STAT = { 0, 0, 0 };
-const size_t OFFSET_TABLE[] = {
-#include "bthread/offset_inl.list"
-};
-
void* (*g_create_span_func)() = NULL;
void* run_create_span_func() {
@@ -82,6 +86,39 @@ void* run_create_span_func() {
return BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls).rpcz_parent_span;
}
+AtomicInteger128::Value AtomicInteger128::load() const {
+#if __x86_64__ || __ARM_NEON
+ // Suppress compiler warning.
+ (void)_mutex;
+#endif // __x86_64__ || __ARM_NEON
+
+#if __x86_64__ || __ARM_NEON
+#ifdef __x86_64__
+ __m128i value = _mm_load_si128(reinterpret_cast<const __m128i*>(&_value));
+#else // __ARM_NEON
+ int64x2_t value = vld1q_s64(reinterpret_cast<const int64_t*>(&_value));
+#endif // __x86_64__
+ return {value[0], value[1]};
+#else // __x86_64__ || __ARM_NEON
+ BAIDU_SCOPED_LOCK(_mutex);
+ return _value;
+#endif // __x86_64__ || __ARM_NEON
+}
+
+void AtomicInteger128::store(Value value) {
+#if __x86_64__
+ __m128i v = _mm_load_si128(reinterpret_cast<__m128i*>(&value));
+ _mm_store_si128(reinterpret_cast<__m128i*>(&_value), v);
+#elif __ARM_NEON
+ int64x2_t v = vld1q_s64(reinterpret_cast<int64_t*>(&value));
+ vst1q_s64(reinterpret_cast<int64_t*>(&_value), v);
+#else
+ BAIDU_SCOPED_LOCK(_mutex);
+ _value = value;
+#endif // __x86_64__ || __ARM_NEON
+}
+
+
int TaskGroup::get_attr(bthread_t tid, bthread_attr_t* out) {
TaskMeta* const m = address_meta(tid);
if (m != NULL) {
@@ -148,6 +185,16 @@ static double get_cumulated_cputime_from_this(void* arg) {
return static_cast<TaskGroup*>(arg)->cumulated_cputime_ns() / 1000000000.0;
}
+int64_t TaskGroup::cumulated_cputime_ns() const {
+ CPUTimeStat cpu_time_stat = _cpu_time_stat.load();
+ // Add the elapsed time of running bthread.
+ int64_t cumulated_cputime_ns = cpu_time_stat.cumulated_cputime_ns();
+ if (!cpu_time_stat.is_main_task()) {
+ cumulated_cputime_ns += butil::cpuwide_time_ns() - cpu_time_stat.last_run_ns();
+ }
+ return cumulated_cputime_ns;
+}
+
void TaskGroup::run_main_task() {
bvar::PassiveStatus<double> cumulated_cputime(
get_cumulated_cputime_from_this, this);
@@ -156,11 +203,11 @@ void TaskGroup::run_main_task() {
TaskGroup* dummy = this;
bthread_t tid;
while (wait_task(&tid)) {
- TaskGroup::sched_to(&dummy, tid);
+ sched_to(&dummy, tid);
DCHECK_EQ(this, dummy);
DCHECK_EQ(_cur_meta->stack, _main_stack);
if (_cur_meta->tid != _main_tid) {
- TaskGroup::task_runner(1/*skip remained*/);
+ task_runner(1/*skip remained*/);
}
if (FLAGS_show_per_worker_usage_in_vars && !usage_bvar) {
char name[32];
@@ -176,31 +223,12 @@ void TaskGroup::run_main_task() {
}
}
// Don't forget to add elapse of last wait_task.
- current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
+ current_task()->stat.cputime_ns +=
+ butil::cpuwide_time_ns() - _cpu_time_stat.load_unsafe().last_run_ns();
}
TaskGroup::TaskGroup(TaskControl* c)
- : _cur_meta(NULL)
- , _control(c)
- , _num_nosignal(0)
- , _nsignaled(0)
- , _last_run_ns(butil::cpuwide_time_ns())
- , _cumulated_cputime_ns(0)
- , _nswitch(0)
- , _last_context_remained(NULL)
- , _last_context_remained_arg(NULL)
- , _pl(NULL)
- , _main_stack(NULL)
- , _main_tid(0)
- , _remote_num_nosignal(0)
- , _remote_nsignaled(0)
-#ifndef NDEBUG
- , _sched_recursive_guard(0)
-#endif
- , _tag(BTHREAD_TAG_DEFAULT)
- , _tid(-1) {
- _steal_seed = butil::fast_rand();
- _steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)];
+ : _control(c) {
CHECK(c);
}
@@ -292,8 +320,12 @@ int TaskGroup::init(size_t runqueue_capacity) {
_cur_meta = m;
_main_tid = m->tid;
_main_stack = stk;
- _last_run_ns = butil::cpuwide_time_ns();
+
+ CPUTimeStat cpu_time_stat;
+ cpu_time_stat.set_last_run_ns(m->cpuwide_start_ns, true);
+ _cpu_time_stat.store(cpu_time_stat);
_last_cpu_clock_ns = 0;
+
return 0;
}
@@ -414,7 +446,7 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
g->_control->_nbthreads << -1;
g->_control->tag_nbthreads(g->tag()) << -1;
- g->set_remained(TaskGroup::_release_last_context, m);
+ g->set_remained(_release_last_context, m);
ending_sched(&g);
} while (g->_cur_meta->tid != g->_main_tid);
@@ -491,9 +523,7 @@ int TaskGroup::start_foreground(TaskGroup** pg,
fn = ready_to_run_in_worker;
}
ReadyToRunArgs args = {
- g->tag(),
- g->_cur_meta,
- (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
+ g->tag(), g->_cur_meta, (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
};
g->set_remained(fn, &args);
sched_to(pg, m->tid);
@@ -678,14 +708,18 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending) {
}
#endif
// Save errno so that errno is bthread-specific.
- const int saved_errno = errno;
+ int saved_errno = errno;
void* saved_unique_user_ptr = tls_unique_user_ptr;
TaskMeta* const cur_meta = g->_cur_meta;
- const int64_t now = butil::cpuwide_time_ns();
- const int64_t elp_ns = now - g->_last_run_ns;
- g->_last_run_ns = now;
+ int64_t now = butil::cpuwide_time_ns();
+ CPUTimeStat cpu_time_stat = g->_cpu_time_stat.load_unsafe();
+ int64_t elp_ns = now - cpu_time_stat.last_run_ns();
cur_meta->stat.cputime_ns += elp_ns;
+ // Update cpu_time_stat.
+ cpu_time_stat.set_last_run_ns(now, is_main_task(g, next_meta->tid));
+ cpu_time_stat.add_cumulated_cputime_ns(elp_ns, is_main_task(g, cur_meta->tid));
+ g->_cpu_time_stat.store(cpu_time_stat);
if (FLAGS_bthread_enable_cpu_clock_stat) {
const int64_t cpu_thread_time = butil::cputhread_time_ns();
@@ -696,10 +730,7 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending) {
} else {
g->_last_cpu_clock_ns = 0;
}
-
- if (cur_meta->tid != g->main_tid()) {
- g->_cumulated_cputime_ns += elp_ns;
- }
+
++cur_meta->stat.nswitch;
++ g->_nswitch;
// Switch to the task
diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h
index ccba9ba3..b79e69d8 100644
--- a/src/bthread/task_group.h
+++ b/src/bthread/task_group.h
@@ -29,6 +29,7 @@
#include "bthread/remote_task_queue.h" // RemoteTaskQueue
#include "butil/resource_pool.h" // ResourceId
#include "bthread/parking_lot.h"
+#include "bthread/prime_offset.h"
namespace bthread {
@@ -47,6 +48,35 @@ private:
void* _value;
};
+// Refer to https://rigtorp.se/isatomic/: on modern CPU microarchitectures
+// (Skylake and Zen 2), AVX/AVX2 128b/256b aligned loads and stores are atomic,
+// even though Intel and AMD don't officially guarantee this.
+// On x86, SSE instructions can ensure atomic loads and stores.
+// Starting from Armv8.4-A, NEON can ensure atomic loads and stores.
+// Otherwise, a mutex is used to guarantee atomicity.
+class AtomicInteger128 {
+public:
+ struct Value {
+ int64_t v1;
+ int64_t v2;
+ };
+
+ AtomicInteger128() = default;
+ explicit AtomicInteger128(Value value) : _value(value) {}
+
+ Value load() const;
+ Value load_unsafe() const {
+ return _value;
+ }
+
+ void store(Value value);
+
+private:
+ Value BAIDU_CACHELINE_ALIGNMENT _value{};
+ // Used to protect `_value' when neither __x86_64__ nor __ARM_NEON is defined.
+ FastPthreadMutex _mutex;
+};
+
// Thread-local group of tasks.
// Notice that most methods involving context switching are static otherwise
// pointer `this' may change after wakeup. The **pg parameters in following
@@ -148,7 +178,7 @@ public:
{ return _cur_meta->stack == _main_stack; }
// Active time in nanoseconds spent by this TaskGroup.
- int64_t cumulated_cputime_ns() const { return _cumulated_cputime_ns; }
+ int64_t cumulated_cputime_ns() const;
// Push a bthread into the runqueue
void ready_to_run(TaskMeta* meta, bool nosignal = false);
@@ -203,6 +233,70 @@ public:
private:
friend class TaskControl;
+ // Last scheduling time, task type and cumulated CPU time.
+ class CPUTimeStat {
+ static constexpr int64_t LAST_SCHEDULING_TIME_MASK = 0x7FFFFFFFFFFFFFFFLL;
+ static constexpr int64_t TASK_TYPE_MASK = 0x8000000000000000LL;
+ public:
+ CPUTimeStat() : _last_run_ns_and_type(0), _cumulated_cputime_ns(0) {}
+ CPUTimeStat(AtomicInteger128::Value value)
+ : _last_run_ns_and_type(value.v1), _cumulated_cputime_ns(value.v2) {}
+
+ // Convert to AtomicInteger128::Value for atomic operations.
+ explicit operator AtomicInteger128::Value() const {
+ return {_last_run_ns_and_type, _cumulated_cputime_ns};
+ }
+
+ void set_last_run_ns(int64_t last_run_ns, bool main_task) {
+ _last_run_ns_and_type = (last_run_ns & LAST_SCHEDULING_TIME_MASK) |
+ (static_cast<int64_t>(main_task) << 63);
+ }
+ int64_t last_run_ns() const {
+ return _last_run_ns_and_type & LAST_SCHEDULING_TIME_MASK;
+ }
+ int64_t last_run_ns_and_type() const {
+ return _last_run_ns_and_type;
+ }
+
+ bool is_main_task() const {
+ return _last_run_ns_and_type & TASK_TYPE_MASK;
+ }
+
+ void add_cumulated_cputime_ns(int64_t cputime_ns, bool main_task) {
+ if (main_task) {
+ return;
+ }
+ _cumulated_cputime_ns += cputime_ns;
+ }
+ int64_t cumulated_cputime_ns() const {
+ return _cumulated_cputime_ns;
+ }
+
+ private:
+ // The higher bit for task type, main task is 1, otherwise 0.
+ // Lowest 63 bits for last scheduling time.
+ int64_t _last_run_ns_and_type;
+ // Cumulated CPU time in nanoseconds.
+ int64_t _cumulated_cputime_ns;
+ };
+
+ class AtomicCPUTimeStat {
+ public:
+ CPUTimeStat load() const {
+ return _cpu_time_stat.load();
+ }
+ CPUTimeStat load_unsafe() const {
+ return _cpu_time_stat.load_unsafe();
+ }
+
+ void store(CPUTimeStat cpu_time_stat) {
+ _cpu_time_stat.store(AtomicInteger128::Value(cpu_time_stat));
+ }
+
+ private:
+ AtomicInteger128 _cpu_time_stat;
+ };
+
// You shall use TaskControl::create_group to create new instance.
explicit TaskGroup(TaskControl* c);
@@ -248,41 +342,43 @@ friend class TaskControl;
void set_pl(ParkingLot* pl) { _pl = pl; }
- TaskMeta* _cur_meta;
+ static bool is_main_task(TaskGroup* g, bthread_t tid) {
+ return g->_main_tid == tid;
+ }
+
+ TaskMeta* _cur_meta{NULL};
// the control that this group belongs to
- TaskControl* _control;
- int _num_nosignal;
- int _nsignaled;
- // last scheduling time
- int64_t _last_run_ns;
- int64_t _cumulated_cputime_ns;
+ TaskControl* _control{NULL};
+ int _num_nosignal{0};
+ int _nsignaled{0};
+ AtomicCPUTimeStat _cpu_time_stat;
// last thread cpu clock
- int64_t _last_cpu_clock_ns;
+ int64_t _last_cpu_clock_ns{0};
- size_t _nswitch;
- RemainedFn _last_context_remained;
- void* _last_context_remained_arg;
+ size_t _nswitch{0};
+ RemainedFn _last_context_remained{NULL};
+ void* _last_context_remained_arg{NULL};
- ParkingLot* _pl;
+ ParkingLot* _pl{NULL};
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
ParkingLot::State _last_pl_state;
#endif
- size_t _steal_seed;
- size_t _steal_offset;
- ContextualStack* _main_stack;
- bthread_t _main_tid;
+ size_t _steal_seed{butil::fast_rand()};
+ size_t _steal_offset{prime_offset(_steal_seed)};
+ ContextualStack* _main_stack{NULL};
+ bthread_t _main_tid{INVALID_BTHREAD};
WorkStealingQueue<bthread_t> _rq;
RemoteTaskQueue _remote_rq;
- int _remote_num_nosignal;
- int _remote_nsignaled;
+ int _remote_num_nosignal{0};
+ int _remote_nsignaled{0};
- int _sched_recursive_guard;
+ int _sched_recursive_guard{0};
// tag of this taskgroup
- bthread_tag_t _tag;
+ bthread_tag_t _tag{BTHREAD_TAG_DEFAULT};
// Worker thread id.
- pid_t _tid;
+ pid_t _tid{-1};
};
} // namespace bthread
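
For readers skimming CPUTimeStat above: its first 64-bit word packs the main-task flag into the top bit and the last scheduling time into the low 63 bits, so the flag, the timestamp and the cumulated CPU time all fit into a single 128-bit value that AtomicInteger128 can load and store in one shot. A tiny self-contained sketch of that packing (constants and names are illustrative, not the patch itself):

    #include <cassert>
    #include <cstdint>

    // Mirrors CPUTimeStat's masks: top bit = main-task flag, low 63 bits = time.
    constexpr int64_t kLastRunMask = 0x7FFFFFFFFFFFFFFFLL;
    constexpr uint64_t kMainTaskBit = 1ULL << 63;

    static int64_t pack(int64_t last_run_ns, bool main_task) {
        uint64_t v = static_cast<uint64_t>(last_run_ns & kLastRunMask);
        if (main_task) {
            v |= kMainTaskBit;
        }
        return static_cast<int64_t>(v);
    }

    static int64_t unpack_last_run_ns(int64_t v) { return v & kLastRunMask; }

    static bool unpack_is_main_task(int64_t v) {
        return (static_cast<uint64_t>(v) & kMainTaskBit) != 0;
    }

    int main() {
        int64_t v = pack(123456789, /*main_task=*/true);
        assert(unpack_last_run_ns(v) == 123456789);
        assert(unpack_is_main_task(v));
        return 0;
    }
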
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]