This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 3bacab8a Support A Multiple Producer, Single Consumer Queue (#2492)
3bacab8a is described below
commit 3bacab8a58b3498f563799816b3291d21bfaf358
Author: Bright Chen <[email protected]>
AuthorDate: Tue Jan 23 13:22:08 2024 +0800
Support A Multiple Producer, Single Consumer Queue (#2492)
---
src/butil/containers/mpsc_queue.h | 189 ++++++++++++++++++++++++++++++++++++++
src/butil/thread_key.h | 6 +-
test/BUILD.bazel | 1 +
test/CMakeLists.txt | 1 +
test/Makefile | 1 +
test/mpsc_queue_unittest.cc | 124 +++++++++++++++++++++++++
6 files changed, 319 insertions(+), 3 deletions(-)
diff --git a/src/butil/containers/mpsc_queue.h
b/src/butil/containers/mpsc_queue.h
new file mode 100644
index 00000000..4c7072da
--- /dev/null
+++ b/src/butil/containers/mpsc_queue.h
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A Multiple Producer, Single Consumer Queue.
+// It allows multiple threads to enqueue, and allows one thread
+// (and only one thread) to dequeue.
+
+#ifndef BUTIL_MPSC_QUEUE_H
+#define BUTIL_MPSC_QUEUE_H
+
+#include "butil/object_pool.h"
+#include "butil/type_traits.h"
+
+namespace butil {
+
+template <typename T>
+struct BAIDU_CACHELINE_ALIGNMENT MPSCQueueNode {
+ static MPSCQueueNode* const UNCONNECTED;
+
+ MPSCQueueNode* next{NULL};
+ char data_mem[sizeof(T)]{};
+
+};
+
+template <typename T>
+MPSCQueueNode<T>* const MPSCQueueNode<T>::UNCONNECTED =
(MPSCQueueNode<T>*)(intptr_t)-1;
+
+// Default allocator for MPSCQueueNode.
+template <typename T>
+class DefaultAllocator {
+public:
+ void* Alloc() { return malloc(sizeof(MPSCQueueNode<T>)); }
+ void Free(void* p) { free(p); }
+};
+
+// Allocator using ObjectPool for MPSCQueueNode.
+template <typename T>
+class ObjectPoolAllocator {
+public:
+ void* Alloc() { return get_object<MPSCQueueNode<T>>(); }
+ void Free(void* p) { return_object(p); }
+};
+
+
+template <typename T, typename Alloc = DefaultAllocator<T>>
+class MPSCQueue {
+public:
+ MPSCQueue()
+ : _head(NULL)
+ , _cur_enqueue_node(NULL)
+ , _cur_dequeue_node(NULL) {}
+
+ ~MPSCQueue();
+
+ // Enqueue data to the queue.
+ void Enqueue(typename add_const_reference<T>::type data);
+ void Enqueue(T&& data);
+
+ // Dequeue data from the queue.
+ bool Dequeue(T& data);
+
+private:
+ // Reverse the list until old_head.
+ void ReverseList(MPSCQueueNode<T>* old_head);
+
+ void EnqueueImpl(MPSCQueueNode<T>* node);
+ bool DequeueImpl(T* data);
+
+ Alloc _alloc;
+ atomic<MPSCQueueNode<T>*> _head;
+ atomic<MPSCQueueNode<T>*> _cur_enqueue_node;
+ MPSCQueueNode<T>* _cur_dequeue_node;
+};
+
+template <typename T, typename Alloc>
+MPSCQueue<T, Alloc>::~MPSCQueue() {
+ while (DequeueImpl(NULL));
+}
+
+template <typename T, typename Alloc>
+void MPSCQueue<T, Alloc>::Enqueue(typename add_const_reference<T>::type data) {
+ auto node = (MPSCQueueNode<T>*)_alloc.Alloc();
+ node->next = MPSCQueueNode<T>::UNCONNECTED;
+ new ((void*)&node->data_mem) T(data);
+ EnqueueImpl(node);
+}
+
+template <typename T, typename Alloc>
+void MPSCQueue<T, Alloc>::Enqueue(T&& data) {
+ auto node = (MPSCQueueNode<T>*)_alloc.Alloc();
+ node->next = MPSCQueueNode<T>::UNCONNECTED;
+ new ((void*)&node->data_mem) T(std::forward<T>(data));
+ EnqueueImpl(node);
+}
+
+template <typename T, typename Alloc>
+void MPSCQueue<T, Alloc>::EnqueueImpl(MPSCQueueNode<T>* node) {
+ MPSCQueueNode<T>* prev = _head.exchange(node, memory_order_release);
+ if (prev) {
+ node->next = prev;
+ return;
+ }
+ node->next = NULL;
+ _cur_enqueue_node.store(node, memory_order_relaxed);
+}
+
+template <typename T, typename Alloc>
+bool MPSCQueue<T, Alloc>::Dequeue(T& data) {
+ return DequeueImpl(&data);
+}
+
+template <typename T, typename Alloc>
+bool MPSCQueue<T, Alloc>::DequeueImpl(T* data) {
+ MPSCQueueNode<T>* node;
+ if (_cur_dequeue_node) {
+ node = _cur_dequeue_node;
+ } else {
+ node = _cur_enqueue_node.load(memory_order_relaxed);
+ }
+ if (!node) {
+ return false;
+ }
+
+ _cur_enqueue_node.store(NULL, memory_order_relaxed);
+ if (data) {
+ auto mem = (T* const)node->data_mem;
+ *data = std::move(*mem);
+ }
+ MPSCQueueNode<T>* old_node = node;
+ if (!node->next) {
+ ReverseList(node);
+ }
+ _cur_dequeue_node = node->next;
+ return_object(old_node);
+
+ return true;
+}
+
+template <typename T, typename Alloc>
+void MPSCQueue<T, Alloc>::ReverseList(MPSCQueueNode<T>* old_head) {
+ // Try to set _write_head to NULL to mark that it is done.
+ MPSCQueueNode<T>* new_head = old_head;
+ MPSCQueueNode<T>* desired = NULL;
+ if (_head.compare_exchange_strong(
+ new_head, desired, memory_order_acquire)) {
+ // No one added new requests.
+ return;
+ }
+ CHECK_NE(new_head, old_head);
+ // Above acquire fence pairs release fence of exchange in Enqueue() to make
+ // sure that we see all fields of requests set.
+
+ // Someone added new requests.
+ // Reverse the list until old_head.
+ MPSCQueueNode<T>* tail = NULL;
+ MPSCQueueNode<T>* p = new_head;
+ do {
+ while (p->next == MPSCQueueNode<T>::UNCONNECTED) {
+ // TODO(gejun): elaborate this
+ sched_yield();
+ }
+ MPSCQueueNode<T>* const saved_next = p->next;
+ p->next = tail;
+ tail = p;
+ p = saved_next;
+ CHECK(p);
+ } while (p != old_head);
+
+ // Link old list with new list.
+ old_head->next = tail;
+}
+
+}
+
+#endif // BUTIL_MPSC_QUEUE_H
diff --git a/src/butil/thread_key.h b/src/butil/thread_key.h
index 48f02f7d..f8d8f0e4 100644
--- a/src/butil/thread_key.h
+++ b/src/butil/thread_key.h
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef BRPC_THREAD_KEY_H
-#define BRPC_THREAD_KEY_H
+#ifndef BUTIL_THREAD_KEY_H
+#define BUTIL_THREAD_KEY_H
#include <limits>
#include <pthread.h>
@@ -199,4 +199,4 @@ void ThreadLocal<T>::reset(T* ptr) {
}
-#endif //BRPC_THREAD_KEY_H
+#endif // BUTIL_THREAD_KEY_H
diff --git a/test/BUILD.bazel b/test/BUILD.bazel
index 8c57c10a..3bf7cd45 100644
--- a/test/BUILD.bazel
+++ b/test/BUILD.bazel
@@ -53,6 +53,7 @@ TEST_BUTIL_SOURCES = [
"mru_cache_unittest.cc",
"small_map_unittest.cc",
"stack_container_unittest.cc",
+ "mpsc_queue_unittest.cc",
"cpu_unittest.cc",
"crash_logging_unittest.cc",
"leak_tracker_unittest.cc",
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 5aaf7d3a..a6227318 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -88,6 +88,7 @@ SET(TEST_BUTIL_SOURCES
${PROJECT_SOURCE_DIR}/test/mru_cache_unittest.cc
${PROJECT_SOURCE_DIR}/test/small_map_unittest.cc
${PROJECT_SOURCE_DIR}/test/stack_container_unittest.cc
+ ${PROJECT_SOURCE_DIR}/test/mpsc_queue_unittest.cc
${PROJECT_SOURCE_DIR}/test/cpu_unittest.cc
${PROJECT_SOURCE_DIR}/test/crash_logging_unittest.cc
${PROJECT_SOURCE_DIR}/test/leak_tracker_unittest.cc
diff --git a/test/Makefile b/test/Makefile
index 6e0dbc97..82efc381 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -58,6 +58,7 @@ TEST_BUTIL_SOURCES = \
mru_cache_unittest.cc \
small_map_unittest.cc \
stack_container_unittest.cc \
+ mpsc_queue_unittest.cc \
cpu_unittest.cc \
crash_logging_unittest.cc \
leak_tracker_unittest.cc \
diff --git a/test/mpsc_queue_unittest.cc b/test/mpsc_queue_unittest.cc
new file mode 100644
index 00000000..da67cfb7
--- /dev/null
+++ b/test/mpsc_queue_unittest.cc
@@ -0,0 +1,124 @@
+#include <gtest/gtest.h>
+#include <pthread.h>
+#include "butil/containers/mpsc_queue.h"
+
+namespace {
+
+const uint MAX_COUNT = 10000000;
+
+void Consume(butil::MPSCQueue<uint>& q, bool allow_empty) {
+ uint i = 0;
+ uint empty_count = 0;
+ while (true) {
+ uint d;
+ if (!q.Dequeue(d)) {
+ ASSERT_TRUE(allow_empty);
+ ASSERT_LT(empty_count++, (const uint)10000);
+ ::usleep(10 * 1000);
+ continue;
+ }
+ ASSERT_EQ(i++, d);
+ if (i == MAX_COUNT) {
+ break;
+ }
+ }
+}
+
+void* ProduceThread(void* arg) {
+ auto q = (butil::MPSCQueue<uint>*)arg;
+ for (uint i = 0; i < MAX_COUNT; ++i) {
+ q->Enqueue(i);
+ }
+ return NULL;
+}
+
+void* ConsumeThread1(void* arg) {
+ auto q = (butil::MPSCQueue<uint>*)arg;
+ Consume(*q, true);
+ return NULL;
+}
+
+TEST(MPSCQueueTest, spsc_single_thread) {
+ butil::MPSCQueue<uint> q;
+ for (uint i = 0; i < MAX_COUNT; ++i) {
+ q.Enqueue(i);
+ }
+ Consume(q, false);
+}
+
+TEST(MPSCQueueTest, spsc_multi_thread) {
+ butil::MPSCQueue<uint> q;
+ pthread_t produce_tid;
+ ASSERT_EQ(0, pthread_create(&produce_tid, NULL, ProduceThread, &q));
+ pthread_t consume_tid;
+ ASSERT_EQ(0, pthread_create(&consume_tid, NULL, ConsumeThread1, &q));
+
+ pthread_join(produce_tid, NULL);
+ pthread_join(consume_tid, NULL);
+
+}
+
+butil::atomic<uint> g_index(0);
+void* MultiProduceThread(void* arg) {
+ auto q = (butil::MPSCQueue<uint>*)arg;
+ while (true) {
+ uint i = g_index.fetch_add(1, butil::memory_order_relaxed);
+ if (i >= MAX_COUNT) {
+ break;
+ }
+ q->Enqueue(i);
+ }
+ return NULL;
+}
+
+butil::Mutex g_mutex;
+bool g_counts[MAX_COUNT];
+void Consume2(butil::MPSCQueue<uint>& q) {
+ uint empty_count = 0;
+ uint count = 0;
+ while (true) {
+ uint d;
+ if (!q.Dequeue(d)) {
+ ASSERT_LT(empty_count++, (const uint)10000);
+ ::usleep(1 * 1000);
+ continue;
+ }
+ ASSERT_LT(d, MAX_COUNT);
+ {
+ BAIDU_SCOPED_LOCK(g_mutex);
+ ASSERT_FALSE(g_counts[d]);
+ g_counts[d] = true;
+ }
+ if (++count >= MAX_COUNT) {
+ break;
+ }
+ }
+}
+
+void* ConsumeThread2(void* arg) {
+ auto q = (butil::MPSCQueue<uint>*)arg;
+ Consume2(*q);
+ return NULL;
+}
+
+TEST(MPSCQueueTest, mpsc_multi_thread) {
+ butil::MPSCQueue<uint> q;
+
+ int thread_num = 8;
+ pthread_t threads[thread_num];
+ for (int i = 0; i < thread_num; ++i) {
+ ASSERT_EQ(0, pthread_create(&threads[i], NULL, MultiProduceThread,
&q));
+ }
+
+ pthread_t consume_tid;
+ ASSERT_EQ(0, pthread_create(&consume_tid, NULL, ConsumeThread2, &q));
+
+ for (int i = 0; i < thread_num; ++i) {
+ pthread_join(threads[i], NULL);
+ }
+ pthread_join(consume_tid, NULL);
+
+}
+
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]