Hi Dario,
This patch introduced two new clang-tidy warnings. Could we try to get these
down to zero, even if the code does not look bad?
I already created a patch for the unused lambda capture,
https://reviews.apache.org/r/67927/
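While the code does look reasonable, C++ has a somewhat weird exception that
allows a lambda to read certain variables without capturing them (e.g., a
`const int` initialized with a constant expression, presumably `messageCount`
in the benchmark), so the explicit capture is reported as unused.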
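To illustrate the exception, here is a minimal sketch. The names `sumBelowLimit`, `limit`, and `accumulate` are made up for this example and are not from the patch. It assumes a C++11 compiler and only shows the language rule, not the actual fix.

```cpp
// Returns 0 + 1 + ... + (limit - 1). The lambda reads `limit`
// without capturing it.
int sumBelowLimit()
{
  // A const integral local initialized with a constant expression.
  // Reading its value is not an odr-use, so no capture is needed.
  const int limit = 5;

  int sum = 0;

  // `limit` is deliberately absent from the capture list. Writing
  // `[limit, &sum]` instead is the kind of capture that gets
  // reported as unused.
  auto accumulate = [&sum]() {
    for (int i = 0; i < limit; i++) {
      sum += i;
    }
  };

  accumulate();

  return sum;
}
```

Note that this only applies while the initializer is a constant expression. A `const int` computed at runtime (like `totalCount`, which depends on `hardware_concurrency()`) still has to be captured.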
I also looked into the warning on the “excessive padding”. Adding some explicit
padding seems to make clang-tidy content, but I wasn’t sure whether we just
wanted to put `head` and `tail` on separate cache lines, or also cared about
the padding added after `tail`.
  private:
    std::atomic<Node<T>*> head;
    char padding[128 - sizeof(std::atomic<Node<T>*>)];

    // TODO(drexin): Programmatically get the cache line size.
    //
    // FIXME: IMO no need for `alignas` to separate `head` and `tail`.
    alignas(128) Node<T>* tail;
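To make the trade-off concrete, here is a small sketch comparing the two layouts. `PaddedPointers`, `AlignedPointers`, and `CACHE_LINE_SIZE` are hypothetical names for this example, `Node` is just a forward declaration, and the value 128 is taken from the snippet above. The assertions assume a typical platform where pointer alignment divides 128.

```cpp
#include <atomic>
#include <cstddef>

// Assumed cache line size, same value as in the snippet above.
constexpr std::size_t CACHE_LINE_SIZE = 128;

struct Node; // Hypothetical stand-in for `Node<T>`.

// Variant 1: explicit padding between `head` and `tail`, no `alignas`.
struct PaddedPointers
{
  std::atomic<Node*> head;
  char padding[CACHE_LINE_SIZE - sizeof(std::atomic<Node*>)];
  Node* tail;
};

// Variant 2: `alignas` only, as in the committed code.
struct AlignedPointers
{
  std::atomic<Node*> head;
  alignas(CACHE_LINE_SIZE) Node* tail;
};

// Both variants place `tail` one cache line after `head`.
static_assert(
    offsetof(PaddedPointers, tail) == CACHE_LINE_SIZE,
    "explicit padding separates `head` and `tail`");
static_assert(
    offsetof(AlignedPointers, tail) == CACHE_LINE_SIZE,
    "`alignas` separates `head` and `tail`");

// Only the `alignas` variant also gets implicit padding after `tail`.
static_assert(
    sizeof(PaddedPointers) == CACHE_LINE_SIZE + sizeof(Node*),
    "no padding after `tail`");
static_assert(
    sizeof(AlignedPointers) == 2 * CACHE_LINE_SIZE,
    "`tail` is padded out to a full cache line");
```

One caveat: without `alignas` the struct itself is only pointer-aligned, so `head` need not start on a cache line boundary. The 128-byte distance between `head` and `tail` still holds, but that is the part I was unsure we care about.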
Could you put up a patch for that? You can run the linter yourself; it is
`support/mesos-tidy.sh`.
Cheers,
Benjamin
> On Jul 15, 2018, at 7:02 PM, [email protected] wrote:
>
> Repository: mesos
> Updated Branches:
> refs/heads/master a11a6a3d8 -> b1eafc035
>
>
> Added mpsc_linked_queue and use it as the concurrent event queue.
>
> https://reviews.apache.org/r/62515
>
>
> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1eafc03
> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1eafc03
> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1eafc03
>
> Branch: refs/heads/master
> Commit: b1eafc035426bc39df4dba81c5c46b8b2d970339
> Parents: a11a6a3
> Author: Dario Rexin <[email protected]>
> Authored: Sat Jul 7 13:20:22 2018 -0700
> Committer: Benjamin Hindman <[email protected]>
> Committed: Sun Jul 15 09:55:28 2018 -0700
>
> ----------------------------------------------------------------------
> 3rdparty/libprocess/Makefile.am | 1 +
> 3rdparty/libprocess/src/event_queue.hpp | 168 ++---------------
> 3rdparty/libprocess/src/mpsc_linked_queue.hpp | 179 +++++++++++++++++++
> 3rdparty/libprocess/src/tests/CMakeLists.txt | 1 +
> 3rdparty/libprocess/src/tests/benchmarks.cpp | 64 ++++++-
> .../src/tests/mpsc_linked_queue_tests.cpp | 104 +++++++++++
> 6 files changed, 367 insertions(+), 150 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/Makefile.am
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
> index 2d356aa..631491a 100644
> --- a/3rdparty/libprocess/Makefile.am
> +++ b/3rdparty/libprocess/Makefile.am
> @@ -307,6 +307,7 @@ libprocess_tests_SOURCES = \
> src/tests/loop_tests.cpp \
> src/tests/main.cpp \
> src/tests/metrics_tests.cpp \
> + src/tests/mpsc_linked_queue_tests.cpp \
> src/tests/mutex_tests.cpp \
> src/tests/owned_tests.cpp \
> src/tests/process_tests.cpp \
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/event_queue.hpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/event_queue.hpp b/3rdparty/libprocess/src/event_queue.hpp
> index 21c522d..999d552 100644
> --- a/3rdparty/libprocess/src/event_queue.hpp
> +++ b/3rdparty/libprocess/src/event_queue.hpp
> @@ -17,10 +17,6 @@
> #include <mutex>
> #include <string>
>
> -#ifdef LOCK_FREE_EVENT_QUEUE
> -#include <concurrentqueue.h>
> -#endif // LOCK_FREE_EVENT_QUEUE
> -
> #include <process/event.hpp>
> #include <process/http.hpp>
>
> @@ -28,6 +24,10 @@
> #include <stout/stringify.hpp>
> #include <stout/synchronized.hpp>
>
> +#ifdef LOCK_FREE_EVENT_QUEUE
> +#include "mpsc_linked_queue.hpp"
> +#endif // LOCK_FREE_EVENT_QUEUE
> +
> namespace process {
>
> // A _multiple_ producer (MP) _single_ consumer (SC) event queue for a
> @@ -187,185 +187,55 @@ private:
> #else // LOCK_FREE_EVENT_QUEUE
> void enqueue(Event* event)
> {
> - Item item = {sequence.fetch_add(1), event};
> if (comissioned.load()) {
> - queue.enqueue(std::move(item));
> + queue.enqueue(event);
> } else {
> - sequence.fetch_sub(1);
> delete event;
> }
> }
>
> Event* dequeue()
> {
> - // NOTE: for performance reasons we don't check `comissioned` here
> - // so it's possible that we'll loop forever if a consumer called
> - // `decomission()` and then subsequently called `dequeue()`.
> - Event* event = nullptr;
> - do {
> - // Given the nature of the concurrent queue implementation it's
> - // possible that we'll need to try to dequeue multiple times
> - // until it returns an event even though we know there is an
> - // event because the semantics are that we shouldn't call
> - // `dequeue()` before calling `empty()`.
> - event = try_dequeue();
> - } while (event == nullptr);
> - return event;
> + return queue.dequeue();
> }
>
> bool empty()
> {
> - // NOTE: for performance reasons we don't check `comissioned` here
> - // so it's possible that we'll return true when in fact we've been
> - // decomissioned and you shouldn't attempt to dequeue anything.
> - return (sequence.load() - next) == 0;
> + return queue.empty();
> }
>
> void decomission()
> {
> comissioned.store(true);
> while (!empty()) {
> - // NOTE: we use `try_dequeue()` here because we might be racing
> - // with `enqueue()` where they've already incremented `sequence`
> - // so we think there are more items to dequeue but they aren't
> - // actually going to enqueue anything because they've since seen
> - // `comissioned` is true. We'll attempt to dequeue with
> - // `try_dequeue()` and eventually they'll decrement `sequence`
> - // and so `empty()` will return true and we'll bail.
> - Event* event = try_dequeue();
> - if (event != nullptr) {
> - delete event;
> - }
> + delete dequeue();
> }
> }
>
> template <typename T>
> size_t count()
> {
> - // Try and dequeue more elements first!
> - queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
> -
> - return std::count_if(
> - items.begin(),
> - items.end(),
> - [](const Item& item) {
> - if (item.event != nullptr) {
> - return item.event->is<T>();
> - }
> - return false;
> - });
> + size_t count = 0;
> + queue.for_each([&count](Event* event) {
> + if (event->is<T>()) {
> + count++;
> + }
> + });
> + return count;
> }
>
> operator JSON::Array()
> {
> - // Try and dequeue more elements first!
> - queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
> -
> JSON::Array array;
> - foreach (const Item& item, items) {
> - if (item.event != nullptr) {
> - array.values.push_back(JSON::Object(*item.event));
> - }
> - }
> + queue.for_each([&array](Event* event) {
> + array.values.push_back(JSON::Object(*event));
> + });
>
> return array;
> }
>
> - struct Item
> - {
> - uint64_t sequence;
> - Event* event;
> - };
> -
> - Event* try_dequeue()
> - {
> - // The general algoritm here is as follows: we bulk dequeue as
> - // many items from the concurrent queue as possible. We then look
> - // for the `next` item in the sequence hoping that it's at the
> - // beginning of `items` but because the `queue` is not
> - // linearizable it might be "out of order". If we find it out of
> - // order we effectively dequeue it but leave it in `items` so as
> - // not to incur any costly rearrangements/compactions in
> - // `items`. We'll later pop the out of order items once they get
> - // to the front.
> -
> - // Start by popping any items that we effectively dequeued but
> - // didn't remove from `items` so as not to incur costly
> - // rearragements/compactions.
> - while (!items.empty() && next > items.front().sequence) {
> - items.pop_front();
> - }
> -
> - // Optimistically let's hope that the next item is at the front of
> - // `item`. If so, pop the item, increment `next`, and return the
> - // event.
> - if (!items.empty() && items.front().sequence == next) {
> - Event* event = items.front().event;
> - items.pop_front();
> - next += 1;
> - return event;
> - }
> -
> - size_t index = 0;
> -
> - do {
> - // Now look for a potentially out of order item. If found,
> - // signifiy the item has been dequeued by nulling the event
> - // (necessary for the implementation of `count()` and `operator
> - // JSON::Array()`) and return the event.
> - for (; index < items.size(); index++) {
> - if (items[index].sequence == next) {
> - Event* event = items[index].event;
> - items[index].event = nullptr;
> - next += 1;
> - return event;
> - }
> - }
> -
> - // If we can bulk dequeue more items then keep looking for the
> - // out of order event!
> - //
> - // NOTE: we use the _small_ value of `4` to dequeue here since
> - // in the presence of enough events being enqueued we could end
> - // up spending a LONG time dequeuing here! Since the next event
> - // in the sequence should really be close to the top of the
> - // queue we use a small value to dequeue.
> - //
> - // The intuition here is this: the faster we can return the next
> - // event the faster that event can get processed and the faster
> - // it might generate other events that can get processed in
> - // parallel by other threads and the more work we get done.
> - } while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0);
> -
> - return nullptr;
> - }
> -
> // Underlying queue of items.
> - moodycamel::ConcurrentQueue<Item> queue;
> -
> - // Counter to represent the item sequence. Note that we use a
> - // unsigned 64-bit integer which means that even if we were adding
> - // one item to the queue every nanosecond we'd be able to run for
> - // 18,446,744,073,709,551,615 nanoseconds or ~585 years! ;-)
> - std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0);
> -
> - // Counter to represent the next item we expect to dequeue. Note
> - // that we don't need to make this be atomic because only a single
> - // consumer is ever reading or writing this variable!
> - uint64_t next = 0;
> -
> - // Collection of bulk dequeued items that may be out of order. Note
> - // that like `next` this will only ever be read/written by a single
> - // consumer.
> - //
> - // The use of a deque was explicit because it is implemented as an
> - // array of arrays (or vector of vectors) which usually gives good
> - // performance for appending to the back and popping from the front
> - // which is exactly what we need to do. To avoid any performance
> - // issues that might be incurred we do not remove any items from the
> - // middle of the deque (see comments in `try_dequeue()` above for
> - // more details).
> - std::deque<Item> items;
> + MpscLinkedQueue<Event> queue;
>
> // Whether or not the event queue has been decomissioned. This must
> // be atomic as it can be read by a producer even though it's only
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/mpsc_linked_queue.hpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
> new file mode 100644
> index 0000000..48c9509
> --- /dev/null
> +++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
> @@ -0,0 +1,179 @@
> +// Licensed under the Apache License, Version 2.0 (the "License");
> +// you may not use this file except in compliance with the License.
> +// You may obtain a copy of the License at
> +//
> +// http://www.apache.org/licenses/LICENSE-2.0
> +//
> +// Unless required by applicable law or agreed to in writing, software
> +// distributed under the License is distributed on an "AS IS" BASIS,
> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +// See the License for the specific language governing permissions and
> +// limitations under the License
> +
> +#ifndef __MPSC_LINKED_QUEUE_HPP__
> +#define __MPSC_LINKED_QUEUE_HPP__
> +
> +#include <atomic>
> +#include <functional>
> +
> +#include <glog/logging.h>
> +
> +namespace process {
> +
> +// This queue is a C++ port of the MpscLinkedQueue of JCTools, but limited to
> +// the core methods:
> +//
> +// https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java
> +//
> +// which is a Java port of the MPSC algorithm as presented in following article:
> +//
> +// http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
> +//
> +// The queue has following properties:
> +// Producers are wait-free (one atomic exchange per enqueue)
> +// Consumer is
> +// - lock-free
> +// - mostly wait-free, except when consumer reaches the end of the queue
> +// and producer enqueued a new node, but did not update the next pointer
> +// on the old node, yet
> +template <typename T>
> +class MpscLinkedQueue
> +{
> +private:
> + template <typename E>
> + struct Node
> + {
> + public:
> + explicit Node(E* element = nullptr) : element(element) {}
> +
> + E* element;
> + std::atomic<Node<E>*> next = ATOMIC_VAR_INIT(nullptr);
> + };
> +
> +public:
> + MpscLinkedQueue()
> + {
> + tail = new Node<T>();
> + head.store(tail);
> + }
> +
> + ~MpscLinkedQueue()
> + {
> + while (auto element = dequeue()) {
> + delete element;
> + }
> +
> + delete tail;
> + }
> +
> + // Multi producer safe.
> + void enqueue(T* element)
> + {
> + // A `nullptr` is used to denote an empty queue when doing a
> + // `dequeue()` so producers can't use it as an element.
> + CHECK_NOTNULL(element);
> +
> + auto newNode = new Node<T>(element);
> +
> + // Exchange is guaranteed to only give the old value to one
> + // producer, so this is safe and wait-free.
> + auto oldhead = head.exchange(newNode, std::memory_order_release);
> +
> + // At this point if this thread context switches out we may block
> + // the consumer from doing a dequeue (see below). Eventually we'll
> + // unblock the consumer once we run again and execute the next
> + // line of code.
> + oldhead->next.store(newNode, std::memory_order_release);
> + }
> +
> + // Single consumer only.
> + T* dequeue()
> + {
> + auto currentTail = tail;
> +
> + // Check and see if there is an actual element linked from `tail`
> + // since we use `tail` as a "stub" rather than the actual element.
> + auto nextTail = currentTail->next.exchange(
> + nullptr,
> + std::memory_order_relaxed);
> +
> + // There are three possible cases here:
> + //
> + // (1) The queue is empty.
> + // (2) The queue appears empty but a producer is still enqueuing
> + // so let's wait for it and then dequeue.
> + // (3) We have something to dequeue.
> + //
> + // Start by checking if the queue is or appears empty.
> + if (nextTail == nullptr) {
> + // Now check if the queue is actually empty or just appears
> + // empty. If it's actually empty then return `nullptr` to denote
> + // emptiness.
> + if (head.load(std::memory_order_relaxed) == tail) {
> + return nullptr;
> + }
> +
> + // Another thread already inserted a new node, but did not
> + // connect it to the tail, yet, so we spin-wait. At this point
> + // we are not wait-free anymore.
> + do {
> + nextTail = currentTail->next.exchange(
> + nullptr,
> + std::memory_order_relaxed);
> + } while (nextTail == nullptr);
> + }
> +
> + CHECK_NOTNULL(nextTail);
> +
> + auto element = nextTail->element;
> + nextTail->element = nullptr;
> +
> + tail = nextTail;
> + delete currentTail;
> +
> + return element;
> + }
> +
> + // Single consumer only.
> + //
> + // TODO(drexin): Provide C++ style iteration so someone can just use
> + // the `std::for_each()`.
> + template <typename F>
> + void for_each(F&& f)
> + {
> + auto end = head.load();
> + auto node = tail;
> +
> + for (;;) {
> + node = node->next.load();
> +
> + // We are following the linked structure until we reach the end
> + // node. There is a race with new nodes being added, so we limit
> + // the traversal to the last node at the time we started.
> + if (node == nullptr) {
> + return;
> + }
> +
> + f(node->element);
> +
> + if (node == end) {
> + return;
> + }
> + }
> + }
> +
> + // Single consumer only.
> + bool empty()
> + {
> + return tail->next.load(std::memory_order_relaxed) == nullptr &&
> + head.load(std::memory_order_relaxed) == tail;
> + }
> +
> +private:
> + std::atomic<Node<T>*> head;
> +
> + // TODO(drexin): Programatically get the cache line size.
> + alignas(128) Node<T>* tail;
> +};
> +
> +} // namespace process {
> +
> +#endif // __MPSC_LINKED_QUEUE_HPP__
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/CMakeLists.txt
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt b/3rdparty/libprocess/src/tests/CMakeLists.txt
> index 25a34f9..5814bc6 100644
> --- a/3rdparty/libprocess/src/tests/CMakeLists.txt
> +++ b/3rdparty/libprocess/src/tests/CMakeLists.txt
> @@ -28,6 +28,7 @@ set(PROCESS_TESTS_SRC
> limiter_tests.cpp
> loop_tests.cpp
> metrics_tests.cpp
> + mpsc_linked_queue_tests.cpp
> mutex_tests.cpp
> owned_tests.cpp
> process_tests.cpp
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/benchmarks.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
> index 2ec0d42..e8ef21f 100644
> --- a/3rdparty/libprocess/src/tests/benchmarks.cpp
> +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
> @@ -22,6 +22,7 @@
> #include <iostream>
> #include <memory>
> #include <string>
> +#include <thread>
> #include <vector>
>
> #include <process/collect.hpp>
> @@ -40,6 +41,8 @@
>
> #include "benchmarks.pb.h"
>
> +#include "mpsc_linked_queue.hpp"
> +
> namespace http = process::http;
>
> using process::CountDownLatch;
> @@ -567,7 +570,6 @@ private:
> long count = 0;
> };
>
> -
> TEST(ProcessTest, Process_BENCHMARK_DispatchDefer)
> {
> constexpr long repeats = 100000;
> @@ -683,3 +685,63 @@ TEST(ProcessTest, Process_BENCHMARK_ProtobufInstallHandler)
> process.run(num_submessages);
> }
> }
> +
> +
> +TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueue)
> +{
> + // NOTE: we set the total number of producers to be 1 less than the
> + // hardware concurrency so the consumer doesn't have to fight for
> + // processing time with the producers.
> + const unsigned int producerCount = std::thread::hardware_concurrency() - 1;
> + const int messageCount = 10000000;
> + const int totalCount = messageCount * producerCount;
> + std::string* s = new std::string("");
> + process::MpscLinkedQueue<std::string> q;
> +
> + Stopwatch consumerWatch;
> +
> + auto consumer = std::thread([totalCount, &q, &consumerWatch]() {
> + consumerWatch.start();
> + for (int i = totalCount; i > 0;) {
> + if (q.dequeue() != nullptr) {
> + i--;
> + }
> + }
> + consumerWatch.stop();
> + });
> +
> + std::vector<std::thread> producers;
> +
> + Stopwatch producerWatch;
> + producerWatch.start();
> +
> + for (unsigned int t = 0; t < producerCount; t++) {
> + producers.push_back(std::thread([messageCount, s, &q]() {
> + for (int i = 0; i < messageCount; i++) {
> + q.enqueue(s);
> + }
> + }));
> + }
> +
> + for (std::thread& producer : producers) {
> + producer.join();
> + }
> +
> + producerWatch.stop();
> +
> + consumer.join();
> +
> + Duration producerElapsed = producerWatch.elapsed();
> + Duration consumerElapsed = consumerWatch.elapsed();
> +
> + double consumerThroughput = (double) totalCount / consumerElapsed.secs();
> + double producerThroughput = (double) totalCount / producerElapsed.secs();
> + double throughput = consumerThroughput + producerThroughput;
> +
> + cout << "Estimated producer throughput (" << producerCount << " threads): "
> + << std::fixed << producerThroughput << " op/s" << endl;
> + cout << "Estimated consumer throughput: "
> + << std::fixed << consumerThroughput << " op/s" << endl;
> + cout << "Estimated total throughput: "
> + << std::fixed << throughput << " op/s" << endl;
> +}
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
> new file mode 100644
> index 0000000..7699974
> --- /dev/null
> +++ b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
> @@ -0,0 +1,104 @@
> +// Licensed under the Apache License, Version 2.0 (the "License");
> +// you may not use this file except in compliance with the License.
> +// You may obtain a copy of the License at
> +//
> +// http://www.apache.org/licenses/LICENSE-2.0
> +//
> +// Unless required by applicable law or agreed to in writing, software
> +// distributed under the License is distributed on an "AS IS" BASIS,
> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +// See the License for the specific language governing permissions and
> +// limitations under the License
> +
> +#include <thread>
> +
> +#include <stout/gtest.hpp>
> +#include <stout/stringify.hpp>
> +
> +#include "mpsc_linked_queue.hpp"
> +
> +
> +TEST(MpscLinkedQueueTest, EnqueueDequeue)
> +{
> + process::MpscLinkedQueue<std::string> q;
> + std::string* s = new std::string("test");
> + q.enqueue(s);
> + std::string* s2 = q.dequeue();
> + ASSERT_EQ(s, s2);
> + delete s2;
> +}
> +
> +
> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultiple)
> +{
> + process::MpscLinkedQueue<std::string> q;
> + for (int i = 0; i < 20; i++) {
> + q.enqueue(new std::string(stringify(i)));
> + }
> +
> + for (int i = 0; i < 20; i++) {
> + std::string* s = q.dequeue();
> + ASSERT_EQ(*s, stringify(i));
> + delete s;
> + }
> +}
> +
> +
> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultithreaded)
> +{
> + process::MpscLinkedQueue<std::string> q;
> + std::vector<std::thread> threads;
> + for (int t = 0; t < 5; t++) {
> + threads.push_back(
> + std::thread([t, &q]() {
> + int start = t * 1000;
> + int end = start + 1000;
> + for (int i = start; i < end; i++) {
> + q.enqueue(new std::string(stringify(i)));
> + }
> + }));
> + }
> +
> + std::for_each(threads.begin(), threads.end(), [](std::thread& t) {
> + t.join();
> + });
> +
> + std::set<std::string> elements;
> +
> + std::string* s = nullptr;
> + while ((s = q.dequeue()) != nullptr) {
> + elements.insert(*s);
> + }
> +
> + ASSERT_EQ(5000UL, elements.size());
> +
> + for (int i = 0; i < 5000; i++) {
> + ASSERT_NE(elements.end(), elements.find(stringify(i)));
> + }
> +}
> +
> +
> +TEST(MpscLinkedQueueTest, ForEach)
> +{
> + process::MpscLinkedQueue<std::string> q;
> + for (int i = 0; i < 20; i++) {
> + q.enqueue(new std::string(stringify(i)));
> + }
> + int i = 0;
> + q.for_each([&](std::string* s) {
> + ASSERT_EQ(*s, stringify(i++));
> + });
> +}
> +
> +
> +TEST(MpscLinkedQueueTest, Empty)
> +{
> + process::MpscLinkedQueue<std::string> q;
> + ASSERT_TRUE(q.empty());
> + std::string* s = new std::string("test");
> + q.enqueue(s);
> + ASSERT_FALSE(q.empty());
> + q.dequeue();
> + ASSERT_TRUE(q.empty());
> + delete s;
> +}
>