This is an automated email from the ASF dual-hosted git repository.

chenBright pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 07215f22 bthread: support per-tag CPU affinity in --cpu_set (#3331)
07215f22 is described below

commit 07215f22f25fb16f878ff6f9ea9e49f7e4c9238a
Author: Yang,Liming <[email protected]>
AuthorDate: Sun Jun 21 16:07:34 2026 +0800

    bthread: support per-tag CPU affinity in --cpu_set (#3331)
    
    Previously --cpu_set accepted a single CPU list (e.g. "0-3,5,7") that
    was applied uniformly to all bthread worker threads regardless of their
    bthread_tag.
    
    This change extends the flag to accept a per-tag format:
    
      --cpu_set="0:0-3,5,7;1:6-9,4"
    
    where each semicolon-separated segment is "<tag>:<cpu-list>".  Tags not
    mentioned in the string get no CPU binding.  The legacy single-list
    format continues to work unchanged and binds all tags to the same set.
    
    Implementation:
    - Replace _cpus (vector<unsigned>) with _tag_cpus
      (vector<vector<unsigned>>), sized to FLAGS_task_group_ntags in the
      constructor.
    - Add parse_one_cpuset() (static helper) for the existing range-list
      parsing logic; parse_cpuset() now dispatches between legacy and per-tag
      formats based on the presence of ':' or ';'.
    - worker_thread() looks up _tag_cpus[tag] and round-robins over the
      per-tag CPU list using a per-tag counter, _tag_next_worker_id[tag],
      which replaces the global _next_worker_id and also supplies the
      thread-name suffix.
---
 src/bthread/task_control.cpp | 139 ++++++++++++++++++++++++++++++++-----------
 src/bthread/task_control.h   |  16 +++--
 2 files changed, 117 insertions(+), 38 deletions(-)

diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index 347dbd24..5c6ca57c 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -48,8 +48,13 @@ DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped by number ntags");
 DEFINE_bool(task_group_set_worker_name, true,
             "Whether to set the name of the worker thread");
 DEFINE_string(cpu_set, "",
-              "Set of CPUs to which cores are bound. "
-              "for example, 0-3,5,7; default: disable");
+              "Set of CPUs to which worker threads are bound. "
+              "Two formats are supported:\n"
+              "  Legacy (bind all tags to one set): \"0-3,5,7\"\n"
+              "  Per-tag: \"0:0-3,5,7;1:6-9,4\" "
+              "where the number before ':' is the bthread_tag and the part "
+              "after ':' is a CPU list in the same format as the legacy value. "
+              "Tags not mentioned get no CPU binding. Default: disable.");
 
 DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");
 
@@ -108,15 +113,17 @@ void* TaskControl::worker_thread(void* arg) {
     }
 
     g->_tid = pthread_self();
-
-    int worker_id = c->_next_worker_id.fetch_add(
-                        1, butil::memory_order_relaxed);
-    if (!c->_cpus.empty()) {
-        bind_thread_to_cpu(pthread_self(), c->_cpus[worker_id % c->_cpus.size()]);
+    // tag_wid is a per-tag monotonic counter: same-tag workers get 0,1,2,...
+    // Used both for CPU round-robin affinity and the thread name suffix.
+    int tag_wid = c->_tag_next_worker_id[tag].fetch_add(
+                      1, butil::memory_order_relaxed);
+    if (!c->_tag_cpus[tag].empty()) {
+        const auto& cpus = c->_tag_cpus[tag];
+        bind_thread_to_cpu(pthread_self(), cpus[tag_wid % cpus.size()]);
     }
     if (FLAGS_task_group_set_worker_name) {
         std::string worker_thread_name = butil::string_printf(
-            "brpc_wkr:%d-%d", g->tag(), worker_id);
+            "brpc_wkr:%d-%d", g->tag(), tag_wid);
         butil::PlatformThread::SetNameSimple(worker_thread_name.c_str());
     }
     BT_VLOG << "Created worker=" << pthread_self() << " tid=" << g->_tid
@@ -193,7 +200,6 @@ TaskControl::TaskControl()
     , _init(false)
     , _stop(false)
     , _concurrency(0)
-    , _next_worker_id(0)
     , _nworkers("bthread_worker_count")
     , _pending_time(NULL)
       // Delay exposure of following two vars because they rely on TC which
@@ -212,6 +218,8 @@ TaskControl::TaskControl()
         FLAGS_task_group_ntags * FLAGS_event_dispatcher_num)
     , _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
     , _tagged_pl(FLAGS_task_group_ntags)
+    , _tag_cpus(FLAGS_task_group_ntags)
+    , _tag_next_worker_id(FLAGS_task_group_ntags)
 {}
 
 int TaskControl::init_ed_priority_queues() {
@@ -244,8 +252,8 @@ int TaskControl::init(int concurrency) {
     _concurrency = concurrency;
 
     if (!FLAGS_cpu_set.empty()) {
-        if (parse_cpuset(FLAGS_cpu_set, _cpus) == -1) {
-            LOG(ERROR) << "invalid cpuset=" << FLAGS_cpu_set;
+        if (parse_cpuset(FLAGS_cpu_set) == -1) {
+            LOG(ERROR) << "invalid cpu_set=" << FLAGS_cpu_set;
             return -1;
         }
     }
@@ -279,7 +287,7 @@ int TaskControl::init(int concurrency) {
     }
 #endif // BRPC_BTHREAD_TRACER
     
-    _workers.resize(_concurrency);   
+    _workers.resize(_concurrency);
     for (int i = 0; i < _concurrency; ++i) {
         auto arg = new WorkerThreadArgs(this, i % FLAGS_task_group_ntags);
         const int rc = pthread_create(&_workers[i], NULL, worker_thread, arg);
@@ -350,40 +358,103 @@ TaskGroup* TaskControl::choose_one_group(bthread_tag_t tag) {
     return NULL;
 }
 
-int TaskControl::parse_cpuset(std::string value, std::vector<unsigned>& cpus) {
+// Parse one CPU range list such as "0-3,5,7" into `cpus` as a sorted, deduplicated
+// vector of CPU IDs.  Returns 0 on success, -1 on error (`cpus` is then untouched).
+static int parse_one_cpuset(const std::string& value, std::vector<unsigned>& cpus) {
     static std::regex r("(\\d+-)?(\\d+)(,(\\d+-)?(\\d+))*");
     std::smatch match;
     std::set<unsigned> cpuset;
     if (value.empty()) {
         return -1;
     }
-    if (std::regex_match(value, match, r)) {
-        for (butil::StringSplitter split(value.data(), ','); split; ++split) {
-            butil::StringPiece cpu_ids(split.field(), split.length());
-            cpu_ids.trim_spaces();
-            butil::StringPiece begin = cpu_ids;
-            butil::StringPiece end = cpu_ids;
-            auto dash = cpu_ids.find('-');
-            if (dash != cpu_ids.npos) {
-                begin = cpu_ids.substr(0, dash);
-                end = cpu_ids.substr(dash + 1);
+    if (!std::regex_match(value, match, r)) {  // whole string must be a range list
+        return -1;
+    }
+    for (butil::StringSplitter split(value.data(), ','); split; ++split) {
+        butil::StringPiece cpu_ids(split.field(), split.length());
+        cpu_ids.trim_spaces();
+        butil::StringPiece begin = cpu_ids;  // a single id "N" is the range N-N
+        butil::StringPiece end = cpu_ids;
+        auto dash = cpu_ids.find('-');
+        if (dash != cpu_ids.npos) {
+            begin = cpu_ids.substr(0, dash);
+            end = cpu_ids.substr(dash + 1);
+        }
+        unsigned first = UINT_MAX;
+        unsigned last = 0;
+        int ret = butil::StringSplitter(begin, '\t').to_uint(&first);
+        ret = ret | butil::StringSplitter(end, '\t').to_uint(&last);
+        if (ret != 0 || first > last || last == UINT_MAX) {  // UINT_MAX: ++i below would wrap forever
+            return -1;
+        }
+        for (auto i = first; i <= last; ++i) {
+            cpuset.insert(i);  // std::set keeps ids sorted and unique
+        }
+    }
+    cpus.assign(cpuset.begin(), cpuset.end());
+    return 0;
+}
+
+int TaskControl::parse_cpuset(const std::string& value) {
+    if (value.empty()) {
+        return -1;
+    }
+    const int ntags = static_cast<int>(_tag_cpus.size());
+    // Detect per-tag format by the presence of ':' or ';'.
+    // Legacy format ("0-3,5,7") never contains these characters.
+    bool per_tag_format = (value.find(';') != std::string::npos ||
+                           value.find(':') != std::string::npos);
+
+    if (per_tag_format) {
+        // Per-tag format: "0:0-3,5,7;1:6-9,4", i.e. ';'-separated "<tag>:<cpu-list>".
+        for (butil::StringSplitter seg_split(value.data(), ';'); seg_split; ++seg_split) {
+            std::string segment(seg_split.field(), seg_split.length());
+            // Trim leading/trailing spaces (only ' ', not tabs).
+            auto s = segment.find_first_not_of(' ');
+            auto e = segment.find_last_not_of(' ');
+            if (s == std::string::npos) { continue; }  // blank segment
+            segment = segment.substr(s, e - s + 1);
+
+            auto colon = segment.find(':');
+            if (colon == std::string::npos) {
+                LOG(ERROR) << "cpu_set per-tag segment missing ':': " << segment;
+                return -1;
+            }
+            std::string tag_str  = segment.substr(0, colon);
+            std::string cpus_str = segment.substr(colon + 1);
+
+            unsigned tag_id = 0;
+            butil::StringPiece tag_sp(tag_str);
+            if (butil::StringSplitter(tag_sp, '\t').to_uint(&tag_id) != 0) {
+                LOG(ERROR) << "cpu_set invalid tag '" << tag_str << "'";
+                return -1;
             }
-            unsigned first = UINT_MAX;
-            unsigned last = 0;
-            int ret;
-            ret = butil::StringSplitter(begin, '\t').to_uint(&first);
-            ret = ret | butil::StringSplitter(end, '\t').to_uint(&last);
-            if (ret != 0 || first > last) {
+            if (tag_id >= _tag_cpus.size()) {  // unsigned compare: tags > INT_MAX must not slip through
+                LOG(ERROR) << "cpu_set tag " << tag_id
+                           << " >= task_group_ntags " << ntags;
                 return -1;
             }
-            for (auto i = first; i <= last; ++i) {
-                cpuset.insert(i);
+
+            std::vector<unsigned> cpus;
+            if (parse_one_cpuset(cpus_str, cpus) != 0) {
+                LOG(ERROR) << "cpu_set invalid cpuset for tag " << tag_id
+                           << ": " << cpus_str;
+                return -1;
             }
+            _tag_cpus[tag_id] = std::move(cpus);  // a repeated tag overwrites its earlier list
+        }
+    } else {
+        // Legacy format: one cpu-set shared by all tags.
+        std::vector<unsigned> cpus;
+        if (parse_one_cpuset(value, cpus) != 0) {
+            LOG(ERROR) << "cpu_set invalid cpuset: " << value;
+            return -1;
+        }
+        for (int i = 0; i < ntags; ++i) {
+            _tag_cpus[i] = cpus;
         }
-        cpus.assign(cpuset.begin(), cpuset.end());
-        return 0;
     }
-    return -1;
+    return 0;
 }
 
 void TaskControl::bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id) {
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 93336199..1dd3dfc1 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -91,7 +91,12 @@ public:
     // If this method is called after init(), it never returns NULL.
     TaskGroup* choose_one_group(bthread_tag_t tag);
 
-    static int parse_cpuset(std::string value, std::vector<unsigned>& cpus);
+    // Parse `value` (init() passes FLAGS_cpu_set) into _tag_cpus.  Two formats:
+    //   Legacy (all tags share one set): "0-3,5,7"
+    //   Per-tag:  "0:0-3,5,7;1:6-9,4"  (each tag must be < task_group_ntags)
+    // Tags not mentioned get an empty cpu list (= no binding).
+    // Returns 0 on success, -1 if `value` is empty or malformed.
+    int parse_cpuset(const std::string& value);
 
     static void bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id);
 
@@ -154,9 +159,6 @@ private:
     bool _stop;
     butil::atomic<int> _concurrency;
     std::vector<pthread_t> _workers;
-    std::vector<unsigned> _cpus;
-    butil::atomic<int> _next_worker_id;
-
     bvar::Adder<int64_t> _nworkers;
     butil::Mutex _pending_time_mutex;
     butil::atomic<bvar::LatencyRecorder*> _pending_time;
@@ -180,6 +182,12 @@ private:
 
     size_t _pl_num_of_each_tag;
     std::vector<TaggedParkingLot> _tagged_pl;
+    // Per-tag CPU binding lists.  _tag_cpus[tag] is the round-robin list of
+    // CPU IDs to which workers of that tag are bound.  Empty means no binding.
+    std::vector<std::vector<unsigned>> _tag_cpus;
+    // Per-tag monotonic counter for round-robin CPU assignment.
+    // Incremented once per worker created for that tag (in worker_thread).
+    std::vector<butil::atomic<int>> _tag_next_worker_id;
 
 #ifdef BRPC_BTHREAD_TRACER
     TaskTracer _task_tracer;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to