yannan-wyn commented on code in PR #3270:
URL: https://github.com/apache/brpc/pull/3270#discussion_r3090588935


##########
src/bthread/task_control.cpp:
##########
@@ -689,4 +749,117 @@ std::vector<bthread_t> TaskControl::get_living_bthreads() 
{
     return living_bthread_ids;
 }
 
+void TaskControl::push_priority_queue(bthread_tag_t tag, bthread_t tid) {
+    if (!_enable_priority_queue || _priority_shards[tag].empty()) {
+        fallback_enqueue(tag, tid);
+        return;
+    }
+    auto& shards = _priority_shards[tag];
+    const size_t nshard = shards.size();
+
+    // thread_local round-robin, zero contention
+    static BAIDU_THREAD_LOCAL size_t tl_rr = 0;
+    size_t start = tl_rr++ % nshard;
+
+    // Prefer shards that have an active owner (not draining)
+    for (size_t i = 0; i < nshard; ++i) {
+        size_t idx = (start + i) % nshard;
+        if (shards[idx]->owner.load(butil::memory_order_relaxed) != NULL &&
+            !shards[idx]->draining.load(butil::memory_order_relaxed)) {
+            shards[idx]->inbound.Enqueue(tid);
+            return;
+        }
+    }
+
+    // All shards are ownerless; fall back to the round-robin pick
+    shards[start]->inbound.Enqueue(tid);
+}
+
+void TaskControl::bind_priority_owner(TaskGroup* g, bthread_tag_t tag) {
+    auto& shards = _priority_shards[tag];
+    if (shards.empty()) {
+        return;
+    }
+    const size_t nshard = shards.size();
+    size_t start = butil::fast_rand() % nshard;
+    for (size_t i = 0; i < nshard; ++i) {
+        size_t idx = (start + i) % nshard;
+        // Skip shards being drained
+        if (shards[idx]->draining.load(butil::memory_order_acquire)) {
+            continue;
+        }
+        TaskGroup* expected = NULL;
+        if (shards[idx]->owner.compare_exchange_strong(
+                expected, g,
+                butil::memory_order_release,
+                butil::memory_order_relaxed)) {
+            g->_priority_shard_index = static_cast<int>(idx);
+            return;
+        }
+    }
+    // All shards occupied, this group won't own a shard (will only steal)
+}
+
+void TaskControl::unbind_priority_owner(TaskGroup* g, bthread_tag_t tag) {
+    const int idx = g->_priority_shard_index;
+    if (idx < 0) {
+        return;
+    }
+    auto& shards = _priority_shards[tag];
+    if ((size_t)idx >= shards.size()) {
+        return;
+    }
+    PriorityShard* shard = shards[idx].get();
+    if (shard->owner.load(butil::memory_order_relaxed) != g) {
+        g->_priority_shard_index = -1;
+        return;
+    }
+
+    // Mark draining to prevent new owner from binding
+    shard->draining.store(true, butil::memory_order_release);
+    shard->owner.store(NULL, butil::memory_order_release);
+
+    // Drain inbound
+    bthread_t tid;
+    while (shard->inbound.Dequeue(tid)) {
+        fallback_enqueue(tag, tid);
+    }
+    // Drain wsq via steal() since we're no longer the owner
+    while (shard->wsq.steal(&tid)) {
+        fallback_enqueue(tag, tid);
+    }
+
+    // Allow new owner to bind
+    shard->draining.store(false, butil::memory_order_release);
+    g->_priority_shard_index = -1;
+}
+
+void TaskControl::flush_priority_inbound(PriorityShard* shard, size_t max_batch) {
+    bthread_t tid;
+    for (size_t i = 0; i < max_batch; ++i) {
+        if (!shard->inbound.Dequeue(tid)) {
+            break;
+        }
+        if (!shard->wsq.push(tid)) {
+            // wsq is full and we can't push the task back; fall back to the normal path
+            fallback_enqueue(tls_task_group->tag(), tid);
+            break;
+        }
+    }
+}
+
+void TaskControl::fallback_enqueue(bthread_tag_t tag, bthread_t tid) {
+    // Clear BTHREAD_GLOBAL_PRIORITY flag to prevent re-entering priority queue
+    TaskMeta* m = TaskGroup::address_meta(tid);
+    if (m) {
+        m->attr.flags &= ~BTHREAD_GLOBAL_PRIORITY;
+    }
+    // Enqueue to a random group's remote_rq, the normal scheduling path
+    TaskGroup* g = choose_one_group(tag);
+    if (g) {
+        g->_remote_rq.push(tid);
+        signal_task(1, tag);

Review Comment:
   OK, good catch; we'll use ready_to_run_remote instead of just _remote_rq.push.
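
   For reference, here is a minimal sketch of what the fallback could look like
   with that change. It assumes the TaskMeta*-taking overload of
   ready_to_run_remote; on branches where it still takes a bthread_t, tid would
   be passed instead. Illustrative only, not the final implementation:

       void TaskControl::fallback_enqueue(bthread_tag_t tag, bthread_t tid) {
           TaskMeta* m = TaskGroup::address_meta(tid);
           if (m == NULL) {
               return;
           }
           // Clear BTHREAD_GLOBAL_PRIORITY so the task doesn't re-enter the
           // priority queue on its next wakeup.
           m->attr.flags &= ~BTHREAD_GLOBAL_PRIORITY;
           TaskGroup* g = choose_one_group(tag);
           if (g != NULL) {
               // ready_to_run_remote pushes into the group's _remote_rq
               // (retrying if the queue is full) and signals a worker itself,
               // so the explicit signal_task(1, tag) call is no longer needed.
               g->ready_to_run_remote(m);  // assumption: TaskMeta* overload
           }
       }

   The main practical difference from a bare _remote_rq.push is that
   ready_to_run_remote handles a full remote queue and takes care of signaling,
   rather than relying on a separate signal_task call.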



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

