This is an automated email from the ASF dual-hosted git repository.
chenBright pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 07215f22 bthread: support per-tag CPU affinity in --cpu_set (#3331)
07215f22 is described below
commit 07215f22f25fb16f878ff6f9ea9e49f7e4c9238a
Author: Yang,Liming <[email protected]>
AuthorDate: Sun Jun 21 16:07:34 2026 +0800
bthread: support per-tag CPU affinity in --cpu_set (#3331)
Previously --cpu_set accepted a single CPU list (e.g. "0-3,5,7") that
was applied uniformly to all bthread worker threads regardless of their
bthread_tag.
This change extends the flag to accept a per-tag format:
--cpu_set="0:0-3,5,7;1:6-9,4"
where each semicolon-separated segment is "<tag>:<cpu-list>". Tags not
mentioned in the string get no CPU binding. The legacy single-list
format continues to work unchanged and binds all tags to the same set.
Implementation:
- Replace _cpus (vector<unsigned>) with _tag_cpus
(vector<vector<unsigned>>), sized to FLAGS_task_group_ntags in the
constructor.
- Add parse_one_cpuset() (static helper) for the existing range-list
parsing logic; parse_cpuset() now dispatches between legacy and per-tag
formats based on the presence of ':' or ';'.
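- worker_thread() looks up _tag_cpus[tag] and round-robins over the
per-tag CPU list using a new per-tag counter, _tag_next_worker_id[tag],
which replaces the global _next_worker_id counter and also supplies the
worker thread name suffix.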
---
src/bthread/task_control.cpp | 139 ++++++++++++++++++++++++++++++++-----------
src/bthread/task_control.h | 16 +++--
2 files changed, 117 insertions(+), 38 deletions(-)
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index 347dbd24..5c6ca57c 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -48,8 +48,13 @@ DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped
by number ntags");
DEFINE_bool(task_group_set_worker_name, true,
"Whether to set the name of the worker thread");
DEFINE_string(cpu_set, "",
- "Set of CPUs to which cores are bound. "
- "for example, 0-3,5,7; default: disable");
+ "Set of CPUs to which worker threads are bound. "
+ "Two formats are supported:\n"
+ " Legacy (bind all tags to one set): \"0-3,5,7\"\n"
+ " Per-tag: \"0:0-3,5,7;1:6-9,4\" "
+ "where the number before ':' is the bthread_tag and the part "
+ "after ':' is a CPU list in the same format as the legacy value.
"
+ "Tags not mentioned get no CPU binding. Default: disable.");
DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");
@@ -108,15 +113,17 @@ void* TaskControl::worker_thread(void* arg) {
}
g->_tid = pthread_self();
-
- int worker_id = c->_next_worker_id.fetch_add(
- 1, butil::memory_order_relaxed);
- if (!c->_cpus.empty()) {
- bind_thread_to_cpu(pthread_self(), c->_cpus[worker_id %
c->_cpus.size()]);
+ // tag_wid is a per-tag monotonic counter: same-tag workers get 0,1,2,...
+ // Used both for CPU round-robin affinity and the thread name suffix.
+ int tag_wid = c->_tag_next_worker_id[tag].fetch_add(
+ 1, butil::memory_order_relaxed);
+ if (!c->_tag_cpus[tag].empty()) {
+ const auto& cpus = c->_tag_cpus[tag];
+ bind_thread_to_cpu(pthread_self(), cpus[tag_wid % cpus.size()]);
}
if (FLAGS_task_group_set_worker_name) {
std::string worker_thread_name = butil::string_printf(
- "brpc_wkr:%d-%d", g->tag(), worker_id);
+ "brpc_wkr:%d-%d", g->tag(), tag_wid);
butil::PlatformThread::SetNameSimple(worker_thread_name.c_str());
}
BT_VLOG << "Created worker=" << pthread_self() << " tid=" << g->_tid
@@ -193,7 +200,6 @@ TaskControl::TaskControl()
, _init(false)
, _stop(false)
, _concurrency(0)
- , _next_worker_id(0)
, _nworkers("bthread_worker_count")
, _pending_time(NULL)
// Delay exposure of following two vars because they rely on TC which
@@ -212,6 +218,8 @@ TaskControl::TaskControl()
FLAGS_task_group_ntags * FLAGS_event_dispatcher_num)
, _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
, _tagged_pl(FLAGS_task_group_ntags)
+ , _tag_cpus(FLAGS_task_group_ntags)
+ , _tag_next_worker_id(FLAGS_task_group_ntags)
{}
int TaskControl::init_ed_priority_queues() {
@@ -244,8 +252,8 @@ int TaskControl::init(int concurrency) {
_concurrency = concurrency;
if (!FLAGS_cpu_set.empty()) {
- if (parse_cpuset(FLAGS_cpu_set, _cpus) == -1) {
- LOG(ERROR) << "invalid cpuset=" << FLAGS_cpu_set;
+ if (parse_cpuset(FLAGS_cpu_set) == -1) {
+ LOG(ERROR) << "invalid cpu_set=" << FLAGS_cpu_set;
return -1;
}
}
@@ -279,7 +287,7 @@ int TaskControl::init(int concurrency) {
}
#endif // BRPC_BTHREAD_TRACER
- _workers.resize(_concurrency);
+ _workers.resize(_concurrency);
for (int i = 0; i < _concurrency; ++i) {
auto arg = new WorkerThreadArgs(this, i % FLAGS_task_group_ntags);
const int rc = pthread_create(&_workers[i], NULL, worker_thread, arg);
@@ -350,40 +358,103 @@ TaskGroup* TaskControl::choose_one_group(bthread_tag_t
tag) {
return NULL;
}
-int TaskControl::parse_cpuset(std::string value, std::vector<unsigned>& cpus) {
+// Parse a single cpu-range-list such as "0-3,5,7" into a sorted, deduplicated
+// vector of CPU IDs. Returns 0 on success, -1 on error.
+static int parse_one_cpuset(const std::string& value, std::vector<unsigned>&
cpus) {
static std::regex r("(\\d+-)?(\\d+)(,(\\d+-)?(\\d+))*");
std::smatch match;
std::set<unsigned> cpuset;
if (value.empty()) {
return -1;
}
- if (std::regex_match(value, match, r)) {
- for (butil::StringSplitter split(value.data(), ','); split; ++split) {
- butil::StringPiece cpu_ids(split.field(), split.length());
- cpu_ids.trim_spaces();
- butil::StringPiece begin = cpu_ids;
- butil::StringPiece end = cpu_ids;
- auto dash = cpu_ids.find('-');
- if (dash != cpu_ids.npos) {
- begin = cpu_ids.substr(0, dash);
- end = cpu_ids.substr(dash + 1);
+ if (!std::regex_match(value, match, r)) {
+ return -1;
+ }
+ for (butil::StringSplitter split(value.data(), ','); split; ++split) {
+ butil::StringPiece cpu_ids(split.field(), split.length());
+ cpu_ids.trim_spaces();
+ butil::StringPiece begin = cpu_ids;
+ butil::StringPiece end = cpu_ids;
+ auto dash = cpu_ids.find('-');
+ if (dash != cpu_ids.npos) {
+ begin = cpu_ids.substr(0, dash);
+ end = cpu_ids.substr(dash + 1);
+ }
+ unsigned first = UINT_MAX;
+ unsigned last = 0;
+ int ret = butil::StringSplitter(begin, '\t').to_uint(&first);
+ ret = ret | butil::StringSplitter(end, '\t').to_uint(&last);
+ if (ret != 0 || first > last) {
+ return -1;
+ }
+ for (auto i = first; i <= last; ++i) {
+ cpuset.insert(i);
+ }
+ }
+ cpus.assign(cpuset.begin(), cpuset.end());
+ return 0;
+}
+
+int TaskControl::parse_cpuset(const std::string& value) {
+ if (value.empty()) {
+ return -1;
+ }
+ const int ntags = static_cast<int>(_tag_cpus.size());
+ // Detect per-tag format by the presence of ':' or ';'.
+ // Legacy format ("0-3,5,7") never contains these characters.
+ bool per_tag_format = (value.find(';') != std::string::npos ||
+ value.find(':') != std::string::npos);
+
+ if (per_tag_format) {
+ // Per-tag format: "0:0-3,5,7;1:6-9,4"
+ for (butil::StringSplitter seg_split(value.data(), ';'); seg_split;
++seg_split) {
+ std::string segment(seg_split.field(), seg_split.length());
+ // Trim leading/trailing spaces.
+ auto s = segment.find_first_not_of(' ');
+ auto e = segment.find_last_not_of(' ');
+ if (s == std::string::npos) { continue; } // blank segment
+ segment = segment.substr(s, e - s + 1);
+
+ auto colon = segment.find(':');
+ if (colon == std::string::npos) {
+ LOG(ERROR) << "cpu_set per-tag segment missing ':': " <<
segment;
+ return -1;
+ }
+ std::string tag_str = segment.substr(0, colon);
+ std::string cpus_str = segment.substr(colon + 1);
+
+ unsigned tag_id = 0;
+ butil::StringPiece tag_sp(tag_str);
+ if (butil::StringSplitter(tag_sp, '\t').to_uint(&tag_id) != 0) {
+ LOG(ERROR) << "cpu_set invalid tag '" << tag_str << "'";
+ return -1;
}
- unsigned first = UINT_MAX;
- unsigned last = 0;
- int ret;
- ret = butil::StringSplitter(begin, '\t').to_uint(&first);
- ret = ret | butil::StringSplitter(end, '\t').to_uint(&last);
- if (ret != 0 || first > last) {
+ if ((int)tag_id >= ntags) {
+ LOG(ERROR) << "cpu_set tag " << tag_id
+ << " >= task_group_ntags " << ntags;
return -1;
}
- for (auto i = first; i <= last; ++i) {
- cpuset.insert(i);
+
+ std::vector<unsigned> cpus;
+ if (parse_one_cpuset(cpus_str, cpus) != 0) {
+ LOG(ERROR) << "cpu_set invalid cpuset for tag " << tag_id
+ << ": " << cpus_str;
+ return -1;
}
+ _tag_cpus[tag_id] = std::move(cpus);
+ }
+ } else {
+ // Legacy format: one cpu-set shared by all tags.
+ std::vector<unsigned> cpus;
+ if (parse_one_cpuset(value, cpus) != 0) {
+ LOG(ERROR) << "cpu_set invalid cpuset: " << value;
+ return -1;
+ }
+ for (int i = 0; i < ntags; ++i) {
+ _tag_cpus[i] = cpus;
}
- cpus.assign(cpuset.begin(), cpuset.end());
- return 0;
}
- return -1;
+ return 0;
}
void TaskControl::bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id) {
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 93336199..1dd3dfc1 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -91,7 +91,12 @@ public:
// If this method is called after init(), it never returns NULL.
TaskGroup* choose_one_group(bthread_tag_t tag);
- static int parse_cpuset(std::string value, std::vector<unsigned>& cpus);
+ // Parse FLAGS_cpu_set into _tag_cpus. Two formats are accepted:
+ // Legacy (all tags share one set): "0-3,5,7"
+ // Per-tag: "0:0-3,5,7;1:6-9,4"
+ // Tags not mentioned get an empty cpu list (= no binding).
+ // Returns -1 on parse error.
+ int parse_cpuset(const std::string& value);
static void bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id);
@@ -154,9 +159,6 @@ private:
bool _stop;
butil::atomic<int> _concurrency;
std::vector<pthread_t> _workers;
- std::vector<unsigned> _cpus;
- butil::atomic<int> _next_worker_id;
-
bvar::Adder<int64_t> _nworkers;
butil::Mutex _pending_time_mutex;
butil::atomic<bvar::LatencyRecorder*> _pending_time;
@@ -180,6 +182,12 @@ private:
size_t _pl_num_of_each_tag;
std::vector<TaggedParkingLot> _tagged_pl;
+ // Per-tag CPU binding lists. _tag_cpus[tag] is the round-robin list of
+ // CPU IDs to which workers of that tag are bound. Empty means no binding.
+ std::vector<std::vector<unsigned>> _tag_cpus;
+ // Per-tag monotonic counter for round-robin CPU assignment.
+ // Incremented once per worker created for that tag (in worker_thread).
+ std::vector<butil::atomic<int>> _tag_next_worker_id;
#ifdef BRPC_BTHREAD_TRACER
TaskTracer _task_tracer;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]