By the way, I use 128 bytes here because of PowerPC. We don’t depend on the 
queue performing well on PPC, but since there has been some work on PPC 
support, I thought I’d make it work there as well. If you think we should 
default to the x86 cache line size (64 bytes) instead, I’ll change that too. 
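
For reference, here is a minimal sketch of the explicit-padding layout under discussion. It is an illustration only, not the patch itself: `QueueFields`, `kCacheLine`, `Node`, and `on_separate_lines` are hypothetical names, and the 128-byte constant is an assumption that could be replaced by 64 for x86 or by C++17's `std::hardware_destructive_interference_size` where the standard library provides it.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>

// Assumed value: 128 bytes covers PowerPC; x86 cache lines are 64 bytes.
constexpr std::size_t kCacheLine = 128;

// Stand-in for the queue's `Node<T>` (hypothetical, simplified).
struct Node
{
  void* element;
  std::atomic<Node*> next;
};

// Hypothetical layout: `head` (written by producers) and `tail` (used by
// the consumer) each get a full cache line via explicit padding, so the
// compiler does not have to insert any implicit padding.
struct alignas(kCacheLine) QueueFields
{
  std::atomic<Node*> head;
  char pad0[kCacheLine - sizeof(std::atomic<Node*>)];

  Node* tail;
  char pad1[kCacheLine - sizeof(Node*)];
};

// Compile-time checks of the intended layout.
static_assert(offsetof(QueueFields, tail) == kCacheLine,
              "tail should start exactly one cache line after head");
static_assert(sizeof(QueueFields) == 2 * kCacheLine,
              "head and tail should each occupy one full cache line");
static_assert(alignof(QueueFields) == kCacheLine,
              "the struct should start on a cache line boundary");

// Returns true if `head` and `tail` of `q` start on different cache lines.
inline bool on_separate_lines(const QueueFields& q)
{
  const auto h = reinterpret_cast<std::uintptr_t>(&q.head) / kCacheLine;
  const auto t = reinterpret_cast<std::uintptr_t>(&q.tail) / kCacheLine;
  return h != t;
}
```

In this sketch the gap between `head` and `tail` comes from `pad0` alone. The struct-level `alignas` only pins where `head` starts, which keeps `head` from sharing a line with whatever precedes the object, and `pad1` does the same for whatever follows `tail`.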

> On Jul 16, 2018, at 8:39 AM, Dario Rexin <dre...@apple.com> wrote:
> 
> Hi Benjamin,
> 
> see comments inline.
> 
>> On Jul 16, 2018, at 5:48 AM, Benjamin Bannier 
>> <benjamin.bann...@mesosphere.io> wrote:
>> 
>> Hi Dario,
>> 
>> this patch introduced two new clang-tidy warnings. Could we try to get these 
>> down to zero, even if the code does not look bad?
>> 
>> 
>> I already created a patch for the unused lambda capture,
>> 
>>   https://reviews.apache.org/r/67927/
>> 
>> While the code does look reasonable, as a somewhat weird exception C++ 
>> allows referencing some variables without capturing them.
>> 
> 
> Yes, I was a bit confused by the MSVC error there. I added the explicit 
> capture because I thought it would be preferable over implicit capture, but 
> I’m fine with either. Thanks for creating the patch!
> 
>> 
>> I also looked into the warning on the “excessive padding”. Adding some 
>> explicit padding seems to make clang-tidy content, but I wasn’t sure whether 
>> we just wanted to put `head` and `tail` on separate cache lines, or also 
>> cared about the padding added after `tail`.
>> 
>>  private:
>>     std::atomic<Node<T>*> head;
>> 
>>     char padding[128 - sizeof(std::atomic<Node<T>*>)];
>> 
>>     // TODO(drexin): Programatically get the cache line size.
>>     alignas(128) Node<T>* tail; // FIXME: IMO no need for `alignas` to
>>                                 // separate `head` and `tail`.
>> 
>> Could you put up a patch for that? You can run the linter yourself; it is 
>> `support/mesos-tidy.sh`.
>> 
> 
> That’s interesting. The padding after `tail` is a good point; we should 
> definitely add that to prevent false sharing. If we add the padding, is 
> `alignas` still necessary?
> 
> Thanks,
> Dario
> 
>> 
>> Cheers,
>> 
>> Benjamin
>> 
>> 
>>> On Jul 15, 2018, at 7:02 PM, b...@apache.org wrote:
>>> 
>>> Repository: mesos
>>> Updated Branches:
>>> refs/heads/master a11a6a3d8 -> b1eafc035
>>> 
>>> 
>>> Added mpsc_linked_queue and use it as the concurrent event queue.
>>> 
>>> https://reviews.apache.org/r/62515
>>> 
>>> 
>>> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
>>> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1eafc03
>>> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1eafc03
>>> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1eafc03
>>> 
>>> Branch: refs/heads/master
>>> Commit: b1eafc035426bc39df4dba81c5c46b8b2d970339
>>> Parents: a11a6a3
>>> Author: Dario Rexin <dre...@apple.com>
>>> Authored: Sat Jul 7 13:20:22 2018 -0700
>>> Committer: Benjamin Hindman <benjamin.hind...@gmail.com>
>>> Committed: Sun Jul 15 09:55:28 2018 -0700
>>> 
>>> ----------------------------------------------------------------------
>>> 3rdparty/libprocess/Makefile.am                 |   1 +
>>> 3rdparty/libprocess/src/event_queue.hpp         | 168 ++---------------
>>> 3rdparty/libprocess/src/mpsc_linked_queue.hpp   | 179 +++++++++++++++++++
>>> 3rdparty/libprocess/src/tests/CMakeLists.txt    |   1 +
>>> 3rdparty/libprocess/src/tests/benchmarks.cpp    |  64 ++++++-
>>> .../src/tests/mpsc_linked_queue_tests.cpp       | 104 +++++++++++
>>> 6 files changed, 367 insertions(+), 150 deletions(-)
>>> ----------------------------------------------------------------------
>>> 
>>> 
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/Makefile.am
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
>>> index 2d356aa..631491a 100644
>>> --- a/3rdparty/libprocess/Makefile.am
>>> +++ b/3rdparty/libprocess/Makefile.am
>>> @@ -307,6 +307,7 @@ libprocess_tests_SOURCES =                                      \
>>> src/tests/loop_tests.cpp                                    \
>>> src/tests/main.cpp                                          \
>>> src/tests/metrics_tests.cpp                                 \
>>> +  src/tests/mpsc_linked_queue_tests.cpp                            \
>>> src/tests/mutex_tests.cpp                                   \
>>> src/tests/owned_tests.cpp                                   \
>>> src/tests/process_tests.cpp                                 \
>>> 
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/event_queue.hpp
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/src/event_queue.hpp b/3rdparty/libprocess/src/event_queue.hpp
>>> index 21c522d..999d552 100644
>>> --- a/3rdparty/libprocess/src/event_queue.hpp
>>> +++ b/3rdparty/libprocess/src/event_queue.hpp
>>> @@ -17,10 +17,6 @@
>>> #include <mutex>
>>> #include <string>
>>> 
>>> -#ifdef LOCK_FREE_EVENT_QUEUE
>>> -#include <concurrentqueue.h>
>>> -#endif // LOCK_FREE_EVENT_QUEUE
>>> -
>>> #include <process/event.hpp>
>>> #include <process/http.hpp>
>>> 
>>> @@ -28,6 +24,10 @@
>>> #include <stout/stringify.hpp>
>>> #include <stout/synchronized.hpp>
>>> 
>>> +#ifdef LOCK_FREE_EVENT_QUEUE
>>> +#include "mpsc_linked_queue.hpp"
>>> +#endif // LOCK_FREE_EVENT_QUEUE
>>> +
>>> namespace process {
>>> 
>>> // A _multiple_ producer (MP) _single_ consumer (SC) event queue for a
>>> @@ -187,185 +187,55 @@ private:
>>> #else // LOCK_FREE_EVENT_QUEUE
>>> void enqueue(Event* event)
>>> {
>>> -    Item item = {sequence.fetch_add(1), event};
>>>   if (comissioned.load()) {
>>> -      queue.enqueue(std::move(item));
>>> +      queue.enqueue(event);
>>>   } else {
>>> -      sequence.fetch_sub(1);
>>>     delete event;
>>>   }
>>> }
>>> 
>>> Event* dequeue()
>>> {
>>> -    // NOTE: for performance reasons we don't check `comissioned` here
>>> -    // so it's possible that we'll loop forever if a consumer called
>>> -    // `decomission()` and then subsequently called `dequeue()`.
>>> -    Event* event = nullptr;
>>> -    do {
>>> -      // Given the nature of the concurrent queue implementation it's
>>> -      // possible that we'll need to try to dequeue multiple times
>>> -      // until it returns an event even though we know there is an
>>> -      // event because the semantics are that we shouldn't call
>>> -      // `dequeue()` before calling `empty()`.
>>> -      event = try_dequeue();
>>> -    } while (event == nullptr);
>>> -    return event;
>>> +    return queue.dequeue();
>>> }
>>> 
>>> bool empty()
>>> {
>>> -    // NOTE: for performance reasons we don't check `comissioned` here
>>> -    // so it's possible that we'll return true when in fact we've been
>>> -    // decomissioned and you shouldn't attempt to dequeue anything.
>>> -    return (sequence.load() - next) == 0;
>>> +    return queue.empty();
>>> }
>>> 
>>> void decomission()
>>> {
>>>   comissioned.store(true);
>>>   while (!empty()) {
>>> -      // NOTE: we use `try_dequeue()` here because we might be racing
>>> -      // with `enqueue()` where they've already incremented `sequence`
>>> -      // so we think there are more items to dequeue but they aren't
>>> -      // actually going to enqueue anything because they've since seen
>>> -      // `comissioned` is true. We'll attempt to dequeue with
>>> -      // `try_dequeue()` and eventually they'll decrement `sequence`
>>> -      // and so `empty()` will return true and we'll bail.
>>> -      Event* event = try_dequeue();
>>> -      if (event != nullptr) {
>>> -        delete event;
>>> -      }
>>> +      delete dequeue();
>>>   }
>>> }
>>> 
>>> template <typename T>
>>> size_t count()
>>> {
>>> -    // Try and dequeue more elements first!
>>> -    queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
>>> -
>>> -    return std::count_if(
>>> -        items.begin(),
>>> -        items.end(),
>>> -        [](const Item& item) {
>>> -          if (item.event != nullptr) {
>>> -            return item.event->is<T>();
>>> -          }
>>> -          return false;
>>> -        });
>>> +    size_t count = 0;
>>> +    queue.for_each([&count](Event* event) {
>>> +      if (event->is<T>()) {
>>> +        count++;
>>> +      }
>>> +    });
>>> +    return count;
>>> }
>>> 
>>> operator JSON::Array()
>>> {
>>> -    // Try and dequeue more elements first!
>>> -    queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
>>> -
>>>   JSON::Array array;
>>> -    foreach (const Item& item, items) {
>>> -      if (item.event != nullptr) {
>>> -        array.values.push_back(JSON::Object(*item.event));
>>> -      }
>>> -    }
>>> +    queue.for_each([&array](Event* event) {
>>> +      array.values.push_back(JSON::Object(*event));
>>> +    });
>>> 
>>>   return array;
>>> }
>>> 
>>> -  struct Item
>>> -  {
>>> -    uint64_t sequence;
>>> -    Event* event;
>>> -  };
>>> -
>>> -  Event* try_dequeue()
>>> -  {
>>> -    // The general algoritm here is as follows: we bulk dequeue as
>>> -    // many items from the concurrent queue as possible. We then look
>>> -    // for the `next` item in the sequence hoping that it's at the
>>> -    // beginning of `items` but because the `queue` is not
>>> -    // linearizable it might be "out of order". If we find it out of
>>> -    // order we effectively dequeue it but leave it in `items` so as
>>> -    // not to incur any costly rearrangements/compactions in
>>> -    // `items`. We'll later pop the out of order items once they get
>>> -    // to the front.
>>> -
>>> -    // Start by popping any items that we effectively dequeued but
>>> -    // didn't remove from `items` so as not to incur costly
>>> -    // rearragements/compactions.
>>> -    while (!items.empty() && next > items.front().sequence) {
>>> -      items.pop_front();
>>> -    }
>>> -
>>> -    // Optimistically let's hope that the next item is at the front of
>>> -    // `item`. If so, pop the item, increment `next`, and return the
>>> -    // event.
>>> -    if (!items.empty() && items.front().sequence == next) {
>>> -      Event* event = items.front().event;
>>> -      items.pop_front();
>>> -      next += 1;
>>> -      return event;
>>> -    }
>>> -
>>> -    size_t index = 0;
>>> -
>>> -    do {
>>> -      // Now look for a potentially out of order item. If found,
>>> -      //  signifiy the item has been dequeued by nulling the event
>>> -      //  (necessary for the implementation of `count()` and `operator
>>> -      //  JSON::Array()`) and return the event.
>>> -      for (; index < items.size(); index++) {
>>> -        if (items[index].sequence == next) {
>>> -          Event* event = items[index].event;
>>> -          items[index].event = nullptr;
>>> -          next += 1;
>>> -          return event;
>>> -        }
>>> -      }
>>> -
>>> -      // If we can bulk dequeue more items then keep looking for the
>>> -      // out of order event!
>>> -      //
>>> -      // NOTE: we use the _small_ value of `4` to dequeue here since
>>> -      // in the presence of enough events being enqueued we could end
>>> -      // up spending a LONG time dequeuing here! Since the next event
>>> -      // in the sequence should really be close to the top of the
>>> -      // queue we use a small value to dequeue.
>>> -      //
>>> -      // The intuition here is this: the faster we can return the next
>>> -      // event the faster that event can get processed and the faster
>>> -      // it might generate other events that can get processed in
>>> -      // parallel by other threads and the more work we get done.
>>> -    } while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0);
>>> -
>>> -    return nullptr;
>>> -  }
>>> -
>>> // Underlying queue of items.
>>> -  moodycamel::ConcurrentQueue<Item> queue;
>>> -
>>> -  // Counter to represent the item sequence. Note that we use a
>>> -  // unsigned 64-bit integer which means that even if we were adding
>>> -  // one item to the queue every nanosecond we'd be able to run for
>>> -  // 18,446,744,073,709,551,615 nanoseconds or ~585 years! ;-)
>>> -  std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0);
>>> -
>>> -  // Counter to represent the next item we expect to dequeue. Note
>>> -  // that we don't need to make this be atomic because only a single
>>> -  // consumer is ever reading or writing this variable!
>>> -  uint64_t next = 0;
>>> -
>>> -  // Collection of bulk dequeued items that may be out of order. Note
>>> -  // that like `next` this will only ever be read/written by a single
>>> -  // consumer.
>>> -  //
>>> -  // The use of a deque was explicit because it is implemented as an
>>> -  // array of arrays (or vector of vectors) which usually gives good
>>> -  // performance for appending to the back and popping from the front
>>> -  // which is exactly what we need to do. To avoid any performance
>>> -  // issues that might be incurred we do not remove any items from the
>>> -  // middle of the deque (see comments in `try_dequeue()` above for
>>> -  // more details).
>>> -  std::deque<Item> items;
>>> +  MpscLinkedQueue<Event> queue;
>>> 
>>> // Whether or not the event queue has been decomissioned. This must
>>> // be atomic as it can be read by a producer even though it's only
>>> 
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>>> new file mode 100644
>>> index 0000000..48c9509
>>> --- /dev/null
>>> +++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>>> @@ -0,0 +1,179 @@
>>> +// Licensed under the Apache License, Version 2.0 (the "License");
>>> +// you may not use this file except in compliance with the License.
>>> +// You may obtain a copy of the License at
>>> +//
>>> +//     http://www.apache.org/licenses/LICENSE-2.0
>>> +//
>>> +// Unless required by applicable law or agreed to in writing, software
>>> +// distributed under the License is distributed on an "AS IS" BASIS,
>>> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> +// See the License for the specific language governing permissions and
>>> +// limitations under the License
>>> +
>>> +#ifndef __MPSC_LINKED_QUEUE_HPP__
>>> +#define __MPSC_LINKED_QUEUE_HPP__
>>> +
>>> +#include <atomic>
>>> +#include <functional>
>>> +
>>> +#include <glog/logging.h>
>>> +
>>> +namespace process {
>>> +
>>> +// This queue is a C++ port of the MpscLinkedQueue of JCTools, but limited to
>>> +// the core methods:
>>> +// https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java
>>> +//
>>> +// which is a Java port of the MPSC algorithm as presented in following article:
>>> +// http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
>>> +//
>>> +// The queue has following properties:
>>> +//   Producers are wait-free (one atomic exchange per enqueue)
>>> +//   Consumer is
>>> +//     - lock-free
>>> +//     - mostly wait-free, except when consumer reaches the end of the queue
>>> +//       and producer enqueued a new node, but did not update the next pointer
>>> +//       on the old node, yet
>>> +template <typename T>
>>> +class MpscLinkedQueue
>>> +{
>>> +private:
>>> +  template <typename E>
>>> +  struct Node
>>> +  {
>>> +  public:
>>> +    explicit Node(E* element = nullptr) : element(element) {}
>>> +
>>> +    E* element;
>>> +    std::atomic<Node<E>*> next = ATOMIC_VAR_INIT(nullptr);
>>> +  };
>>> +
>>> +public:
>>> +  MpscLinkedQueue()
>>> +  {
>>> +    tail = new Node<T>();
>>> +    head.store(tail);
>>> +  }
>>> +
>>> +  ~MpscLinkedQueue()
>>> +  {
>>> +    while (auto element = dequeue()) {
>>> +      delete element;
>>> +    }
>>> +
>>> +    delete tail;
>>> +  }
>>> +
>>> +  // Multi producer safe.
>>> +  void enqueue(T* element)
>>> +  {
>>> +    // A `nullptr` is used to denote an empty queue when doing a
>>> +    // `dequeue()` so producers can't use it as an element.
>>> +    CHECK_NOTNULL(element);
>>> +
>>> +    auto newNode = new Node<T>(element);
>>> +
>>> +    // Exchange is guaranteed to only give the old value to one
>>> +    // producer, so this is safe and wait-free.
>>> +    auto oldhead = head.exchange(newNode, std::memory_order_release);
>>> +
>>> +    // At this point if this thread context switches out we may block
>>> +    // the consumer from doing a dequeue (see below). Eventually we'll
>>> +    // unblock the consumer once we run again and execute the next
>>> +    // line of code.
>>> +    oldhead->next.store(newNode, std::memory_order_release);
>>> +  }
>>> +
>>> +  // Single consumer only.
>>> +  T* dequeue()
>>> +  {
>>> +    auto currentTail = tail;
>>> +
>>> +    // Check and see if there is an actual element linked from `tail`
>>> +    // since we use `tail` as a "stub" rather than the actual element.
>>> +    auto nextTail = currentTail->next.exchange(
>>> +        nullptr,
>>> +        std::memory_order_relaxed);
>>> +
>>> +    // There are three possible cases here:
>>> +    //
>>> +    // (1) The queue is empty.
>>> +    // (2) The queue appears empty but a producer is still enqueuing
>>> +    //     so let's wait for it and then dequeue.
>>> +    // (3) We have something to dequeue.
>>> +    //
>>> +    // Start by checking if the queue is or appears empty.
>>> +    if (nextTail == nullptr) {
>>> +      // Now check if the queue is actually empty or just appears
>>> +      // empty. If it's actually empty then return `nullptr` to denote
>>> +      // emptiness.
>>> +      if (head.load(std::memory_order_relaxed) == tail) {
>>> +        return nullptr;
>>> +      }
>>> +
>>> +      // Another thread already inserted a new node, but did not
>>> +      // connect it to the tail, yet, so we spin-wait. At this point
>>> +      // we are not wait-free anymore.
>>> +      do {
>>> +        nextTail = currentTail->next.exchange(
>>> +            nullptr,
>>> +            std::memory_order_relaxed);
>>> +      } while (nextTail == nullptr);
>>> +    }
>>> +
>>> +    CHECK_NOTNULL(nextTail);
>>> +
>>> +    auto element = nextTail->element;
>>> +    nextTail->element = nullptr;
>>> +
>>> +    tail = nextTail;
>>> +    delete currentTail;
>>> +
>>> +    return element;
>>> +  }
>>> +
>>> +  // Single consumer only.
>>> +  //
>>> +  // TODO(drexin): Provide C++ style iteration so someone can just use
>>> +  // the `std::for_each()`.
>>> +  template <typename F>
>>> +  void for_each(F&& f)
>>> +  {
>>> +    auto end = head.load();
>>> +    auto node = tail;
>>> +
>>> +    for (;;) {
>>> +      node = node->next.load();
>>> +
>>> +      // We are following the linked structure until we reach the end
>>> +      // node. There is a race with new nodes being added, so we limit
>>> +      // the traversal to the last node at the time we started.
>>> +      if (node == nullptr) {
>>> +        return;
>>> +      }
>>> +
>>> +      f(node->element);
>>> +
>>> +      if (node == end) {
>>> +        return;
>>> +      }
>>> +    }
>>> +  }
>>> +
>>> +  // Single consumer only.
>>> +  bool empty()
>>> +  {
>>> +    return tail->next.load(std::memory_order_relaxed) == nullptr &&
>>> +      head.load(std::memory_order_relaxed) == tail;
>>> +  }
>>> +
>>> +private:
>>> +  std::atomic<Node<T>*> head;
>>> +
>>> +  // TODO(drexin): Programatically get the cache line size.
>>> +  alignas(128) Node<T>* tail;
>>> +};
>>> +
>>> +} // namespace process {
>>> +
>>> +#endif // __MPSC_LINKED_QUEUE_HPP__
>>> 
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/CMakeLists.txt
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt b/3rdparty/libprocess/src/tests/CMakeLists.txt
>>> index 25a34f9..5814bc6 100644
>>> --- a/3rdparty/libprocess/src/tests/CMakeLists.txt
>>> +++ b/3rdparty/libprocess/src/tests/CMakeLists.txt
>>> @@ -28,6 +28,7 @@ set(PROCESS_TESTS_SRC
>>> limiter_tests.cpp
>>> loop_tests.cpp
>>> metrics_tests.cpp
>>> +  mpsc_linked_queue_tests.cpp
>>> mutex_tests.cpp
>>> owned_tests.cpp
>>> process_tests.cpp
>>> 
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> index 2ec0d42..e8ef21f 100644
>>> --- a/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
>>> @@ -22,6 +22,7 @@
>>> #include <iostream>
>>> #include <memory>
>>> #include <string>
>>> +#include <thread>
>>> #include <vector>
>>> 
>>> #include <process/collect.hpp>
>>> @@ -40,6 +41,8 @@
>>> 
>>> #include "benchmarks.pb.h"
>>> 
>>> +#include "mpsc_linked_queue.hpp"
>>> +
>>> namespace http = process::http;
>>> 
>>> using process::CountDownLatch;
>>> @@ -567,7 +570,6 @@ private:
>>> long count = 0;
>>> };
>>> 
>>> -
>>> TEST(ProcessTest, Process_BENCHMARK_DispatchDefer)
>>> {
>>> constexpr long repeats = 100000;
>>> @@ -683,3 +685,63 @@ TEST(ProcessTest, Process_BENCHMARK_ProtobufInstallHandler)
>>>   process.run(num_submessages);
>>> }
>>> }
>>> +
>>> +
>>> +TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueue)
>>> +{
>>> +  // NOTE: we set the total number of producers to be 1 less than the
>>> +  // hardware concurrency so the consumer doesn't have to fight for
>>> +  // processing time with the producers.
>>> +  const unsigned int producerCount = std::thread::hardware_concurrency() - 1;
>>> +  const int messageCount = 10000000;
>>> +  const int totalCount = messageCount * producerCount;
>>> +  std::string* s = new std::string("");
>>> +  process::MpscLinkedQueue<std::string> q;
>>> +
>>> +  Stopwatch consumerWatch;
>>> +
>>> +  auto consumer = std::thread([totalCount, &q, &consumerWatch]() {
>>> +    consumerWatch.start();
>>> +    for (int i = totalCount; i > 0;) {
>>> +      if (q.dequeue() != nullptr) {
>>> +        i--;
>>> +      }
>>> +    }
>>> +    consumerWatch.stop();
>>> +  });
>>> +
>>> +  std::vector<std::thread> producers;
>>> +
>>> +  Stopwatch producerWatch;
>>> +  producerWatch.start();
>>> +
>>> +  for (unsigned int t = 0; t < producerCount; t++) {
>>> +    producers.push_back(std::thread([messageCount, s, &q]() {
>>> +      for (int i = 0; i < messageCount; i++) {
>>> +        q.enqueue(s);
>>> +      }
>>> +    }));
>>> +  }
>>> +
>>> +  for (std::thread& producer : producers) {
>>> +    producer.join();
>>> +  }
>>> +
>>> +  producerWatch.stop();
>>> +
>>> +  consumer.join();
>>> +
>>> +  Duration producerElapsed = producerWatch.elapsed();
>>> +  Duration consumerElapsed = consumerWatch.elapsed();
>>> +
>>> +  double consumerThroughput = (double) totalCount / consumerElapsed.secs();
>>> +  double producerThroughput = (double) totalCount / producerElapsed.secs();
>>> +  double throughput = consumerThroughput + producerThroughput;
>>> +
>>> +  cout << "Estimated producer throughput (" << producerCount << " threads): "
>>> +       << std::fixed << producerThroughput << " op/s" << endl;
>>> +  cout << "Estimated consumer throughput: "
>>> +       << std::fixed << consumerThroughput << " op/s" << endl;
>>> +  cout << "Estimated total throughput: "
>>> +       << std::fixed << throughput << " op/s" << endl;
>>> +}
>>> 
>>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>>> ----------------------------------------------------------------------
>>> diff --git a/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>>> new file mode 100644
>>> index 0000000..7699974
>>> --- /dev/null
>>> +++ b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>>> @@ -0,0 +1,104 @@
>>> +// Licensed under the Apache License, Version 2.0 (the "License");
>>> +// you may not use this file except in compliance with the License.
>>> +// You may obtain a copy of the License at
>>> +//
>>> +//     http://www.apache.org/licenses/LICENSE-2.0
>>> +//
>>> +// Unless required by applicable law or agreed to in writing, software
>>> +// distributed under the License is distributed on an "AS IS" BASIS,
>>> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> +// See the License for the specific language governing permissions and
>>> +// limitations under the License
>>> +
>>> +#include <thread>
>>> +
>>> +#include <stout/gtest.hpp>
>>> +#include <stout/stringify.hpp>
>>> +
>>> +#include "mpsc_linked_queue.hpp"
>>> +
>>> +
>>> +TEST(MpscLinkedQueueTest, EnqueueDequeue)
>>> +{
>>> +  process::MpscLinkedQueue<std::string> q;
>>> +  std::string* s = new std::string("test");
>>> +  q.enqueue(s);
>>> +  std::string* s2 = q.dequeue();
>>> +  ASSERT_EQ(s, s2);
>>> +  delete s2;
>>> +}
>>> +
>>> +
>>> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultiple)
>>> +{
>>> +  process::MpscLinkedQueue<std::string> q;
>>> +  for (int i = 0; i < 20; i++) {
>>> +    q.enqueue(new std::string(stringify(i)));
>>> +  }
>>> +
>>> +  for (int i = 0; i < 20; i++) {
>>> +    std::string* s = q.dequeue();
>>> +    ASSERT_EQ(*s, stringify(i));
>>> +    delete s;
>>> +  }
>>> +}
>>> +
>>> +
>>> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultithreaded)
>>> +{
>>> +  process::MpscLinkedQueue<std::string> q;
>>> +  std::vector<std::thread> threads;
>>> +  for (int t = 0; t < 5; t++) {
>>> +    threads.push_back(
>>> +        std::thread([t, &q]() {
>>> +          int start = t * 1000;
>>> +          int end = start + 1000;
>>> +          for (int i = start; i < end; i++) {
>>> +            q.enqueue(new std::string(stringify(i)));
>>> +          }
>>> +        }));
>>> +  }
>>> +
>>> +  std::for_each(threads.begin(), threads.end(), [](std::thread& t) {
>>> +    t.join();
>>> +  });
>>> +
>>> +  std::set<std::string> elements;
>>> +
>>> +  std::string* s = nullptr;
>>> +  while ((s = q.dequeue()) != nullptr) {
>>> +    elements.insert(*s);
>>> +  }
>>> +
>>> +  ASSERT_EQ(5000UL, elements.size());
>>> +
>>> +  for (int i = 0; i < 5000; i++) {
>>> +    ASSERT_NE(elements.end(), elements.find(stringify(i)));
>>> +  }
>>> +}
>>> +
>>> +
>>> +TEST(MpscLinkedQueueTest, ForEach)
>>> +{
>>> +  process::MpscLinkedQueue<std::string> q;
>>> +  for (int i = 0; i < 20; i++) {
>>> +    q.enqueue(new std::string(stringify(i)));
>>> +  }
>>> +  int i = 0;
>>> +  q.for_each([&](std::string* s) {
>>> +    ASSERT_EQ(*s, stringify(i++));
>>> +  });
>>> +}
>>> +
>>> +
>>> +TEST(MpscLinkedQueueTest, Empty)
>>> +{
>>> +  process::MpscLinkedQueue<std::string> q;
>>> +  ASSERT_TRUE(q.empty());
>>> +  std::string* s = new std::string("test");
>>> +  q.enqueue(s);
>>> +  ASSERT_FALSE(q.empty());
>>> +  q.dequeue();
>>> +  ASSERT_TRUE(q.empty());
>>> +  delete s;
>>> +}
>>> 
>> 
> 
