This is an automated email from the ASF dual-hosted git repository.
chenBright pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 4dc6ac8b Bugfix: bthread_interrupt was scheduled to the wrong tag
group (#3351)
4dc6ac8b is described below
commit 4dc6ac8be12eb65b2ff960d148f9fe405d0fcb1b
Author: Bright Chen <[email protected]>
AuthorDate: Mon Jun 22 09:56:10 2026 +0800
Bugfix: bthread_interrupt was scheduled to the wrong tag group (#3351)
---
src/bthread/bthread.cpp | 28 +++++++++++++--------------
src/bthread/butex.cpp | 18 +++++++++---------
src/bthread/task_control.cpp | 2 +-
src/bthread/task_group.cpp | 45 ++++++++++++++++++++++++--------------------
src/bthread/task_group.h | 2 +-
5 files changed, 50 insertions(+), 45 deletions(-)
diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp
index 9b0f4599..5bcda94d 100644
--- a/src/bthread/bthread.cpp
+++ b/src/bthread/bthread.cpp
@@ -274,7 +274,7 @@ start_from_non_worker(bthread_t* __restrict tid,
if (NULL == c) {
return ENOMEM;
}
- auto tag = BTHREAD_TAG_DEFAULT;
+ bthread_tag_t tag = BTHREAD_TAG_DEFAULT;
if (attr != NULL && attr->tag != BTHREAD_TAG_INVALID) {
tag = attr->tag;
}
@@ -283,7 +283,7 @@ start_from_non_worker(bthread_t* __restrict tid,
// 1. NOSIGNAL is often for creating many bthreads in batch,
// inserting into the same TaskGroup maximizes the batch.
// 2. bthread_flush() needs to know which TaskGroup to flush.
- auto g = tls_task_group_nosignal;
+ TaskGroup* g = tls_task_group_nosignal;
if (NULL == g) {
g = c->choose_one_group(tag);
tls_task_group_nosignal = g;
@@ -298,7 +298,7 @@ start_from_non_worker(bthread_t* __restrict tid,
// tag equal to thread local
// tag equal to BTHREAD_TAG_INVALID
BUTIL_FORCE_INLINE bool can_run_thread_local(const bthread_attr_t* __restrict
attr) {
- return attr == nullptr || attr->tag == bthread::tls_task_group->tag() ||
+ return attr == nullptr || attr->tag == tls_task_group->tag() ||
attr->tag == BTHREAD_TAG_INVALID;
}
@@ -331,7 +331,7 @@ int bthread_start_urgent(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
- bthread::TaskGroup* g = bthread::tls_task_group;
+ bthread::TaskGroup* g =
bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g) {
// if attribute is null use thread local task group
if (bthread::can_run_thread_local(attr)) {
@@ -345,7 +345,7 @@ int bthread_start_background(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
- bthread::TaskGroup* g = bthread::tls_task_group;
+ bthread::TaskGroup* g =
bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g) {
// if attribute is null use thread local task group
if (bthread::can_run_thread_local(attr)) {
@@ -356,7 +356,7 @@ int bthread_start_background(bthread_t* __restrict tid,
}
void bthread_flush() {
- bthread::TaskGroup* g = bthread::tls_task_group;
+ bthread::TaskGroup* g =
bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g) {
return g->flush_nosignal_tasks();
}
@@ -368,8 +368,8 @@ void bthread_flush() {
}
}
-int bthread_interrupt(bthread_t tid, bthread_tag_t tag) {
- return bthread::TaskGroup::interrupt(tid, bthread::get_task_control(),
tag);
+int bthread_interrupt(bthread_t tid, bthread_tag_t /*tag*/) {
+ return bthread::TaskGroup::interrupt(tid, bthread::get_task_control());
}
int bthread_stop(bthread_t tid) {
@@ -382,7 +382,7 @@ int bthread_stopped(bthread_t tid) {
}
bthread_t bthread_self(void) {
- bthread::TaskGroup* g = bthread::tls_task_group;
+ bthread::TaskGroup* g =
bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
// note: return 0 for main tasks now, which include main thread and
// all work threads. So that we can identify main tasks from logs
// more easily. This is probably questionable in the future.
@@ -397,7 +397,7 @@ int bthread_equal(bthread_t t1, bthread_t t2) {
}
void bthread_exit(void* retval) {
- bthread::TaskGroup* g = bthread::tls_task_group;
+ bthread::TaskGroup* g =
bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g != NULL && !g->is_current_main_task()) {
throw bthread::ExitException(retval);
} else {
@@ -511,7 +511,7 @@ int bthread_setconcurrency_by_tag(int num, bthread_tag_t
tag) {
}
int bthread_about_to_quit() {
- bthread::TaskGroup* g = bthread::tls_task_group;
+ bthread::TaskGroup* g =
bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g != NULL) {
bthread::TaskMeta* current_task = g->current_task();
if(!(current_task->attr.flags & BTHREAD_NEVER_QUIT)) {
@@ -640,12 +640,12 @@ int bthread_list_join(bthread_list_t* list) {
}
bthread_tag_t bthread_self_tag(void) {
- return bthread::tls_task_group != nullptr ? bthread::tls_task_group->tag()
- : BTHREAD_TAG_DEFAULT;
+ bthread::TaskGroup* g =
bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
+ return g != NULL ? g->tag() : BTHREAD_TAG_DEFAULT;
}
uint64_t bthread_cpu_clock_ns(void) {
- bthread::TaskGroup* g = bthread::tls_task_group;
+ bthread::TaskGroup* g =
bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g != NULL && !g->is_current_main_task()) {
return g->current_task_cpu_clock_ns();
}
diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp
index aca12816..92de900c 100644
--- a/src/bthread/butex.cpp
+++ b/src/bthread/butex.cpp
@@ -97,7 +97,6 @@ struct ButexBthreadWaiter : public ButexWaiter {
Butex* initial_butex;
TaskControl* control;
const timespec* abstime;
- bthread_tag_t tag;
};
// pthread_task or main_task allocates this structure on stack and queue it
@@ -320,7 +319,7 @@ int butex_wake(void* arg, bool nosignal) {
}
ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
unsleep_if_necessary(bbw, get_global_timer_thread());
- TaskGroup* g = get_task_group(bbw->control, bbw->tag);
+ TaskGroup* g = get_task_group(bbw->control, bbw->task_meta->attr.tag);
if (g == tls_task_group) {
run_in_local_task_group(g, bbw->task_meta, nosignal);
} else {
@@ -373,7 +372,7 @@ int butex_wake_n(void* arg, size_t n, bool nosignal) {
bthread_waiters.tail()->value());
w->RemoveFromList();
unsleep_if_necessary(w, get_global_timer_thread());
- auto g = get_task_group(w->control, w->tag);
+ auto g = get_task_group(w->control, w->task_meta->attr.tag);
g->ready_to_run_general(w->task_meta, true);
nwakeups[g->tag()] = g;
++nwakeup;
@@ -384,7 +383,7 @@ int butex_wake_n(void* arg, size_t n, bool nosignal) {
g->flush_nosignal_tasks_general();
}
}
- auto g = get_task_group(next->control, next->tag);
+ auto g = get_task_group(next->control, next->task_meta->attr.tag);
if (g == tls_task_group) {
run_in_local_task_group(g, next->task_meta, nosignal);
} else {
@@ -446,7 +445,7 @@ int butex_wake_except(void* arg, bthread_t
excluded_bthread) {
ButexBthreadWaiter* w =
static_cast<ButexBthreadWaiter*>(bthread_waiters.tail()->value());
w->RemoveFromList();
unsleep_if_necessary(w, get_global_timer_thread());
- auto g = get_task_group(w->control, w->tag);
+ auto g = get_task_group(w->control, w->task_meta->attr.tag);
g->ready_to_run_general(w->task_meta, true);
nwakeups[g->tag()] = g;
++nwakeup;
@@ -489,11 +488,12 @@ int butex_requeue(void* arg, void* arg2) {
}
ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
unsleep_if_necessary(bbw, get_global_timer_thread());
- auto g = is_same_tag(bbw->tag) ? tls_task_group : NULL;
+ auto g = is_same_tag(bbw->task_meta->attr.tag) ? tls_task_group : NULL;
if (g) {
TaskGroup::exchange(&g, bbw->task_meta);
} else {
-
bbw->control->choose_one_group(bbw->tag)->ready_to_run_remote(bbw->task_meta);
+ g = bbw->control->choose_one_group(bbw->task_meta->attr.tag);
+ g->ready_to_run_remote(bbw->task_meta);
}
return 1;
}
@@ -531,7 +531,8 @@ inline bool erase_from_butex(ButexWaiter* bw, bool wakeup,
WaiterState state) {
if (erased && wakeup) {
if (bw->tid) {
ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(bw);
- get_task_group(bbw->control,
bbw->tag)->ready_to_run_general(bbw->task_meta);
+ auto g = get_task_group(bbw->control, bbw->task_meta->attr.tag);
+ g->ready_to_run_general(bbw->task_meta);
} else {
ButexPthreadWaiter* pw = static_cast<ButexPthreadWaiter*>(bw);
wakeup_pthread(pw);
@@ -691,7 +692,6 @@ int butex_wait(void* arg, int expected_value, const
timespec* abstime, bool prep
bbw.initial_butex = b;
bbw.control = g->control();
bbw.abstime = abstime;
- bbw.tag = g->tag();
if (abstime != NULL) {
// Schedule timer before queueing. If the timer is triggered before
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index 5c6ca57c..beb41306 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -348,7 +348,7 @@ int TaskControl::add_workers(int num, bthread_tag_t tag) {
}
TaskGroup* TaskControl::choose_one_group(bthread_tag_t tag) {
- CHECK(tag >= BTHREAD_TAG_DEFAULT && tag < FLAGS_task_group_ntags);
+ CHECK(tag >= BTHREAD_TAG_DEFAULT && tag < FLAGS_task_group_ntags) << tag;
auto& groups = tag_group(tag);
const auto ngroup = tag_ngroup(tag).load(butil::memory_order_acquire);
if (ngroup != 0) {
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index c0804c9a..0fd09955 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -351,7 +351,7 @@ void TaskGroup::asan_task_runner(intptr_t) {
void TaskGroup::task_runner(intptr_t skip_remained) {
// NOTE: tls_task_group is volatile since tasks are moved around
// different groups.
- TaskGroup* g = tls_task_group;
+ TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
#ifdef BRPC_BTHREAD_TRACER
TaskTracer::set_running_status(g->tid(), g->_cur_meta);
#endif // BRPC_BTHREAD_TRACER
@@ -526,13 +526,15 @@ int TaskGroup::start_foreground(TaskGroup** pg,
m->cpuwide_start_ns = start_ns;
m->stat = EMPTY_STAT;
m->tid = make_tid(*m->version_butex, slot);
- m->priority_index = pg ? (*pg)->_cur_meta->priority_index : -1;
+
+ TaskGroup* g = *pg;
+ m->priority_index = g->_cur_meta->priority_index;
+ m->attr.tag = g->tag();
*th = m->tid;
if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
LOG(INFO) << "Started bthread " << m->tid;
}
- TaskGroup* g = *pg;
g->_control->_nbthreads << 1;
g->_control->tag_nbthreads(g->tag()) << 1;
#ifdef BRPC_BTHREAD_TRACER
@@ -601,6 +603,7 @@ int TaskGroup::start_background(bthread_t* __restrict th,
if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
LOG(INFO) << "Started bthread " << m->tid;
}
+ m->attr.tag = tag();
_control->_nbthreads << 1;
_control->tag_nbthreads(tag()) << 1;
#ifdef BRPC_BTHREAD_TRACER
@@ -635,7 +638,7 @@ int TaskGroup::join(bthread_t tid, void** return_value) {
// The bthread is not created yet, this join is definitely wrong.
return EINVAL;
}
- TaskGroup* g = tls_task_group;
+ TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g != NULL && g->current_tid() == tid) {
// joining self causes indefinite waiting.
return EINVAL;
@@ -912,14 +915,14 @@ void
TaskGroup::flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex) {
}
void TaskGroup::ready_to_run_general(TaskMeta* meta, bool nosignal) {
- if (tls_task_group == this) {
+ if (BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group) == this) {
return ready_to_run(meta, nosignal);
}
return ready_to_run_remote(meta, nosignal);
}
void TaskGroup::flush_nosignal_tasks_general() {
- if (tls_task_group == this) {
+ if (BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group) == this) {
return flush_nosignal_tasks();
}
return flush_nosignal_tasks_remote();
@@ -927,28 +930,30 @@ void TaskGroup::flush_nosignal_tasks_general() {
void TaskGroup::ready_to_run_in_worker(void* args_in) {
ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
- return tls_task_group->ready_to_run(args->meta, args->nosignal);
+ return BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group)->
+ ready_to_run(args->meta, args->nosignal);
}
void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) {
ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
+ TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
+
#ifdef BRPC_BTHREAD_TRACER
- tls_task_group->_control->_task_tracer.set_status(
- TASK_STATUS_READY, args->meta);
+ g->_control->_task_tracer.set_status(TASK_STATUS_READY, args->meta);
#endif // BRPC_BTHREAD_TRACER
- return tls_task_group->push_rq(args->meta->tid);
+ return g->push_rq(args->meta->tid);
}
void TaskGroup::priority_to_run(void* args_in) {
ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
+ TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
#ifdef BRPC_BTHREAD_TRACER
- tls_task_group->_control->_task_tracer.set_status(
- TASK_STATUS_READY, args->meta);
+ g->_control->_task_tracer.set_status(TASK_STATUS_READY, args->meta);
#endif // BRPC_BTHREAD_TRACER
if (args->meta->priority_index < 0) {
- return tls_task_group->push_rq(args->meta->tid);
+ return g->push_rq(args->meta->tid);
}
- return tls_task_group->control()->push_ed_priority_queue(
+ return g->control()->push_ed_priority_queue(
args->tag, args->meta->priority_index, args->meta->tid);
}
@@ -960,10 +965,10 @@ struct SleepArgs {
};
static void ready_to_run_from_timer_thread(void* arg) {
- CHECK(tls_task_group == NULL);
+ CHECK(BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group) == NULL);
const SleepArgs* e = static_cast<const SleepArgs*>(arg);
- auto g = e->group;
- auto tag = g->tag();
+ TaskGroup* g = e->group;
+ bthread_tag_t tag = g->tag();
g->control()->choose_one_group(tag)->ready_to_run_remote(e->meta);
}
@@ -1089,7 +1094,7 @@ static int set_butex_waiter(bthread_t tid, ButexWaiter*
w) {
// by race conditions.
// TODO: bthreads created by BTHREAD_ATTR_PTHREAD blocking on bthread_usleep()
// can't be interrupted.
-int TaskGroup::interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag) {
+int TaskGroup::interrupt(bthread_t tid, TaskControl* c) {
// Consume current_waiter in the TaskMeta, wake it up then set it back.
ButexWaiter* w = NULL;
uint64_t sleep_id = 0;
@@ -1110,7 +1115,7 @@ int TaskGroup::interrupt(bthread_t tid, TaskControl* c,
bthread_tag_t tag) {
}
} else if (sleep_id != 0) {
if (get_global_timer_thread()->unschedule(sleep_id) == 0) {
- TaskGroup* g = tls_task_group;
+ TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
TaskMeta* m = address_meta(tid);
if (g) {
g->ready_to_run(m);
@@ -1118,7 +1123,7 @@ int TaskGroup::interrupt(bthread_t tid, TaskControl* c,
bthread_tag_t tag) {
if (!c) {
return EINVAL;
}
- c->choose_one_group(tag)->ready_to_run_remote(m);
+ c->choose_one_group(m->attr.tag)->ready_to_run_remote(m);
}
}
}
diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h
index 54140c0d..fc0c5cb4 100644
--- a/src/bthread/task_group.h
+++ b/src/bthread/task_group.h
@@ -203,7 +203,7 @@ public:
// Wake up blocking ops in the thread.
// Returns 0 on success, errno otherwise.
- static int interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag);
+ static int interrupt(bthread_t tid, TaskControl* c);
// Get the meta associate with the task.
static TaskMeta* address_meta(bthread_t tid);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]