yannan-wyn commented on code in PR #3270:
URL: https://github.com/apache/brpc/pull/3270#discussion_r3090586891


##########
src/bthread/task_control.cpp:
##########
@@ -528,8 +549,47 @@ int TaskControl::_destroy_group(TaskGroup* g) {
 bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
     auto tag = tls_task_group->tag();
 
-    if (_priority_queues[tag].steal(tid)) {
-        return true;
+    // priority queue: owner-first, then steal from other shards
+    if (_enable_priority_queue && !_priority_shards[tag].empty()) {
+        auto& shards = _priority_shards[tag];
+        const size_t nshard = shards.size();
+
+        // Owner-first: if current TaskGroup owns a shard, flush and pop
+        const int my_shard = tls_task_group->_priority_shard_index;
+        if (my_shard >= 0 && (size_t)my_shard < nshard) {
+            PriorityShard* shard = shards[my_shard].get();
+            if (shard->owner.load(butil::memory_order_relaxed) == 
tls_task_group) {
+                static const size_t kFlushBatch = 8;
+                flush_priority_inbound(shard, kFlushBatch);
+                if (shard->wsq.pop(tid)) {
+                    return true;
+                }
+            }
+        }
+
+        // Steal from all shards (random start to avoid hot spot)
+        size_t start = butil::fast_rand() % nshard;
+        for (size_t i = 0; i < nshard; ++i) {
+            size_t idx = (start + i) % nshard;
+            if (shards[idx]->wsq.steal(tid)) {
+                return true;
+            }
+        }
+
+        // Salvage: drain ownerless shards' inbound to prevent task starvation.
+        // This handles the TOCTOU race where a producer enqueues after unbind
+        // finishes draining but before a new owner binds.
+        for (size_t i = 0; i < nshard; ++i) {
+            size_t idx = (start + i) % nshard;
+            PriorityShard* shard = shards[idx].get();
+            if (shard->owner.load(butil::memory_order_relaxed) == NULL &&
+                !shard->draining.load(butil::memory_order_relaxed)) {
+                bthread_t salvaged;
+                if (shard->inbound.Dequeue(salvaged)) {
+                    fallback_enqueue(tag, salvaged);

Review Comment:
   Consider strengthening the MPSC inbound queue with a CAS-based enqueue so concurrent producers are handled safely.
   As an aside, switching to a full MPMC queue is probably too heavyweight for this path — MPSC with CAS should be sufficient.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to