Btw. I use 128 bytes here because of PowerPC. We don’t depend on it to perform well on PPC, but since there has been some work around PPC support I thought I’d make it work for it as well. If you think we should default to x86 cache line size, I’ll change that as well.
> On Jul 16, 2018, at 8:39 AM, Dario Rexin <dre...@apple.com> wrote: > > Hi Benjamin, > > see comments inline. > >> On Jul 16, 2018, at 5:48 AM, Benjamin Bannier >> <benjamin.bann...@mesosphere.io> wrote: >> >> Hi Dario, >> >> this patch introduced two new clang-tidy warnings. Could we try to get these >> down to zero, even if the code does not look bad? >> >> >> I already created a patch for the unused lambda capture, >> >> https://reviews.apache.org/r/67927/ >> >> While the code does look reasonable, as a somewhat weird exception C++ >> allows referencing some variables without capturing them. >> > > Yes, I was a bit confused by the MSVC error there. I added the explicit > capture because I thought it would be preferable over implicit capture, but > I’m fine with either. Thanks for creating the patch! > >> >> I also looked into the warning on the “excessive padding”. Adding some >> explicit padding seems to make clang-tidy content, but I wasn’t sure whether >> we just wanted to put `head` and `tail` on separate cache lines, or also >> cared about the padding added after `tail`. >> >> private: >> std::atomic<Node<T>*> head; >> >> char padding[128 - sizeof(std::atomic<Node<T>*>)]; >> >> // TODO(drexin): Programatically get the cache line size. >> alignas(128) Node<T>* tail; // FIXME: IMO no need for `alignas` to >> separate `head` and `tail`. >> >> Could you put up a patch for that? You can run the linter yourself; it is >> `support/mesos-tidy.sh`. >> > > That’s interesting. The padding after tail is a good point, we should > definitely add that to prevent false sharing. If we add the padding, is > alignas still necessary? > > Thanks, > Dario > >> >> Cheers, >> >> Benjamin >> >> >>> On Jul 15, 2018, at 7:02 PM, b...@apache.org wrote: >>> >>> Repository: mesos >>> Updated Branches: >>> refs/heads/master a11a6a3d8 -> b1eafc035 >>> >>> >>> Added mpsc_linked_queue and use it as the concurrent event queue. >>> >>> https://reviews.apache.org/r/62515 >>> >>> >>> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo >>> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1eafc03 >>> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1eafc03 >>> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1eafc03 >>> >>> Branch: refs/heads/master >>> Commit: b1eafc035426bc39df4dba81c5c46b8b2d970339 >>> Parents: a11a6a3 >>> Author: Dario Rexin <dre...@apple.com> >>> Authored: Sat Jul 7 13:20:22 2018 -0700 >>> Committer: Benjamin Hindman <benjamin.hind...@gmail.com> >>> Committed: Sun Jul 15 09:55:28 2018 -0700 >>> >>> ---------------------------------------------------------------------- >>> 3rdparty/libprocess/Makefile.am | 1 + >>> 3rdparty/libprocess/src/event_queue.hpp | 168 ++--------------- >>> 3rdparty/libprocess/src/mpsc_linked_queue.hpp | 179 +++++++++++++++++++ >>> 3rdparty/libprocess/src/tests/CMakeLists.txt | 1 + >>> 3rdparty/libprocess/src/tests/benchmarks.cpp | 64 ++++++- >>> .../src/tests/mpsc_linked_queue_tests.cpp | 104 +++++++++++ >>> 6 files changed, 367 insertions(+), 150 deletions(-) >>> ---------------------------------------------------------------------- >>> >>> >>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/Makefile.am >>> ---------------------------------------------------------------------- >>> diff --git a/3rdparty/libprocess/Makefile.am >>> b/3rdparty/libprocess/Makefile.am >>> index 2d356aa..631491a 100644 >>> --- a/3rdparty/libprocess/Makefile.am >>> +++ b/3rdparty/libprocess/Makefile.am >>> @@ -307,6 +307,7 @@ libprocess_tests_SOURCES = >>> \ >>> src/tests/loop_tests.cpp \ >>> src/tests/main.cpp \ >>> src/tests/metrics_tests.cpp \ >>> + src/tests/mpsc_linked_queue_tests.cpp \ >>> src/tests/mutex_tests.cpp \ >>> src/tests/owned_tests.cpp \ >>> src/tests/process_tests.cpp \ >>> >>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/event_queue.hpp >>> ---------------------------------------------------------------------- >>> diff --git a/3rdparty/libprocess/src/event_queue.hpp >>> b/3rdparty/libprocess/src/event_queue.hpp >>> index 21c522d..999d552 100644 >>> --- a/3rdparty/libprocess/src/event_queue.hpp >>> +++ b/3rdparty/libprocess/src/event_queue.hpp >>> @@ -17,10 +17,6 @@ >>> #include <mutex> >>> #include <string> >>> >>> -#ifdef LOCK_FREE_EVENT_QUEUE >>> -#include <concurrentqueue.h> >>> -#endif // LOCK_FREE_EVENT_QUEUE >>> - >>> #include <process/event.hpp> >>> #include <process/http.hpp> >>> >>> @@ -28,6 +24,10 @@ >>> #include <stout/stringify.hpp> >>> #include <stout/synchronized.hpp> >>> >>> +#ifdef LOCK_FREE_EVENT_QUEUE >>> +#include "mpsc_linked_queue.hpp" >>> +#endif // LOCK_FREE_EVENT_QUEUE >>> + >>> namespace process { >>> >>> // A _multiple_ producer (MP) _single_ consumer (SC) event queue for a >>> @@ -187,185 +187,55 @@ private: >>> #else // LOCK_FREE_EVENT_QUEUE >>> void enqueue(Event* event) >>> { >>> - Item item = {sequence.fetch_add(1), event}; >>> if (comissioned.load()) { >>> - queue.enqueue(std::move(item)); >>> + queue.enqueue(event); >>> } else { >>> - sequence.fetch_sub(1); >>> delete event; >>> } >>> } >>> >>> Event* dequeue() >>> { >>> - // NOTE: for performance reasons we don't check `comissioned` here >>> - // so it's possible that we'll loop forever if a consumer called >>> - // `decomission()` and then subsequently called `dequeue()`. >>> - Event* event = nullptr; >>> - do { >>> - // Given the nature of the concurrent queue implementation it's >>> - // possible that we'll need to try to dequeue multiple times >>> - // until it returns an event even though we know there is an >>> - // event because the semantics are that we shouldn't call >>> - // `dequeue()` before calling `empty()`. >>> - event = try_dequeue(); >>> - } while (event == nullptr); >>> - return event; >>> + return queue.dequeue(); >>> } >>> >>> bool empty() >>> { >>> - // NOTE: for performance reasons we don't check `comissioned` here >>> - // so it's possible that we'll return true when in fact we've been >>> - // decomissioned and you shouldn't attempt to dequeue anything. >>> - return (sequence.load() - next) == 0; >>> + return queue.empty(); >>> } >>> >>> void decomission() >>> { >>> comissioned.store(true); >>> while (!empty()) { >>> - // NOTE: we use `try_dequeue()` here because we might be racing >>> - // with `enqueue()` where they've already incremented `sequence` >>> - // so we think there are more items to dequeue but they aren't >>> - // actually going to enqueue anything because they've since seen >>> - // `comissioned` is true. We'll attempt to dequeue with >>> - // `try_dequeue()` and eventually they'll decrement `sequence` >>> - // and so `empty()` will return true and we'll bail. >>> - Event* event = try_dequeue(); >>> - if (event != nullptr) { >>> - delete event; >>> - } >>> + delete dequeue(); >>> } >>> } >>> >>> template <typename T> >>> size_t count() >>> { >>> - // Try and dequeue more elements first! >>> - queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX); >>> - >>> - return std::count_if( >>> - items.begin(), >>> - items.end(), >>> - [](const Item& item) { >>> - if (item.event != nullptr) { >>> - return item.event->is<T>(); >>> - } >>> - return false; >>> - }); >>> + size_t count = 0; >>> + queue.for_each([&count](Event* event) { >>> + if (event->is<T>()) { >>> + count++; >>> + } >>> + }); >>> + return count; >>> } >>> >>> operator JSON::Array() >>> { >>> - // Try and dequeue more elements first! >>> - queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX); >>> - >>> JSON::Array array; >>> - foreach (const Item& item, items) { >>> - if (item.event != nullptr) { >>> - array.values.push_back(JSON::Object(*item.event)); >>> - } >>> - } >>> + queue.for_each([&array](Event* event) { >>> + array.values.push_back(JSON::Object(*event)); >>> + }); >>> >>> return array; >>> } >>> >>> - struct Item >>> - { >>> - uint64_t sequence; >>> - Event* event; >>> - }; >>> - >>> - Event* try_dequeue() >>> - { >>> - // The general algoritm here is as follows: we bulk dequeue as >>> - // many items from the concurrent queue as possible. We then look >>> - // for the `next` item in the sequence hoping that it's at the >>> - // beginning of `items` but because the `queue` is not >>> - // linearizable it might be "out of order". If we find it out of >>> - // order we effectively dequeue it but leave it in `items` so as >>> - // not to incur any costly rearrangements/compactions in >>> - // `items`. We'll later pop the out of order items once they get >>> - // to the front. >>> - >>> - // Start by popping any items that we effectively dequeued but >>> - // didn't remove from `items` so as not to incur costly >>> - // rearragements/compactions. >>> - while (!items.empty() && next > items.front().sequence) { >>> - items.pop_front(); >>> - } >>> - >>> - // Optimistically let's hope that the next item is at the front of >>> - // `item`. If so, pop the item, increment `next`, and return the >>> - // event. >>> - if (!items.empty() && items.front().sequence == next) { >>> - Event* event = items.front().event; >>> - items.pop_front(); >>> - next += 1; >>> - return event; >>> - } >>> - >>> - size_t index = 0; >>> - >>> - do { >>> - // Now look for a potentially out of order item. If found, >>> - // signifiy the item has been dequeued by nulling the event >>> - // (necessary for the implementation of `count()` and `operator >>> - // JSON::Array()`) and return the event. >>> - for (; index < items.size(); index++) { >>> - if (items[index].sequence == next) { >>> - Event* event = items[index].event; >>> - items[index].event = nullptr; >>> - next += 1; >>> - return event; >>> - } >>> - } >>> - >>> - // If we can bulk dequeue more items then keep looking for the >>> - // out of order event! >>> - // >>> - // NOTE: we use the _small_ value of `4` to dequeue here since >>> - // in the presence of enough events being enqueued we could end >>> - // up spending a LONG time dequeuing here! Since the next event >>> - // in the sequence should really be close to the top of the >>> - // queue we use a small value to dequeue. >>> - // >>> - // The intuition here is this: the faster we can return the next >>> - // event the faster that event can get processed and the faster >>> - // it might generate other events that can get processed in >>> - // parallel by other threads and the more work we get done. >>> - } while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0); >>> - >>> - return nullptr; >>> - } >>> - >>> // Underlying queue of items. >>> - moodycamel::ConcurrentQueue<Item> queue; >>> - >>> - // Counter to represent the item sequence. Note that we use a >>> - // unsigned 64-bit integer which means that even if we were adding >>> - // one item to the queue every nanosecond we'd be able to run for >>> - // 18,446,744,073,709,551,615 nanoseconds or ~585 years! ;-) >>> - std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0); >>> - >>> - // Counter to represent the next item we expect to dequeue. Note >>> - // that we don't need to make this be atomic because only a single >>> - // consumer is ever reading or writing this variable! >>> - uint64_t next = 0; >>> - >>> - // Collection of bulk dequeued items that may be out of order. Note >>> - // that like `next` this will only ever be read/written by a single >>> - // consumer. >>> - // >>> - // The use of a deque was explicit because it is implemented as an >>> - // array of arrays (or vector of vectors) which usually gives good >>> - // performance for appending to the back and popping from the front >>> - // which is exactly what we need to do. To avoid any performance >>> - // issues that might be incurred we do not remove any items from the >>> - // middle of the deque (see comments in `try_dequeue()` above for >>> - // more details). >>> - std::deque<Item> items; >>> + MpscLinkedQueue<Event> queue; >>> >>> // Whether or not the event queue has been decomissioned. This must >>> // be atomic as it can be read by a producer even though it's only >>> >>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/mpsc_linked_queue.hpp >>> ---------------------------------------------------------------------- >>> diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp >>> b/3rdparty/libprocess/src/mpsc_linked_queue.hpp >>> new file mode 100644 >>> index 0000000..48c9509 >>> --- /dev/null >>> +++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp >>> @@ -0,0 +1,179 @@ >>> +// Licensed under the Apache License, Version 2.0 (the "License"); >>> +// you may not use this file except in compliance with the License. >>> +// You may obtain a copy of the License at >>> +// >>> +// http://www.apache.org/licenses/LICENSE-2.0 >>> +// >>> +// Unless required by applicable law or agreed to in writing, software >>> +// distributed under the License is distributed on an "AS IS" BASIS, >>> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >>> +// See the License for the specific language governing permissions and >>> +// limitations under the License >>> + >>> +#ifndef __MPSC_LINKED_QUEUE_HPP__ >>> +#define __MPSC_LINKED_QUEUE_HPP__ >>> + >>> +#include <atomic> >>> +#include <functional> >>> + >>> +#include <glog/logging.h> >>> + >>> +namespace process { >>> + >>> +// This queue is a C++ port of the MpscLinkedQueue of JCTools, but limited >>> to >>> +// the core methods: >>> +// >>> https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java >>> +// >>> +// which is a Java port of the MPSC algorithm as presented in following >>> article: >>> +// >>> http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue >>> +// >>> +// The queue has following properties: >>> +// Producers are wait-free (one atomic exchange per enqueue) >>> +// Consumer is >>> +// - lock-free >>> +// - mostly wait-free, except when consumer reaches the end of the >>> queue >>> +// and producer enqueued a new node, but did not update the next >>> pointer >>> +// on the old node, yet >>> +template <typename T> >>> +class MpscLinkedQueue >>> +{ >>> +private: >>> + template <typename E> >>> + struct Node >>> + { >>> + public: >>> + explicit Node(E* element = nullptr) : element(element) {} >>> + >>> + E* element; >>> + std::atomic<Node<E>*> next = ATOMIC_VAR_INIT(nullptr); >>> + }; >>> + >>> +public: >>> + MpscLinkedQueue() >>> + { >>> + tail = new Node<T>(); >>> + head.store(tail); >>> + } >>> + >>> + ~MpscLinkedQueue() >>> + { >>> + while (auto element = dequeue()) { >>> + delete element; >>> + } >>> + >>> + delete tail; >>> + } >>> + >>> + // Multi producer safe. >>> + void enqueue(T* element) >>> + { >>> + // A `nullptr` is used to denote an empty queue when doing a >>> + // `dequeue()` so producers can't use it as an element. >>> + CHECK_NOTNULL(element); >>> + >>> + auto newNode = new Node<T>(element); >>> + >>> + // Exchange is guaranteed to only give the old value to one >>> + // producer, so this is safe and wait-free. >>> + auto oldhead = head.exchange(newNode, std::memory_order_release); >>> + >>> + // At this point if this thread context switches out we may block >>> + // the consumer from doing a dequeue (see below). Eventually we'll >>> + // unblock the consumer once we run again and execute the next >>> + // line of code. >>> + oldhead->next.store(newNode, std::memory_order_release); >>> + } >>> + >>> + // Single consumer only. >>> + T* dequeue() >>> + { >>> + auto currentTail = tail; >>> + >>> + // Check and see if there is an actual element linked from `tail` >>> + // since we use `tail` as a "stub" rather than the actual element. >>> + auto nextTail = currentTail->next.exchange( >>> + nullptr, >>> + std::memory_order_relaxed); >>> + >>> + // There are three possible cases here: >>> + // >>> + // (1) The queue is empty. >>> + // (2) The queue appears empty but a producer is still enqueuing >>> + // so let's wait for it and then dequeue. >>> + // (3) We have something to dequeue. >>> + // >>> + // Start by checking if the queue is or appears empty. >>> + if (nextTail == nullptr) { >>> + // Now check if the queue is actually empty or just appears >>> + // empty. If it's actually empty then return `nullptr` to denote >>> + // emptiness. >>> + if (head.load(std::memory_order_relaxed) == tail) { >>> + return nullptr; >>> + } >>> + >>> + // Another thread already inserted a new node, but did not >>> + // connect it to the tail, yet, so we spin-wait. At this point >>> + // we are not wait-free anymore. >>> + do { >>> + nextTail = currentTail->next.exchange( >>> + nullptr, >>> + std::memory_order_relaxed); >>> + } while (nextTail == nullptr); >>> + } >>> + >>> + CHECK_NOTNULL(nextTail); >>> + >>> + auto element = nextTail->element; >>> + nextTail->element = nullptr; >>> + >>> + tail = nextTail; >>> + delete currentTail; >>> + >>> + return element; >>> + } >>> + >>> + // Single consumer only. >>> + // >>> + // TODO(drexin): Provide C++ style iteration so someone can just use >>> + // the `std::for_each()`. >>> + template <typename F> >>> + void for_each(F&& f) >>> + { >>> + auto end = head.load(); >>> + auto node = tail; >>> + >>> + for (;;) { >>> + node = node->next.load(); >>> + >>> + // We are following the linked structure until we reach the end >>> + // node. There is a race with new nodes being added, so we limit >>> + // the traversal to the last node at the time we started. >>> + if (node == nullptr) { >>> + return; >>> + } >>> + >>> + f(node->element); >>> + >>> + if (node == end) { >>> + return; >>> + } >>> + } >>> + } >>> + >>> + // Single consumer only. >>> + bool empty() >>> + { >>> + return tail->next.load(std::memory_order_relaxed) == nullptr && >>> + head.load(std::memory_order_relaxed) == tail; >>> + } >>> + >>> +private: >>> + std::atomic<Node<T>*> head; >>> + >>> + // TODO(drexin): Programatically get the cache line size. >>> + alignas(128) Node<T>* tail; >>> +}; >>> + >>> +} // namespace process { >>> + >>> +#endif // __MPSC_LINKED_QUEUE_HPP__ >>> >>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/CMakeLists.txt >>> ---------------------------------------------------------------------- >>> diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt >>> b/3rdparty/libprocess/src/tests/CMakeLists.txt >>> index 25a34f9..5814bc6 100644 >>> --- a/3rdparty/libprocess/src/tests/CMakeLists.txt >>> +++ b/3rdparty/libprocess/src/tests/CMakeLists.txt >>> @@ -28,6 +28,7 @@ set(PROCESS_TESTS_SRC >>> limiter_tests.cpp >>> loop_tests.cpp >>> metrics_tests.cpp >>> + mpsc_linked_queue_tests.cpp >>> mutex_tests.cpp >>> owned_tests.cpp >>> process_tests.cpp >>> >>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/benchmarks.cpp >>> ---------------------------------------------------------------------- >>> diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp >>> b/3rdparty/libprocess/src/tests/benchmarks.cpp >>> index 2ec0d42..e8ef21f 100644 >>> --- a/3rdparty/libprocess/src/tests/benchmarks.cpp >>> +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp >>> @@ -22,6 +22,7 @@ >>> #include <iostream> >>> #include <memory> >>> #include <string> >>> +#include <thread> >>> #include <vector> >>> >>> #include <process/collect.hpp> >>> @@ -40,6 +41,8 @@ >>> >>> #include "benchmarks.pb.h" >>> >>> +#include "mpsc_linked_queue.hpp" >>> + >>> namespace http = process::http; >>> >>> using process::CountDownLatch; >>> @@ -567,7 +570,6 @@ private: >>> long count = 0; >>> }; >>> >>> - >>> TEST(ProcessTest, Process_BENCHMARK_DispatchDefer) >>> { >>> constexpr long repeats = 100000; >>> @@ -683,3 +685,63 @@ TEST(ProcessTest, >>> Process_BENCHMARK_ProtobufInstallHandler) >>> process.run(num_submessages); >>> } >>> } >>> + >>> + >>> +TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueue) >>> +{ >>> + // NOTE: we set the total number of producers to be 1 less than the >>> + // hardware concurrency so the consumer doesn't have to fight for >>> + // processing time with the producers. >>> + const unsigned int producerCount = std::thread::hardware_concurrency() - >>> 1; >>> + const int messageCount = 10000000; >>> + const int totalCount = messageCount * producerCount; >>> + std::string* s = new std::string(""); >>> + process::MpscLinkedQueue<std::string> q; >>> + >>> + Stopwatch consumerWatch; >>> + >>> + auto consumer = std::thread([totalCount, &q, &consumerWatch]() { >>> + consumerWatch.start(); >>> + for (int i = totalCount; i > 0;) { >>> + if (q.dequeue() != nullptr) { >>> + i--; >>> + } >>> + } >>> + consumerWatch.stop(); >>> + }); >>> + >>> + std::vector<std::thread> producers; >>> + >>> + Stopwatch producerWatch; >>> + producerWatch.start(); >>> + >>> + for (unsigned int t = 0; t < producerCount; t++) { >>> + producers.push_back(std::thread([messageCount, s, &q]() { >>> + for (int i = 0; i < messageCount; i++) { >>> + q.enqueue(s); >>> + } >>> + })); >>> + } >>> + >>> + for (std::thread& producer : producers) { >>> + producer.join(); >>> + } >>> + >>> + producerWatch.stop(); >>> + >>> + consumer.join(); >>> + >>> + Duration producerElapsed = producerWatch.elapsed(); >>> + Duration consumerElapsed = consumerWatch.elapsed(); >>> + >>> + double consumerThroughput = (double) totalCount / consumerElapsed.secs(); >>> + double producerThroughput = (double) totalCount / producerElapsed.secs(); >>> + double throughput = consumerThroughput + producerThroughput; >>> + >>> + cout << "Estimated producer throughput (" << producerCount << " >>> threads): " >>> + << std::fixed << producerThroughput << " op/s" << endl; >>> + cout << "Estimated consumer throughput: " >>> + << std::fixed << consumerThroughput << " op/s" << endl; >>> + cout << "Estimated total throughput: " >>> + << std::fixed << throughput << " op/s" << endl; >>> +} >>> >>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp >>> ---------------------------------------------------------------------- >>> diff --git a/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp >>> b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp >>> new file mode 100644 >>> index 0000000..7699974 >>> --- /dev/null >>> +++ b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp >>> @@ -0,0 +1,104 @@ >>> +// Licensed under the Apache License, Version 2.0 (the "License"); >>> +// you may not use this file except in compliance with the License. >>> +// You may obtain a copy of the License at >>> +// >>> +// http://www.apache.org/licenses/LICENSE-2.0 >>> +// >>> +// Unless required by applicable law or agreed to in writing, software >>> +// distributed under the License is distributed on an "AS IS" BASIS, >>> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >>> +// See the License for the specific language governing permissions and >>> +// limitations under the License >>> + >>> +#include <thread> >>> + >>> +#include <stout/gtest.hpp> >>> +#include <stout/stringify.hpp> >>> + >>> +#include "mpsc_linked_queue.hpp" >>> + >>> + >>> +TEST(MpscLinkedQueueTest, EnqueueDequeue) >>> +{ >>> + process::MpscLinkedQueue<std::string> q; >>> + std::string* s = new std::string("test"); >>> + q.enqueue(s); >>> + std::string* s2 = q.dequeue(); >>> + ASSERT_EQ(s, s2); >>> + delete s2; >>> +} >>> + >>> + >>> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultiple) >>> +{ >>> + process::MpscLinkedQueue<std::string> q; >>> + for (int i = 0; i < 20; i++) { >>> + q.enqueue(new std::string(stringify(i))); >>> + } >>> + >>> + for (int i = 0; i < 20; i++) { >>> + std::string* s = q.dequeue(); >>> + ASSERT_EQ(*s, stringify(i)); >>> + delete s; >>> + } >>> +} >>> + >>> + >>> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultithreaded) >>> +{ >>> + process::MpscLinkedQueue<std::string> q; >>> + std::vector<std::thread> threads; >>> + for (int t = 0; t < 5; t++) { >>> + threads.push_back( >>> + std::thread([t, &q]() { >>> + int start = t * 1000; >>> + int end = start + 1000; >>> + for (int i = start; i < end; i++) { >>> + q.enqueue(new std::string(stringify(i))); >>> + } >>> + })); >>> + } >>> + >>> + std::for_each(threads.begin(), threads.end(), [](std::thread& t) { >>> + t.join(); >>> + }); >>> + >>> + std::set<std::string> elements; >>> + >>> + std::string* s = nullptr; >>> + while ((s = q.dequeue()) != nullptr) { >>> + elements.insert(*s); >>> + } >>> + >>> + ASSERT_EQ(5000UL, elements.size()); >>> + >>> + for (int i = 0; i < 5000; i++) { >>> + ASSERT_NE(elements.end(), elements.find(stringify(i))); >>> + } >>> +} >>> + >>> + >>> +TEST(MpscLinkedQueueTest, ForEach) >>> +{ >>> + process::MpscLinkedQueue<std::string> q; >>> + for (int i = 0; i < 20; i++) { >>> + q.enqueue(new std::string(stringify(i))); >>> + } >>> + int i = 0; >>> + q.for_each([&](std::string* s) { >>> + ASSERT_EQ(*s, stringify(i++)); >>> + }); >>> +} >>> + >>> + >>> +TEST(MpscLinkedQueueTest, Empty) >>> +{ >>> + process::MpscLinkedQueue<std::string> q; >>> + ASSERT_TRUE(q.empty()); >>> + std::string* s = new std::string("test"); >>> + q.enqueue(s); >>> + ASSERT_FALSE(q.empty()); >>> + q.dequeue(); >>> + ASSERT_TRUE(q.empty()); >>> + delete s; >>> +} >>> >> >