Copilot commented on code in PR #3270: URL: https://github.com/apache/brpc/pull/3270#discussion_r3085203162
########## test/bthread_priority_queue_unittest.cpp: ########## @@ -0,0 +1,248 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> +#include <gflags/gflags.h> +#include <atomic> +#include <vector> +#include <set> +#include <mutex> +#include "bthread/bthread.h" +#include "bthread/processor.h" + +namespace { + +// Counter incremented by priority bthreads to verify execution +std::atomic<int> g_priority_count(0); +// Mutex + set for collecting executed tids to verify no loss +std::mutex g_tid_mutex; +std::set<int> g_executed_ids; + +void reset_globals() { + g_priority_count.store(0); + std::lock_guard<std::mutex> lk(g_tid_mutex); + g_executed_ids.clear(); +} + +struct TaskArg { + int id; +}; + +void* priority_task_fn(void* arg) { + TaskArg* ta = static_cast<TaskArg*>(arg); + g_priority_count.fetch_add(1, std::memory_order_relaxed); + { + std::lock_guard<std::mutex> lk(g_tid_mutex); + g_executed_ids.insert(ta->id); + } + delete ta; + return NULL; +} + +void* normal_task_fn(void* /*arg*/) { + // Just a normal task that does nothing, used as a filler + bthread_usleep(1000); + return NULL; +} + +class PriorityQueueTest : public ::testing::Test { +protected: + void SetUp() override { + 
reset_globals(); + } +}; + +// Test 1: End-to-end priority task submission and execution. +// Multiple producers submit priority tasks, verify all tasks are executed. +TEST_F(PriorityQueueTest, e2e_priority_tasks_all_executed) { + const int N = 200; + + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + attr.flags |= BTHREAD_GLOBAL_PRIORITY; + + std::vector<bthread_t> tids(N); + for (int i = 0; i < N; ++i) { + TaskArg* arg = new TaskArg{i}; + ASSERT_EQ(0, bthread_start_background(&tids[i], &attr, + priority_task_fn, arg)); + } + + for (int i = 0; i < N; ++i) { + bthread_join(tids[i], NULL); + } + + ASSERT_EQ(N, g_priority_count.load()); + std::lock_guard<std::mutex> lk(g_tid_mutex); + ASSERT_EQ((size_t)N, g_executed_ids.size()); + for (int i = 0; i < N; ++i) { + ASSERT_TRUE(g_executed_ids.count(i)) << "Missing task id=" << i; + } +} + +// Test 2: Mix of priority and normal tasks, all complete correctly. +TEST_F(PriorityQueueTest, mixed_priority_and_normal_tasks) { + const int N_PRIORITY = 100; + const int N_NORMAL = 100; + + bthread_attr_t priority_attr = BTHREAD_ATTR_NORMAL; + priority_attr.flags |= BTHREAD_GLOBAL_PRIORITY; + + std::vector<bthread_t> tids; + tids.reserve(N_PRIORITY + N_NORMAL); + + for (int i = 0; i < N_PRIORITY + N_NORMAL; ++i) { + bthread_t tid; + if (i % 2 == 0 && (i / 2) < N_PRIORITY) { + // Priority task + TaskArg* arg = new TaskArg{i / 2}; + ASSERT_EQ(0, bthread_start_background(&tid, &priority_attr, + priority_task_fn, arg)); + } else { + // Normal task + ASSERT_EQ(0, bthread_start_background(&tid, NULL, + normal_task_fn, NULL)); + } + tids.push_back(tid); + } + + for (auto tid : tids) { + bthread_join(tid, NULL); + } + + ASSERT_EQ(N_PRIORITY, g_priority_count.load()); +} + +// Test 3: Concurrent producers submitting priority tasks from multiple pthreads. +// Simulates multiple event dispatchers pushing to the priority queue. 
+TEST_F(PriorityQueueTest, concurrent_producers_no_task_loss) { + const int NUM_PRODUCERS = 4; + const int TASKS_PER_PRODUCER = 50; + const int TOTAL = NUM_PRODUCERS * TASKS_PER_PRODUCER; + + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + attr.flags |= BTHREAD_GLOBAL_PRIORITY; + + std::atomic<int> started(0); + std::vector<pthread_t> producers(NUM_PRODUCERS); + std::vector<std::vector<bthread_t>> all_tids(NUM_PRODUCERS); + + struct ProducerArg { + int producer_id; + int tasks_per_producer; + bthread_attr_t* attr; + std::vector<bthread_t>* tids; + std::atomic<int>* started; + }; + + auto producer_fn = [](void* arg) -> void* { + ProducerArg* pa = static_cast<ProducerArg*>(arg); + pa->started->fetch_add(1); + // Spin until all producers are ready + while (pa->started->load() < 4) { + cpu_relax(); + } + pa->tids->resize(pa->tasks_per_producer); + for (int i = 0; i < pa->tasks_per_producer; ++i) { + int id = pa->producer_id * pa->tasks_per_producer + i; + TaskArg* ta = new TaskArg{id}; + int rc = bthread_start_background(&(*pa->tids)[i], pa->attr, + priority_task_fn, ta); + EXPECT_EQ(0, rc); + } + return NULL; + }; + + std::vector<ProducerArg> pargs(NUM_PRODUCERS); + for (int i = 0; i < NUM_PRODUCERS; ++i) { + pargs[i] = {i, TASKS_PER_PRODUCER, &attr, &all_tids[i], &started}; + ASSERT_EQ(0, pthread_create(&producers[i], NULL, producer_fn, &pargs[i])); + } + + for (int i = 0; i < NUM_PRODUCERS; ++i) { + pthread_join(producers[i], NULL); + } + + // Join all bthreads + for (int i = 0; i < NUM_PRODUCERS; ++i) { + for (auto tid : all_tids[i]) { + bthread_join(tid, NULL); + } + } + + ASSERT_EQ(TOTAL, g_priority_count.load()); + std::lock_guard<std::mutex> lk(g_tid_mutex); + ASSERT_EQ((size_t)TOTAL, g_executed_ids.size()); +} + +// Test 4: Priority tasks submitted with only 1 shard (degenerate case). +// Verifies correctness when nshard=1. 
+TEST_F(PriorityQueueTest, single_shard_correctness) { + // This test relies on FLAGS_priority_queue_shards being set before + // TaskControl init. Since TaskControl is already initialized by the + // time we run, we test with whatever shard count is configured. Review Comment: This test is named/annotated as a single-shard correctness test, but it never sets `FLAGS_priority_queue_shards` to 1 (the main sets it to 4), so it doesn’t actually exercise the degenerate nshard=1 case. Either set `priority_queue_shards=1` before any bthread/TaskControl initialization (e.g., in `main`) or rename/update the test to reflect what it truly verifies. ```suggestion // Test 4: Priority tasks submitted under the configured shard count. // Verifies correctness and that no priority tasks are lost. TEST_F(PriorityQueueTest, configured_shards_correctness) { // FLAGS_priority_queue_shards must be set before TaskControl init. // This test therefore validates behavior with whatever shard count // was configured in main() for the test binary. ``` ########## test/bthread_priority_queue_owner_unittest.cpp: ########## @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// Tests for B2 priority queue owner dynamic changes: +// 1. New workers bind to available shards correctly +// 2. Priority tasks survive concurrency increases +// 3. Stress: concurrent priority tasks + concurrency scaling + +#include <gtest/gtest.h> +#include <gflags/gflags.h> +#include <atomic> +#include <vector> +#include <set> +#include <mutex> +#include <thread> Review Comment: This file uses `usleep()` and `sched_yield()` but doesn’t include the headers that declare them. Please include `<unistd.h>` (for `usleep`) and `<sched.h>` (for `sched_yield`) to avoid relying on transitive includes and to prevent build failures on stricter toolchains. ```suggestion #include <thread> #include <unistd.h> #include <sched.h> ``` ########## src/bthread/task_control.cpp: ########## @@ -528,8 +549,47 @@ int TaskControl::_destroy_group(TaskGroup* g) { bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) { auto tag = tls_task_group->tag(); - if (_priority_queues[tag].steal(tid)) { - return true; + // priority queue: owner-first, then steal from other shards + if (_enable_priority_queue && !_priority_shards[tag].empty()) { + auto& shards = _priority_shards[tag]; + const size_t nshard = shards.size(); + + // Owner-first: if current TaskGroup owns a shard, flush and pop + const int my_shard = tls_task_group->_priority_shard_index; + if (my_shard >= 0 && (size_t)my_shard < nshard) { + PriorityShard* shard = shards[my_shard].get(); + if (shard->owner.load(butil::memory_order_relaxed) == tls_task_group) { + static const size_t kFlushBatch = 8; + flush_priority_inbound(shard, kFlushBatch); + if (shard->wsq.pop(tid)) { + return true; + } + } + } + + // Steal from all shards (random start to avoid hot spot) + size_t start = butil::fast_rand() % nshard; + for (size_t i = 0; i < nshard; ++i) { + size_t idx = (start + i) % nshard; + if (shards[idx]->wsq.steal(tid)) { + return true; + } + } + + // Salvage: drain ownerless shards' inbound to prevent task starvation. 
+ // This handles the TOCTOU race where a producer enqueues after unbind + // finishes draining but before a new owner binds. + for (size_t i = 0; i < nshard; ++i) { + size_t idx = (start + i) % nshard; + PriorityShard* shard = shards[idx].get(); + if (shard->owner.load(butil::memory_order_relaxed) == NULL && + !shard->draining.load(butil::memory_order_relaxed)) { + bthread_t salvaged; + if (shard->inbound.Dequeue(salvaged)) { + fallback_enqueue(tag, salvaged); Review Comment: In the salvage path you call `MPSCQueue::Dequeue` from `steal_task()`. `butil::MPSCQueue` is explicitly single-consumer, but multiple workers can execute `steal_task()` concurrently, which can lead to data races/corruption when more than one thread dequeues from the same inbound queue. Consider removing this salvage dequeue, switching inbound to an MPMC structure, or guarding salvage draining with a per-shard CAS/lock so only one thread ever dequeues at a time (and keep dequeue ownership consistent with the queue’s single-consumer contract). ```suggestion // // `inbound` is a single-consumer queue, so salvage must first claim // exclusive drain ownership. Multiple workers can execute steal_task() // concurrently, and relaxed loads alone are not sufficient to prevent // two workers from dequeuing from the same inbound queue at once. 
for (size_t i = 0; i < nshard; ++i) { size_t idx = (start + i) % nshard; PriorityShard* shard = shards[idx].get(); if (shard->owner.load(butil::memory_order_relaxed) == NULL) { bool expected = false; if (shard->draining.compare_exchange_strong( expected, true, butil::memory_order_acquire, butil::memory_order_relaxed)) { bthread_t salvaged; if (shard->owner.load(butil::memory_order_relaxed) == NULL && shard->inbound.Dequeue(salvaged)) { fallback_enqueue(tag, salvaged); } shard->draining.store(false, butil::memory_order_release); ``` ########## test/bthread_priority_queue_unittest.cpp: ########## @@ -0,0 +1,248 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <gtest/gtest.h> +#include <gflags/gflags.h> +#include <atomic> +#include <vector> +#include <set> +#include <mutex> +#include "bthread/bthread.h" +#include "bthread/processor.h" + +namespace { + +// Counter incremented by priority bthreads to verify execution +std::atomic<int> g_priority_count(0); +// Mutex + set for collecting executed tids to verify no loss +std::mutex g_tid_mutex; +std::set<int> g_executed_ids; + +void reset_globals() { + g_priority_count.store(0); + std::lock_guard<std::mutex> lk(g_tid_mutex); + g_executed_ids.clear(); +} + +struct TaskArg { + int id; +}; + +void* priority_task_fn(void* arg) { + TaskArg* ta = static_cast<TaskArg*>(arg); + g_priority_count.fetch_add(1, std::memory_order_relaxed); + { + std::lock_guard<std::mutex> lk(g_tid_mutex); + g_executed_ids.insert(ta->id); + } + delete ta; + return NULL; +} + +void* normal_task_fn(void* /*arg*/) { + // Just a normal task that does nothing, used as a filler + bthread_usleep(1000); + return NULL; +} + +class PriorityQueueTest : public ::testing::Test { +protected: + void SetUp() override { + reset_globals(); + } +}; + +// Test 1: End-to-end priority task submission and execution. +// Multiple producers submit priority tasks, verify all tasks are executed. 
+TEST_F(PriorityQueueTest, e2e_priority_tasks_all_executed) { + const int N = 200; + + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + attr.flags |= BTHREAD_GLOBAL_PRIORITY; + + std::vector<bthread_t> tids(N); + for (int i = 0; i < N; ++i) { + TaskArg* arg = new TaskArg{i}; + ASSERT_EQ(0, bthread_start_background(&tids[i], &attr, + priority_task_fn, arg)); + } + + for (int i = 0; i < N; ++i) { + bthread_join(tids[i], NULL); + } + + ASSERT_EQ(N, g_priority_count.load()); + std::lock_guard<std::mutex> lk(g_tid_mutex); + ASSERT_EQ((size_t)N, g_executed_ids.size()); + for (int i = 0; i < N; ++i) { + ASSERT_TRUE(g_executed_ids.count(i)) << "Missing task id=" << i; + } +} + +// Test 2: Mix of priority and normal tasks, all complete correctly. +TEST_F(PriorityQueueTest, mixed_priority_and_normal_tasks) { + const int N_PRIORITY = 100; + const int N_NORMAL = 100; + + bthread_attr_t priority_attr = BTHREAD_ATTR_NORMAL; + priority_attr.flags |= BTHREAD_GLOBAL_PRIORITY; + + std::vector<bthread_t> tids; + tids.reserve(N_PRIORITY + N_NORMAL); + + for (int i = 0; i < N_PRIORITY + N_NORMAL; ++i) { + bthread_t tid; + if (i % 2 == 0 && (i / 2) < N_PRIORITY) { + // Priority task + TaskArg* arg = new TaskArg{i / 2}; + ASSERT_EQ(0, bthread_start_background(&tid, &priority_attr, + priority_task_fn, arg)); + } else { + // Normal task + ASSERT_EQ(0, bthread_start_background(&tid, NULL, + normal_task_fn, NULL)); + } + tids.push_back(tid); + } + + for (auto tid : tids) { + bthread_join(tid, NULL); + } + + ASSERT_EQ(N_PRIORITY, g_priority_count.load()); +} + +// Test 3: Concurrent producers submitting priority tasks from multiple pthreads. +// Simulates multiple event dispatchers pushing to the priority queue. 
+TEST_F(PriorityQueueTest, concurrent_producers_no_task_loss) { + const int NUM_PRODUCERS = 4; + const int TASKS_PER_PRODUCER = 50; + const int TOTAL = NUM_PRODUCERS * TASKS_PER_PRODUCER; + + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + attr.flags |= BTHREAD_GLOBAL_PRIORITY; + + std::atomic<int> started(0); + std::vector<pthread_t> producers(NUM_PRODUCERS); + std::vector<std::vector<bthread_t>> all_tids(NUM_PRODUCERS); + + struct ProducerArg { + int producer_id; + int tasks_per_producer; + bthread_attr_t* attr; + std::vector<bthread_t>* tids; + std::atomic<int>* started; + }; + + auto producer_fn = [](void* arg) -> void* { + ProducerArg* pa = static_cast<ProducerArg*>(arg); + pa->started->fetch_add(1); + // Spin until all producers are ready + while (pa->started->load() < 4) { + cpu_relax(); + } Review Comment: The producer barrier is hardcoded to `4` (`while (pa->started->load() < 4)`), which will break if `NUM_PRODUCERS` is changed. Use `NUM_PRODUCERS` (or pass the expected count through `ProducerArg`) so the test remains correct when the constant is updated. 
########## src/bthread/task_control.cpp: ########## @@ -689,4 +749,117 @@ std::vector<bthread_t> TaskControl::get_living_bthreads() { return living_bthread_ids; } +void TaskControl::push_priority_queue(bthread_tag_t tag, bthread_t tid) { + if (!_enable_priority_queue || _priority_shards[tag].empty()) { + fallback_enqueue(tag, tid); + return; + } + auto& shards = _priority_shards[tag]; + const size_t nshard = shards.size(); + + // thread_local round-robin, zero contention + static BAIDU_THREAD_LOCAL size_t tl_rr = 0; + size_t start = tl_rr++ % nshard; + + // Prefer shards that have an active owner (not draining) + for (size_t i = 0; i < nshard; ++i) { + size_t idx = (start + i) % nshard; + if (shards[idx]->owner.load(butil::memory_order_relaxed) != NULL && + !shards[idx]->draining.load(butil::memory_order_relaxed)) { + shards[idx]->inbound.Enqueue(tid); + return; + } + } + + // All shards ownerless, fallback to round-robin pick + shards[start]->inbound.Enqueue(tid); Review Comment: When no shard has an active owner, this code still enqueues into `shards[start]->inbound`. With an ownerless shard there may be no single designated consumer to flush inbound, and the current workaround relies on salvage dequeues in `steal_task()` (which breaks MPSC’s single-consumer rule). Safer options: (1) if no owned shard is available, immediately `fallback_enqueue(tag, tid)`; or (2) introduce a single-consumer handoff mechanism (e.g., temporarily claim a shard for draining) before allowing enqueues to ownerless shards. ```suggestion // No shard has an active owner, so avoid enqueueing into an ownerless // shard-local inbound queue with no designated single consumer. 
fallback_enqueue(tag, tid); ``` ########## src/bthread/task_control.cpp: ########## @@ -689,4 +749,117 @@ std::vector<bthread_t> TaskControl::get_living_bthreads() { return living_bthread_ids; } +void TaskControl::push_priority_queue(bthread_tag_t tag, bthread_t tid) { + if (!_enable_priority_queue || _priority_shards[tag].empty()) { + fallback_enqueue(tag, tid); + return; + } + auto& shards = _priority_shards[tag]; + const size_t nshard = shards.size(); + + // thread_local round-robin, zero contention + static BAIDU_THREAD_LOCAL size_t tl_rr = 0; + size_t start = tl_rr++ % nshard; + + // Prefer shards that have an active owner (not draining) + for (size_t i = 0; i < nshard; ++i) { + size_t idx = (start + i) % nshard; + if (shards[idx]->owner.load(butil::memory_order_relaxed) != NULL && + !shards[idx]->draining.load(butil::memory_order_relaxed)) { + shards[idx]->inbound.Enqueue(tid); + return; + } + } + + // All shards ownerless, fallback to round-robin pick + shards[start]->inbound.Enqueue(tid); +} + +void TaskControl::bind_priority_owner(TaskGroup* g, bthread_tag_t tag) { + auto& shards = _priority_shards[tag]; + if (shards.empty()) { + return; + } + const size_t nshard = shards.size(); + size_t start = butil::fast_rand() % nshard; + for (size_t i = 0; i < nshard; ++i) { + size_t idx = (start + i) % nshard; + // Skip shards being drained + if (shards[idx]->draining.load(butil::memory_order_acquire)) { + continue; + } + TaskGroup* expected = NULL; + if (shards[idx]->owner.compare_exchange_strong( + expected, g, + butil::memory_order_release, + butil::memory_order_relaxed)) { + g->_priority_shard_index = static_cast<int>(idx); + return; + } + } + // All shards occupied, this group won't own a shard (will only steal) +} + +void TaskControl::unbind_priority_owner(TaskGroup* g, bthread_tag_t tag) { + const int idx = g->_priority_shard_index; + if (idx < 0) { + return; + } + auto& shards = _priority_shards[tag]; + if ((size_t)idx >= shards.size()) { + return; + } + 
PriorityShard* shard = shards[idx].get(); + if (shard->owner.load(butil::memory_order_relaxed) != g) { + g->_priority_shard_index = -1; + return; + } + + // Mark draining to prevent new owner from binding + shard->draining.store(true, butil::memory_order_release); + shard->owner.store(NULL, butil::memory_order_release); + + // Drain inbound + bthread_t tid; + while (shard->inbound.Dequeue(tid)) { + fallback_enqueue(tag, tid); + } + // Drain wsq, steal since we're no longer the owner + while (shard->wsq.steal(&tid)) { + fallback_enqueue(tag, tid); + } + + // Allow new owner to bind + shard->draining.store(false, butil::memory_order_release); + g->_priority_shard_index = -1; +} + +void TaskControl::flush_priority_inbound(PriorityShard* shard, size_t max_batch) { + bthread_t tid; + for (size_t i = 0; i < max_batch; ++i) { + if (!shard->inbound.Dequeue(tid)) { + break; + } + if (!shard->wsq.push(tid)) { + // wsq full, push back won't work; fallback this task + fallback_enqueue(tls_task_group->tag(), tid); + break; + } + } +} + +void TaskControl::fallback_enqueue(bthread_tag_t tag, bthread_t tid) { + // Clear BTHREAD_GLOBAL_PRIORITY flag to prevent re-entering priority queue + TaskMeta* m = TaskGroup::address_meta(tid); + if (m) { + m->attr.flags &= ~BTHREAD_GLOBAL_PRIORITY; + } + // Enqueue to a random group's remote_rq, the normal scheduling path + TaskGroup* g = choose_one_group(tag); + if (g) { + g->_remote_rq.push(tid); + signal_task(1, tag); Review Comment: `RemoteTaskQueue::push()` can fail when the bounded queue is full, but the return value is ignored here, which can silently drop tasks during fallback (breaking correctness). Please use the existing `TaskGroup::ready_to_run_remote(TaskMeta*)` path (which retries until push succeeds), or implement a retry/alternate-group loop and log/backoff similar to `TaskGroup::ready_to_run_remote` to ensure the tid is not lost. 
```suggestion // Enqueue through the existing remote-ready path so bounded remote queues // are retried instead of silently dropping the task when full. TaskGroup* g = choose_one_group(tag); if (g && m) { g->ready_to_run_remote(m); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
