Hi Dario,

This patch introduced two new clang-tidy warnings. Could we try to get these
down to zero, even if the code does not look bad?


I already created a patch for the unused lambda capture,

    https://reviews.apache.org/r/67927/

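While the code does look reasonable, C++ has a somewhat odd exception here: a
lambda may read certain variables (e.g., a `const` integer initialized with a
constant expression) without capturing them, so an explicit capture of such a
variable is reported as unused.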
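To illustrate the rule, here is a minimal sketch. I am assuming the flagged capture was a `const int` like the benchmark's `messageCount`; the function name `countMessages` is made up for this example.

```cpp
// Hypothetical stand-in for the benchmark's producer loop.
int countMessages()
{
  // A `const int` initialized with a constant expression.
  const int messageCount = 1000;
  int enqueued = 0;

  // `messageCount` is only read for its value, which is not an odr-use,
  // so it does not need to appear in the capture list. Writing
  // `[messageCount, &enqueued]` would compile as well, but the capture
  // would be redundant, which is what the warning is about.
  auto producer = [&enqueued]() {
    for (int i = 0; i < messageCount; i++) {
      enqueued++;
    }
  };

  producer();
  return enqueued;
}
```

Calling `countMessages()` runs the loop to completion even though `messageCount` was never captured.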


I also looked into the warning on the “excessive padding”. Adding some explicit 
padding seems to make clang-tidy content, but I wasn’t sure whether we just 
wanted to put `head` and `tail` on separate cache lines, or also cared about 
the padding added after `tail`.

    private:
      std::atomic<Node<T>*> head;

      char padding[128 - sizeof(std::atomic<Node<T>*>)];

      // TODO(drexin): Programatically get the cache line size.
      // FIXME: IMO no need for `alignas` to separate `head` and `tail`.
      alignas(128) Node<T>* tail;
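As a rough sketch of the variant without `alignas`, the layout can be checked at compile time. The struct `PaddedPointers`, the constant `CACHE_LINE`, and the helper `tailOffset` are names I made up for this example, and the 128-byte line size is the same hard-coded assumption as in the patch.

```cpp
#include <atomic>
#include <cstddef>

// Assumed cache line size, matching the hard-coded 128 in the patch.
constexpr std::size_t CACHE_LINE = 128;

struct Node;

// Hypothetical, simplified stand-in for the queue's data members.
struct PaddedPointers
{
  std::atomic<Node*> head;

  // Explicit padding places `tail` exactly CACHE_LINE bytes after `head`,
  // so the two can never share a line of that size.
  char padding[CACHE_LINE - sizeof(std::atomic<Node*>)];

  // No `alignas` here, so nothing forces extra padding after `tail`.
  Node* tail;
};

static_assert(
    offsetof(PaddedPointers, tail) == CACHE_LINE,
    "`head` and `tail` must be one cache line apart");

// Returns the byte offset of `tail` within the struct.
std::size_t tailOffset()
{
  return offsetof(PaddedPointers, tail);
}
```

The trade-off is that the struct itself is no longer aligned to a cache line boundary and nothing is reserved after `tail`, so this only fits if separating `head` from `tail` is all we care about.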

Could you put up a patch for that? You can run the linter yourself; it is 
`support/mesos-tidy.sh`.


Cheers,

Benjamin


> On Jul 15, 2018, at 7:02 PM, b...@apache.org wrote:
> 
> Repository: mesos
> Updated Branches:
>  refs/heads/master a11a6a3d8 -> b1eafc035
> 
> 
> Added mpsc_linked_queue and use it as the concurrent event queue.
> 
> https://reviews.apache.org/r/62515
> 
> 
> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1eafc03
> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1eafc03
> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1eafc03
> 
> Branch: refs/heads/master
> Commit: b1eafc035426bc39df4dba81c5c46b8b2d970339
> Parents: a11a6a3
> Author: Dario Rexin <dre...@apple.com>
> Authored: Sat Jul 7 13:20:22 2018 -0700
> Committer: Benjamin Hindman <benjamin.hind...@gmail.com>
> Committed: Sun Jul 15 09:55:28 2018 -0700
> 
> ----------------------------------------------------------------------
> 3rdparty/libprocess/Makefile.am                 |   1 +
> 3rdparty/libprocess/src/event_queue.hpp         | 168 ++---------------
> 3rdparty/libprocess/src/mpsc_linked_queue.hpp   | 179 +++++++++++++++++++
> 3rdparty/libprocess/src/tests/CMakeLists.txt    |   1 +
> 3rdparty/libprocess/src/tests/benchmarks.cpp    |  64 ++++++-
> .../src/tests/mpsc_linked_queue_tests.cpp       | 104 +++++++++++
> 6 files changed, 367 insertions(+), 150 deletions(-)
> ----------------------------------------------------------------------
> 
> 
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/Makefile.am
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
> index 2d356aa..631491a 100644
> --- a/3rdparty/libprocess/Makefile.am
> +++ b/3rdparty/libprocess/Makefile.am
> @@ -307,6 +307,7 @@ libprocess_tests_SOURCES =                              \
>   src/tests/loop_tests.cpp                                    \
>   src/tests/main.cpp                                          \
>   src/tests/metrics_tests.cpp                                 \
> +  src/tests/mpsc_linked_queue_tests.cpp                              \
>   src/tests/mutex_tests.cpp                                   \
>   src/tests/owned_tests.cpp                                   \
>   src/tests/process_tests.cpp                                 \
> 
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/event_queue.hpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/event_queue.hpp b/3rdparty/libprocess/src/event_queue.hpp
> index 21c522d..999d552 100644
> --- a/3rdparty/libprocess/src/event_queue.hpp
> +++ b/3rdparty/libprocess/src/event_queue.hpp
> @@ -17,10 +17,6 @@
> #include <mutex>
> #include <string>
> 
> -#ifdef LOCK_FREE_EVENT_QUEUE
> -#include <concurrentqueue.h>
> -#endif // LOCK_FREE_EVENT_QUEUE
> -
> #include <process/event.hpp>
> #include <process/http.hpp>
> 
> @@ -28,6 +24,10 @@
> #include <stout/stringify.hpp>
> #include <stout/synchronized.hpp>
> 
> +#ifdef LOCK_FREE_EVENT_QUEUE
> +#include "mpsc_linked_queue.hpp"
> +#endif // LOCK_FREE_EVENT_QUEUE
> +
> namespace process {
> 
> // A _multiple_ producer (MP) _single_ consumer (SC) event queue for a
> @@ -187,185 +187,55 @@ private:
> #else // LOCK_FREE_EVENT_QUEUE
>   void enqueue(Event* event)
>   {
> -    Item item = {sequence.fetch_add(1), event};
>     if (comissioned.load()) {
> -      queue.enqueue(std::move(item));
> +      queue.enqueue(event);
>     } else {
> -      sequence.fetch_sub(1);
>       delete event;
>     }
>   }
> 
>   Event* dequeue()
>   {
> -    // NOTE: for performance reasons we don't check `comissioned` here
> -    // so it's possible that we'll loop forever if a consumer called
> -    // `decomission()` and then subsequently called `dequeue()`.
> -    Event* event = nullptr;
> -    do {
> -      // Given the nature of the concurrent queue implementation it's
> -      // possible that we'll need to try to dequeue multiple times
> -      // until it returns an event even though we know there is an
> -      // event because the semantics are that we shouldn't call
> -      // `dequeue()` before calling `empty()`.
> -      event = try_dequeue();
> -    } while (event == nullptr);
> -    return event;
> +    return queue.dequeue();
>   }
> 
>   bool empty()
>   {
> -    // NOTE: for performance reasons we don't check `comissioned` here
> -    // so it's possible that we'll return true when in fact we've been
> -    // decomissioned and you shouldn't attempt to dequeue anything.
> -    return (sequence.load() - next) == 0;
> +    return queue.empty();
>   }
> 
>   void decomission()
>   {
>     comissioned.store(true);
>     while (!empty()) {
> -      // NOTE: we use `try_dequeue()` here because we might be racing
> -      // with `enqueue()` where they've already incremented `sequence`
> -      // so we think there are more items to dequeue but they aren't
> -      // actually going to enqueue anything because they've since seen
> -      // `comissioned` is true. We'll attempt to dequeue with
> -      // `try_dequeue()` and eventually they'll decrement `sequence`
> -      // and so `empty()` will return true and we'll bail.
> -      Event* event = try_dequeue();
> -      if (event != nullptr) {
> -        delete event;
> -      }
> +      delete dequeue();
>     }
>   }
> 
>   template <typename T>
>   size_t count()
>   {
> -    // Try and dequeue more elements first!
> -    queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
> -
> -    return std::count_if(
> -        items.begin(),
> -        items.end(),
> -        [](const Item& item) {
> -          if (item.event != nullptr) {
> -            return item.event->is<T>();
> -          }
> -          return false;
> -        });
> +    size_t count = 0;
> +    queue.for_each([&count](Event* event) {
> +      if (event->is<T>()) {
> +        count++;
> +      }
> +    });
> +    return count;
>   }
> 
>   operator JSON::Array()
>   {
> -    // Try and dequeue more elements first!
> -    queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
> -
>     JSON::Array array;
> -    foreach (const Item& item, items) {
> -      if (item.event != nullptr) {
> -        array.values.push_back(JSON::Object(*item.event));
> -      }
> -    }
> +    queue.for_each([&array](Event* event) {
> +      array.values.push_back(JSON::Object(*event));
> +    });
> 
>     return array;
>   }
> 
> -  struct Item
> -  {
> -    uint64_t sequence;
> -    Event* event;
> -  };
> -
> -  Event* try_dequeue()
> -  {
> -    // The general algoritm here is as follows: we bulk dequeue as
> -    // many items from the concurrent queue as possible. We then look
> -    // for the `next` item in the sequence hoping that it's at the
> -    // beginning of `items` but because the `queue` is not
> -    // linearizable it might be "out of order". If we find it out of
> -    // order we effectively dequeue it but leave it in `items` so as
> -    // not to incur any costly rearrangements/compactions in
> -    // `items`. We'll later pop the out of order items once they get
> -    // to the front.
> -
> -    // Start by popping any items that we effectively dequeued but
> -    // didn't remove from `items` so as not to incur costly
> -    // rearragements/compactions.
> -    while (!items.empty() && next > items.front().sequence) {
> -      items.pop_front();
> -    }
> -
> -    // Optimistically let's hope that the next item is at the front of
> -    // `item`. If so, pop the item, increment `next`, and return the
> -    // event.
> -    if (!items.empty() && items.front().sequence == next) {
> -      Event* event = items.front().event;
> -      items.pop_front();
> -      next += 1;
> -      return event;
> -    }
> -
> -    size_t index = 0;
> -
> -    do {
> -      // Now look for a potentially out of order item. If found,
> -      //  signifiy the item has been dequeued by nulling the event
> -      //  (necessary for the implementation of `count()` and `operator
> -      //  JSON::Array()`) and return the event.
> -      for (; index < items.size(); index++) {
> -        if (items[index].sequence == next) {
> -          Event* event = items[index].event;
> -          items[index].event = nullptr;
> -          next += 1;
> -          return event;
> -        }
> -      }
> -
> -      // If we can bulk dequeue more items then keep looking for the
> -      // out of order event!
> -      //
> -      // NOTE: we use the _small_ value of `4` to dequeue here since
> -      // in the presence of enough events being enqueued we could end
> -      // up spending a LONG time dequeuing here! Since the next event
> -      // in the sequence should really be close to the top of the
> -      // queue we use a small value to dequeue.
> -      //
> -      // The intuition here is this: the faster we can return the next
> -      // event the faster that event can get processed and the faster
> -      // it might generate other events that can get processed in
> -      // parallel by other threads and the more work we get done.
> -    } while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0);
> -
> -    return nullptr;
> -  }
> -
>   // Underlying queue of items.
> -  moodycamel::ConcurrentQueue<Item> queue;
> -
> -  // Counter to represent the item sequence. Note that we use a
> -  // unsigned 64-bit integer which means that even if we were adding
> -  // one item to the queue every nanosecond we'd be able to run for
> -  // 18,446,744,073,709,551,615 nanoseconds or ~585 years! ;-)
> -  std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0);
> -
> -  // Counter to represent the next item we expect to dequeue. Note
> -  // that we don't need to make this be atomic because only a single
> -  // consumer is ever reading or writing this variable!
> -  uint64_t next = 0;
> -
> -  // Collection of bulk dequeued items that may be out of order. Note
> -  // that like `next` this will only ever be read/written by a single
> -  // consumer.
> -  //
> -  // The use of a deque was explicit because it is implemented as an
> -  // array of arrays (or vector of vectors) which usually gives good
> -  // performance for appending to the back and popping from the front
> -  // which is exactly what we need to do. To avoid any performance
> -  // issues that might be incurred we do not remove any items from the
> -  // middle of the deque (see comments in `try_dequeue()` above for
> -  // more details).
> -  std::deque<Item> items;
> +  MpscLinkedQueue<Event> queue;
> 
>   // Whether or not the event queue has been decomissioned. This must
>   // be atomic as it can be read by a producer even though it's only
> 
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/mpsc_linked_queue.hpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
> new file mode 100644
> index 0000000..48c9509
> --- /dev/null
> +++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
> @@ -0,0 +1,179 @@
> +// Licensed under the Apache License, Version 2.0 (the "License");
> +// you may not use this file except in compliance with the License.
> +// You may obtain a copy of the License at
> +//
> +//     http://www.apache.org/licenses/LICENSE-2.0
> +//
> +// Unless required by applicable law or agreed to in writing, software
> +// distributed under the License is distributed on an "AS IS" BASIS,
> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +// See the License for the specific language governing permissions and
> +// limitations under the License
> +
> +#ifndef __MPSC_LINKED_QUEUE_HPP__
> +#define __MPSC_LINKED_QUEUE_HPP__
> +
> +#include <atomic>
> +#include <functional>
> +
> +#include <glog/logging.h>
> +
> +namespace process {
> +
> +// This queue is a C++ port of the MpscLinkedQueue of JCTools, but limited to
> +// the core methods:
> +// https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java
> +//
> +// which is a Java port of the MPSC algorithm as presented in following article:
> +// http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
> +//
> +// The queue has following properties:
> +//   Producers are wait-free (one atomic exchange per enqueue)
> +//   Consumer is
> +//     - lock-free
> +//     - mostly wait-free, except when consumer reaches the end of the queue
> +//       and producer enqueued a new node, but did not update the next pointer
> +//       on the old node, yet
> +template <typename T>
> +class MpscLinkedQueue
> +{
> +private:
> +  template <typename E>
> +  struct Node
> +  {
> +  public:
> +    explicit Node(E* element = nullptr) : element(element) {}
> +
> +    E* element;
> +    std::atomic<Node<E>*> next = ATOMIC_VAR_INIT(nullptr);
> +  };
> +
> +public:
> +  MpscLinkedQueue()
> +  {
> +    tail = new Node<T>();
> +    head.store(tail);
> +  }
> +
> +  ~MpscLinkedQueue()
> +  {
> +    while (auto element = dequeue()) {
> +      delete element;
> +    }
> +
> +    delete tail;
> +  }
> +
> +  // Multi producer safe.
> +  void enqueue(T* element)
> +  {
> +    // A `nullptr` is used to denote an empty queue when doing a
> +    // `dequeue()` so producers can't use it as an element.
> +    CHECK_NOTNULL(element);
> +
> +    auto newNode = new Node<T>(element);
> +
> +    // Exchange is guaranteed to only give the old value to one
> +    // producer, so this is safe and wait-free.
> +    auto oldhead = head.exchange(newNode, std::memory_order_release);
> +
> +    // At this point if this thread context switches out we may block
> +    // the consumer from doing a dequeue (see below). Eventually we'll
> +    // unblock the consumer once we run again and execute the next
> +    // line of code.
> +    oldhead->next.store(newNode, std::memory_order_release);
> +  }
> +
> +  // Single consumer only.
> +  T* dequeue()
> +  {
> +    auto currentTail = tail;
> +
> +    // Check and see if there is an actual element linked from `tail`
> +    // since we use `tail` as a "stub" rather than the actual element.
> +    auto nextTail = currentTail->next.exchange(
> +        nullptr,
> +        std::memory_order_relaxed);
> +
> +    // There are three possible cases here:
> +    //
> +    // (1) The queue is empty.
> +    // (2) The queue appears empty but a producer is still enqueuing
> +    //     so let's wait for it and then dequeue.
> +    // (3) We have something to dequeue.
> +    //
> +    // Start by checking if the queue is or appears empty.
> +    if (nextTail == nullptr) {
> +      // Now check if the queue is actually empty or just appears
> +      // empty. If it's actually empty then return `nullptr` to denote
> +      // emptiness.
> +      if (head.load(std::memory_order_relaxed) == tail) {
> +        return nullptr;
> +      }
> +
> +      // Another thread already inserted a new node, but did not
> +      // connect it to the tail, yet, so we spin-wait. At this point
> +      // we are not wait-free anymore.
> +      do {
> +        nextTail = currentTail->next.exchange(
> +            nullptr,
> +            std::memory_order_relaxed);
> +      } while (nextTail == nullptr);
> +    }
> +
> +    CHECK_NOTNULL(nextTail);
> +
> +    auto element = nextTail->element;
> +    nextTail->element = nullptr;
> +
> +    tail = nextTail;
> +    delete currentTail;
> +
> +    return element;
> +  }
> +
> +  // Single consumer only.
> +  //
> +  // TODO(drexin): Provide C++ style iteration so someone can just use
> +  // the `std::for_each()`.
> +  template <typename F>
> +  void for_each(F&& f)
> +  {
> +    auto end = head.load();
> +    auto node = tail;
> +
> +    for (;;) {
> +      node = node->next.load();
> +
> +      // We are following the linked structure until we reach the end
> +      // node. There is a race with new nodes being added, so we limit
> +      // the traversal to the last node at the time we started.
> +      if (node == nullptr) {
> +        return;
> +      }
> +
> +      f(node->element);
> +
> +      if (node == end) {
> +        return;
> +      }
> +    }
> +  }
> +
> +  // Single consumer only.
> +  bool empty()
> +  {
> +    return tail->next.load(std::memory_order_relaxed) == nullptr &&
> +      head.load(std::memory_order_relaxed) == tail;
> +  }
> +
> +private:
> +  std::atomic<Node<T>*> head;
> +
> +  // TODO(drexin): Programatically get the cache line size.
> +  alignas(128) Node<T>* tail;
> +};
> +
> +} // namespace process {
> +
> +#endif // __MPSC_LINKED_QUEUE_HPP__
> 
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/CMakeLists.txt
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt b/3rdparty/libprocess/src/tests/CMakeLists.txt
> index 25a34f9..5814bc6 100644
> --- a/3rdparty/libprocess/src/tests/CMakeLists.txt
> +++ b/3rdparty/libprocess/src/tests/CMakeLists.txt
> @@ -28,6 +28,7 @@ set(PROCESS_TESTS_SRC
>   limiter_tests.cpp
>   loop_tests.cpp
>   metrics_tests.cpp
> +  mpsc_linked_queue_tests.cpp
>   mutex_tests.cpp
>   owned_tests.cpp
>   process_tests.cpp
> 
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/benchmarks.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
> index 2ec0d42..e8ef21f 100644
> --- a/3rdparty/libprocess/src/tests/benchmarks.cpp
> +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
> @@ -22,6 +22,7 @@
> #include <iostream>
> #include <memory>
> #include <string>
> +#include <thread>
> #include <vector>
> 
> #include <process/collect.hpp>
> @@ -40,6 +41,8 @@
> 
> #include "benchmarks.pb.h"
> 
> +#include "mpsc_linked_queue.hpp"
> +
> namespace http = process::http;
> 
> using process::CountDownLatch;
> @@ -567,7 +570,6 @@ private:
>   long count = 0;
> };
> 
> -
> TEST(ProcessTest, Process_BENCHMARK_DispatchDefer)
> {
>   constexpr long repeats = 100000;
> @@ -683,3 +685,63 @@ TEST(ProcessTest, Process_BENCHMARK_ProtobufInstallHandler)
>     process.run(num_submessages);
>   }
> }
> +
> +
> +TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueue)
> +{
> +  // NOTE: we set the total number of producers to be 1 less than the
> +  // hardware concurrency so the consumer doesn't have to fight for
> +  // processing time with the producers.
> +  const unsigned int producerCount = std::thread::hardware_concurrency() - 1;
> +  const int messageCount = 10000000;
> +  const int totalCount = messageCount * producerCount;
> +  std::string* s = new std::string("");
> +  process::MpscLinkedQueue<std::string> q;
> +
> +  Stopwatch consumerWatch;
> +
> +  auto consumer = std::thread([totalCount, &q, &consumerWatch]() {
> +    consumerWatch.start();
> +    for (int i = totalCount; i > 0;) {
> +      if (q.dequeue() != nullptr) {
> +        i--;
> +      }
> +    }
> +    consumerWatch.stop();
> +  });
> +
> +  std::vector<std::thread> producers;
> +
> +  Stopwatch producerWatch;
> +  producerWatch.start();
> +
> +  for (unsigned int t = 0; t < producerCount; t++) {
> +    producers.push_back(std::thread([messageCount, s, &q]() {
> +      for (int i = 0; i < messageCount; i++) {
> +        q.enqueue(s);
> +      }
> +    }));
> +  }
> +
> +  for (std::thread& producer : producers) {
> +    producer.join();
> +  }
> +
> +  producerWatch.stop();
> +
> +  consumer.join();
> +
> +  Duration producerElapsed = producerWatch.elapsed();
> +  Duration consumerElapsed = consumerWatch.elapsed();
> +
> +  double consumerThroughput = (double) totalCount / consumerElapsed.secs();
> +  double producerThroughput = (double) totalCount / producerElapsed.secs();
> +  double throughput = consumerThroughput + producerThroughput;
> +
> +  cout << "Estimated producer throughput (" << producerCount << " threads): "
> +       << std::fixed << producerThroughput << " op/s" << endl;
> +  cout << "Estimated consumer throughput: "
> +       << std::fixed << consumerThroughput << " op/s" << endl;
> +  cout << "Estimated total throughput: "
> +       << std::fixed << throughput << " op/s" << endl;
> +}
> 
> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
> ----------------------------------------------------------------------
> diff --git a/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
> new file mode 100644
> index 0000000..7699974
> --- /dev/null
> +++ b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
> @@ -0,0 +1,104 @@
> +// Licensed under the Apache License, Version 2.0 (the "License");
> +// you may not use this file except in compliance with the License.
> +// You may obtain a copy of the License at
> +//
> +//     http://www.apache.org/licenses/LICENSE-2.0
> +//
> +// Unless required by applicable law or agreed to in writing, software
> +// distributed under the License is distributed on an "AS IS" BASIS,
> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +// See the License for the specific language governing permissions and
> +// limitations under the License
> +
> +#include <thread>
> +
> +#include <stout/gtest.hpp>
> +#include <stout/stringify.hpp>
> +
> +#include "mpsc_linked_queue.hpp"
> +
> +
> +TEST(MpscLinkedQueueTest, EnqueueDequeue)
> +{
> +  process::MpscLinkedQueue<std::string> q;
> +  std::string* s = new std::string("test");
> +  q.enqueue(s);
> +  std::string* s2 = q.dequeue();
> +  ASSERT_EQ(s, s2);
> +  delete s2;
> +}
> +
> +
> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultiple)
> +{
> +  process::MpscLinkedQueue<std::string> q;
> +  for (int i = 0; i < 20; i++) {
> +    q.enqueue(new std::string(stringify(i)));
> +  }
> +
> +  for (int i = 0; i < 20; i++) {
> +    std::string* s = q.dequeue();
> +    ASSERT_EQ(*s, stringify(i));
> +    delete s;
> +  }
> +}
> +
> +
> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultithreaded)
> +{
> +  process::MpscLinkedQueue<std::string> q;
> +  std::vector<std::thread> threads;
> +  for (int t = 0; t < 5; t++) {
> +    threads.push_back(
> +        std::thread([t, &q]() {
> +          int start = t * 1000;
> +          int end = start + 1000;
> +          for (int i = start; i < end; i++) {
> +            q.enqueue(new std::string(stringify(i)));
> +          }
> +        }));
> +  }
> +
> +  std::for_each(threads.begin(), threads.end(), [](std::thread& t) {
> +    t.join();
> +  });
> +
> +  std::set<std::string> elements;
> +
> +  std::string* s = nullptr;
> +  while ((s = q.dequeue()) != nullptr) {
> +    elements.insert(*s);
> +  }
> +
> +  ASSERT_EQ(5000UL, elements.size());
> +
> +  for (int i = 0; i < 5000; i++) {
> +    ASSERT_NE(elements.end(), elements.find(stringify(i)));
> +  }
> +}
> +
> +
> +TEST(MpscLinkedQueueTest, ForEach)
> +{
> +  process::MpscLinkedQueue<std::string> q;
> +  for (int i = 0; i < 20; i++) {
> +    q.enqueue(new std::string(stringify(i)));
> +  }
> +  int i = 0;
> +  q.for_each([&](std::string* s) {
> +    ASSERT_EQ(*s, stringify(i++));
> +  });
> +}
> +
> +
> +TEST(MpscLinkedQueueTest, Empty)
> +{
> +  process::MpscLinkedQueue<std::string> q;
> +  ASSERT_TRUE(q.empty());
> +  std::string* s = new std::string("test");
> +  q.enqueue(s);
> +  ASSERT_FALSE(q.empty());
> +  q.dequeue();
> +  ASSERT_TRUE(q.empty());
> +  delete s;
> +}
> 
