MalikHou commented on code in PR #3232:
URL: https://github.com/apache/brpc/pull/3232#discussion_r2869059669


##########
src/bthread/butex.cpp:
##########
@@ -295,13 +324,95 @@ inline TaskGroup* get_task_group(TaskControl* c, 
bthread_tag_t tag) {
 }
 
 inline void run_in_local_task_group(TaskGroup* g, TaskMeta* next_meta, bool 
nosignal) {
+    // Pinned tasks must go through pin-aware routing even on same-tag local 
fast
+    // paths, otherwise TaskGroup::exchange() may resume them on the wrong 
worker.
+    if (next_meta->local_pin_enabled && next_meta->local_pin_depth > 0) {
+        g->ready_to_run(next_meta, nosignal);
+        return;
+    }
     if (!nosignal) {
         TaskGroup::exchange(&g, next_meta);
     } else {
         g->ready_to_run(next_meta, nosignal);
     }
 }
 
+inline bool is_pinned_waiter(const ButexWaiter* bw) {
+    if (bw == NULL || bw->tid == 0) {
+        return false;
+    }
+    const ButexBthreadWaiter* bbw = static_cast<const ButexBthreadWaiter*>(bw);
+    const TaskMeta* meta = bbw->task_meta;
+    return meta != NULL && meta->local_pin_enabled && meta->local_pin_depth > 
0;
+}
+
+template <typename ShouldCheck>
+inline bool reject_if_selected_contains_pinned(ButexWaiterList* waiters,
+                                                const ShouldCheck& 
should_check) {
+    size_t index = 0;
+    for (butil::LinkNode<ButexWaiter>* p = waiters->head();
+         p != waiters->end(); p = p->next(), ++index) {
+        ButexWaiter* bw = p->value();
+        if (should_check(bw, index) && is_pinned_waiter(bw)) {
+            butex_strict_reject_count() << 1;
+            errno = EINVAL;
+            return true;
+        }
+    }
+    return false;
+}
+
+int butex_wake_to_task_group(void* arg, TaskGroup* target_group) {
+    if (arg == NULL || target_group == NULL) {
+        butex_within_invalid_count() << 1;
+        errno = EINVAL;
+        return -1;
+    }
+    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, 
value);
+    ButexBthreadWaiter* bbw = NULL;
+    {
+        BAIDU_SCOPED_LOCK(b->waiter_lock);
+        if (b->waiters.empty()) {
+            butex_within_no_waiter_count() << 1;
+            return 0;
+        }
+        butil::LinkNode<ButexWaiter>* head = b->waiters.head();
+        if (head->next() != b->waiters.end()) {
+            butex_within_invalid_count() << 1;
+            errno = EINVAL;
+            return -1;
+        }
+        ButexWaiter* bw = head->value();
+        if (bw->tid == 0) {
+            butex_within_invalid_count() << 1;
+            errno = EINVAL;
+            return -1;
+        }
+        bbw = static_cast<ButexBthreadWaiter*>(bw);
+        if (bbw->home_group != target_group ||
+            bbw->control != target_group->control() ||
+            bbw->tag != target_group->tag()) {
+            butex_within_invalid_count() << 1;
+            errno = EINVAL;
+            return -1;
+        }
+        // wake_within() runs on target_group's owner worker. For pinned 
waiters,
+        // if owner-local pinned runqueue is already full, report EAGAIN and
+        // keep waiter on butex list for next harvest retry, instead of
+        // blocking/spinning here.
+        if (is_pinned_waiter(bw) && target_group->pinned_rq_full()) {
+            errno = EAGAIN;
+            return -1;
+        }
+        bw->RemoveFromList();
+        bw->container.store(NULL, butil::memory_order_relaxed);
+    }
+
+    unsleep_if_necessary(bbw, get_global_timer_thread());
+    target_group->ready_to_run(bbw->task_meta, true);
+    return 1;

Review Comment:
   In this code path, `pinned_rq_full()` and the following enqueue are executed by the same owner worker, so the TOCTOU case you mentioned does not occur here.
   
   `butex_wake_within_active_task()` is only allowed from the active-task hook of the current `TaskGroup` (validated in `validate_active_task_hook_ctx()`). For pinned waiters, `ready_to_run()` routes to `ready_to_run_pinned_local()` on the same home worker. Also, `_pinned_rq` is owner-local: it is not used by the steal path, and remote routing for pinned tasks goes to `_pinned_remote_rq`. So there is no concurrent producer that can make `_pinned_rq` become full between the check and the enqueue.
   
   So this check is intended as a fast-fail path (returning `EAGAIN` when the queue is already full), not as a cross-thread, non-blocking CAS gate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to