Hi Dario, this patch introduced two new clang-tidy warnings. Could we try to get these down to zero, even if the code does not look bad?
I already created a patch for the unused lambda capture (https://reviews.apache.org/r/67927/). While the code does look reasonable, as a somewhat weird exception, C++ allows referencing some variables without capturing them. I also looked into the warning on the “excessive padding”. Adding some explicit padding seems to make clang-tidy content, but I wasn’t sure whether we just wanted to put `head` and `tail` on separate cache lines, or also cared about the padding added after `tail`. private: std::atomic<Node<T>*> head; char padding[128 - sizeof(std::atomic<Node<T>*>)]; // TODO(drexin): Programmatically get the cache line size. alignas(128) Node<T>* tail; // FIXME: IMO no need for `alignas` to separate `head` and `tail`. Could you put up a patch for that? You can run the linter yourself; it is `support/mesos-tidy.sh`. Cheers, Benjamin > On Jul 15, 2018, at 7:02 PM, b...@apache.org wrote: > > Repository: mesos > Updated Branches: > refs/heads/master a11a6a3d8 -> b1eafc035 > > > Added mpsc_linked_queue and use it as the concurrent event queue. 
> > https://reviews.apache.org/r/62515 > > > Project: http://git-wip-us.apache.org/repos/asf/mesos/repo > Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1eafc03 > Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1eafc03 > Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1eafc03 > > Branch: refs/heads/master > Commit: b1eafc035426bc39df4dba81c5c46b8b2d970339 > Parents: a11a6a3 > Author: Dario Rexin <dre...@apple.com> > Authored: Sat Jul 7 13:20:22 2018 -0700 > Committer: Benjamin Hindman <benjamin.hind...@gmail.com> > Committed: Sun Jul 15 09:55:28 2018 -0700 > > ---------------------------------------------------------------------- > 3rdparty/libprocess/Makefile.am | 1 + > 3rdparty/libprocess/src/event_queue.hpp | 168 ++--------------- > 3rdparty/libprocess/src/mpsc_linked_queue.hpp | 179 +++++++++++++++++++ > 3rdparty/libprocess/src/tests/CMakeLists.txt | 1 + > 3rdparty/libprocess/src/tests/benchmarks.cpp | 64 ++++++- > .../src/tests/mpsc_linked_queue_tests.cpp | 104 +++++++++++ > 6 files changed, 367 insertions(+), 150 deletions(-) > ---------------------------------------------------------------------- > > > http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/Makefile.am > ---------------------------------------------------------------------- > diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am > index 2d356aa..631491a 100644 > --- a/3rdparty/libprocess/Makefile.am > +++ b/3rdparty/libprocess/Makefile.am > @@ -307,6 +307,7 @@ libprocess_tests_SOURCES = > \ > src/tests/loop_tests.cpp \ > src/tests/main.cpp \ > src/tests/metrics_tests.cpp \ > + src/tests/mpsc_linked_queue_tests.cpp \ > src/tests/mutex_tests.cpp \ > src/tests/owned_tests.cpp \ > src/tests/process_tests.cpp \ > > http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/event_queue.hpp > ---------------------------------------------------------------------- > diff --git 
a/3rdparty/libprocess/src/event_queue.hpp > b/3rdparty/libprocess/src/event_queue.hpp > index 21c522d..999d552 100644 > --- a/3rdparty/libprocess/src/event_queue.hpp > +++ b/3rdparty/libprocess/src/event_queue.hpp > @@ -17,10 +17,6 @@ > #include <mutex> > #include <string> > > -#ifdef LOCK_FREE_EVENT_QUEUE > -#include <concurrentqueue.h> > -#endif // LOCK_FREE_EVENT_QUEUE > - > #include <process/event.hpp> > #include <process/http.hpp> > > @@ -28,6 +24,10 @@ > #include <stout/stringify.hpp> > #include <stout/synchronized.hpp> > > +#ifdef LOCK_FREE_EVENT_QUEUE > +#include "mpsc_linked_queue.hpp" > +#endif // LOCK_FREE_EVENT_QUEUE > + > namespace process { > > // A _multiple_ producer (MP) _single_ consumer (SC) event queue for a > @@ -187,185 +187,55 @@ private: > #else // LOCK_FREE_EVENT_QUEUE > void enqueue(Event* event) > { > - Item item = {sequence.fetch_add(1), event}; > if (comissioned.load()) { > - queue.enqueue(std::move(item)); > + queue.enqueue(event); > } else { > - sequence.fetch_sub(1); > delete event; > } > } > > Event* dequeue() > { > - // NOTE: for performance reasons we don't check `comissioned` here > - // so it's possible that we'll loop forever if a consumer called > - // `decomission()` and then subsequently called `dequeue()`. > - Event* event = nullptr; > - do { > - // Given the nature of the concurrent queue implementation it's > - // possible that we'll need to try to dequeue multiple times > - // until it returns an event even though we know there is an > - // event because the semantics are that we shouldn't call > - // `dequeue()` before calling `empty()`. > - event = try_dequeue(); > - } while (event == nullptr); > - return event; > + return queue.dequeue(); > } > > bool empty() > { > - // NOTE: for performance reasons we don't check `comissioned` here > - // so it's possible that we'll return true when in fact we've been > - // decomissioned and you shouldn't attempt to dequeue anything. 
> - return (sequence.load() - next) == 0; > + return queue.empty(); > } > > void decomission() > { > comissioned.store(true); > while (!empty()) { > - // NOTE: we use `try_dequeue()` here because we might be racing > - // with `enqueue()` where they've already incremented `sequence` > - // so we think there are more items to dequeue but they aren't > - // actually going to enqueue anything because they've since seen > - // `comissioned` is true. We'll attempt to dequeue with > - // `try_dequeue()` and eventually they'll decrement `sequence` > - // and so `empty()` will return true and we'll bail. > - Event* event = try_dequeue(); > - if (event != nullptr) { > - delete event; > - } > + delete dequeue(); > } > } > > template <typename T> > size_t count() > { > - // Try and dequeue more elements first! > - queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX); > - > - return std::count_if( > - items.begin(), > - items.end(), > - [](const Item& item) { > - if (item.event != nullptr) { > - return item.event->is<T>(); > - } > - return false; > - }); > + size_t count = 0; > + queue.for_each([&count](Event* event) { > + if (event->is<T>()) { > + count++; > + } > + }); > + return count; > } > > operator JSON::Array() > { > - // Try and dequeue more elements first! > - queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX); > - > JSON::Array array; > - foreach (const Item& item, items) { > - if (item.event != nullptr) { > - array.values.push_back(JSON::Object(*item.event)); > - } > - } > + queue.for_each([&array](Event* event) { > + array.values.push_back(JSON::Object(*event)); > + }); > > return array; > } > > - struct Item > - { > - uint64_t sequence; > - Event* event; > - }; > - > - Event* try_dequeue() > - { > - // The general algoritm here is as follows: we bulk dequeue as > - // many items from the concurrent queue as possible. 
We then look > - // for the `next` item in the sequence hoping that it's at the > - // beginning of `items` but because the `queue` is not > - // linearizable it might be "out of order". If we find it out of > - // order we effectively dequeue it but leave it in `items` so as > - // not to incur any costly rearrangements/compactions in > - // `items`. We'll later pop the out of order items once they get > - // to the front. > - > - // Start by popping any items that we effectively dequeued but > - // didn't remove from `items` so as not to incur costly > - // rearragements/compactions. > - while (!items.empty() && next > items.front().sequence) { > - items.pop_front(); > - } > - > - // Optimistically let's hope that the next item is at the front of > - // `item`. If so, pop the item, increment `next`, and return the > - // event. > - if (!items.empty() && items.front().sequence == next) { > - Event* event = items.front().event; > - items.pop_front(); > - next += 1; > - return event; > - } > - > - size_t index = 0; > - > - do { > - // Now look for a potentially out of order item. If found, > - // signifiy the item has been dequeued by nulling the event > - // (necessary for the implementation of `count()` and `operator > - // JSON::Array()`) and return the event. > - for (; index < items.size(); index++) { > - if (items[index].sequence == next) { > - Event* event = items[index].event; > - items[index].event = nullptr; > - next += 1; > - return event; > - } > - } > - > - // If we can bulk dequeue more items then keep looking for the > - // out of order event! > - // > - // NOTE: we use the _small_ value of `4` to dequeue here since > - // in the presence of enough events being enqueued we could end > - // up spending a LONG time dequeuing here! Since the next event > - // in the sequence should really be close to the top of the > - // queue we use a small value to dequeue. 
> - // > - // The intuition here is this: the faster we can return the next > - // event the faster that event can get processed and the faster > - // it might generate other events that can get processed in > - // parallel by other threads and the more work we get done. > - } while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0); > - > - return nullptr; > - } > - > // Underlying queue of items. > - moodycamel::ConcurrentQueue<Item> queue; > - > - // Counter to represent the item sequence. Note that we use a > - // unsigned 64-bit integer which means that even if we were adding > - // one item to the queue every nanosecond we'd be able to run for > - // 18,446,744,073,709,551,615 nanoseconds or ~585 years! ;-) > - std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0); > - > - // Counter to represent the next item we expect to dequeue. Note > - // that we don't need to make this be atomic because only a single > - // consumer is ever reading or writing this variable! > - uint64_t next = 0; > - > - // Collection of bulk dequeued items that may be out of order. Note > - // that like `next` this will only ever be read/written by a single > - // consumer. > - // > - // The use of a deque was explicit because it is implemented as an > - // array of arrays (or vector of vectors) which usually gives good > - // performance for appending to the back and popping from the front > - // which is exactly what we need to do. To avoid any performance > - // issues that might be incurred we do not remove any items from the > - // middle of the deque (see comments in `try_dequeue()` above for > - // more details). > - std::deque<Item> items; > + MpscLinkedQueue<Event> queue; > > // Whether or not the event queue has been decomissioned. 
This must > // be atomic as it can be read by a producer even though it's only > > http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/mpsc_linked_queue.hpp > ---------------------------------------------------------------------- > diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp > b/3rdparty/libprocess/src/mpsc_linked_queue.hpp > new file mode 100644 > index 0000000..48c9509 > --- /dev/null > +++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp > @@ -0,0 +1,179 @@ > +// Licensed under the Apache License, Version 2.0 (the "License"); > +// you may not use this file except in compliance with the License. > +// You may obtain a copy of the License at > +// > +// http://www.apache.org/licenses/LICENSE-2.0 > +// > +// Unless required by applicable law or agreed to in writing, software > +// distributed under the License is distributed on an "AS IS" BASIS, > +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > +// See the License for the specific language governing permissions and > +// limitations under the License > + > +#ifndef __MPSC_LINKED_QUEUE_HPP__ > +#define __MPSC_LINKED_QUEUE_HPP__ > + > +#include <atomic> > +#include <functional> > + > +#include <glog/logging.h> > + > +namespace process { > + > +// This queue is a C++ port of the MpscLinkedQueue of JCTools, but limited to > +// the core methods: > +// > https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java > +// > +// which is a Java port of the MPSC algorithm as presented in following > article: > +// > http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue > +// > +// The queue has following properties: > +// Producers are wait-free (one atomic exchange per enqueue) > +// Consumer is > +// - lock-free > +// - mostly wait-free, except when consumer reaches the end of the queue > +// and producer enqueued a new node, but did not update the next > 
pointer > +// on the old node, yet > +template <typename T> > +class MpscLinkedQueue > +{ > +private: > + template <typename E> > + struct Node > + { > + public: > + explicit Node(E* element = nullptr) : element(element) {} > + > + E* element; > + std::atomic<Node<E>*> next = ATOMIC_VAR_INIT(nullptr); > + }; > + > +public: > + MpscLinkedQueue() > + { > + tail = new Node<T>(); > + head.store(tail); > + } > + > + ~MpscLinkedQueue() > + { > + while (auto element = dequeue()) { > + delete element; > + } > + > + delete tail; > + } > + > + // Multi producer safe. > + void enqueue(T* element) > + { > + // A `nullptr` is used to denote an empty queue when doing a > + // `dequeue()` so producers can't use it as an element. > + CHECK_NOTNULL(element); > + > + auto newNode = new Node<T>(element); > + > + // Exchange is guaranteed to only give the old value to one > + // producer, so this is safe and wait-free. > + auto oldhead = head.exchange(newNode, std::memory_order_release); > + > + // At this point if this thread context switches out we may block > + // the consumer from doing a dequeue (see below). Eventually we'll > + // unblock the consumer once we run again and execute the next > + // line of code. > + oldhead->next.store(newNode, std::memory_order_release); > + } > + > + // Single consumer only. > + T* dequeue() > + { > + auto currentTail = tail; > + > + // Check and see if there is an actual element linked from `tail` > + // since we use `tail` as a "stub" rather than the actual element. > + auto nextTail = currentTail->next.exchange( > + nullptr, > + std::memory_order_relaxed); > + > + // There are three possible cases here: > + // > + // (1) The queue is empty. > + // (2) The queue appears empty but a producer is still enqueuing > + // so let's wait for it and then dequeue. > + // (3) We have something to dequeue. > + // > + // Start by checking if the queue is or appears empty. 
> + if (nextTail == nullptr) { > + // Now check if the queue is actually empty or just appears > + // empty. If it's actually empty then return `nullptr` to denote > + // emptiness. > + if (head.load(std::memory_order_relaxed) == tail) { > + return nullptr; > + } > + > + // Another thread already inserted a new node, but did not > + // connect it to the tail, yet, so we spin-wait. At this point > + // we are not wait-free anymore. > + do { > + nextTail = currentTail->next.exchange( > + nullptr, > + std::memory_order_relaxed); > + } while (nextTail == nullptr); > + } > + > + CHECK_NOTNULL(nextTail); > + > + auto element = nextTail->element; > + nextTail->element = nullptr; > + > + tail = nextTail; > + delete currentTail; > + > + return element; > + } > + > + // Single consumer only. > + // > + // TODO(drexin): Provide C++ style iteration so someone can just use > + // the `std::for_each()`. > + template <typename F> > + void for_each(F&& f) > + { > + auto end = head.load(); > + auto node = tail; > + > + for (;;) { > + node = node->next.load(); > + > + // We are following the linked structure until we reach the end > + // node. There is a race with new nodes being added, so we limit > + // the traversal to the last node at the time we started. > + if (node == nullptr) { > + return; > + } > + > + f(node->element); > + > + if (node == end) { > + return; > + } > + } > + } > + > + // Single consumer only. > + bool empty() > + { > + return tail->next.load(std::memory_order_relaxed) == nullptr && > + head.load(std::memory_order_relaxed) == tail; > + } > + > +private: > + std::atomic<Node<T>*> head; > + > + // TODO(drexin): Programatically get the cache line size. 
> + alignas(128) Node<T>* tail; > +}; > + > +} // namespace process { > + > +#endif // __MPSC_LINKED_QUEUE_HPP__ > > http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/CMakeLists.txt > ---------------------------------------------------------------------- > diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt > b/3rdparty/libprocess/src/tests/CMakeLists.txt > index 25a34f9..5814bc6 100644 > --- a/3rdparty/libprocess/src/tests/CMakeLists.txt > +++ b/3rdparty/libprocess/src/tests/CMakeLists.txt > @@ -28,6 +28,7 @@ set(PROCESS_TESTS_SRC > limiter_tests.cpp > loop_tests.cpp > metrics_tests.cpp > + mpsc_linked_queue_tests.cpp > mutex_tests.cpp > owned_tests.cpp > process_tests.cpp > > http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/benchmarks.cpp > ---------------------------------------------------------------------- > diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp > b/3rdparty/libprocess/src/tests/benchmarks.cpp > index 2ec0d42..e8ef21f 100644 > --- a/3rdparty/libprocess/src/tests/benchmarks.cpp > +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp > @@ -22,6 +22,7 @@ > #include <iostream> > #include <memory> > #include <string> > +#include <thread> > #include <vector> > > #include <process/collect.hpp> > @@ -40,6 +41,8 @@ > > #include "benchmarks.pb.h" > > +#include "mpsc_linked_queue.hpp" > + > namespace http = process::http; > > using process::CountDownLatch; > @@ -567,7 +570,6 @@ private: > long count = 0; > }; > > - > TEST(ProcessTest, Process_BENCHMARK_DispatchDefer) > { > constexpr long repeats = 100000; > @@ -683,3 +685,63 @@ TEST(ProcessTest, > Process_BENCHMARK_ProtobufInstallHandler) > process.run(num_submessages); > } > } > + > + > +TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueue) > +{ > + // NOTE: we set the total number of producers to be 1 less than the > + // hardware concurrency so the consumer doesn't have to fight for > + // processing time with the 
producers. > + const unsigned int producerCount = std::thread::hardware_concurrency() - 1; > + const int messageCount = 10000000; > + const int totalCount = messageCount * producerCount; > + std::string* s = new std::string(""); > + process::MpscLinkedQueue<std::string> q; > + > + Stopwatch consumerWatch; > + > + auto consumer = std::thread([totalCount, &q, &consumerWatch]() { > + consumerWatch.start(); > + for (int i = totalCount; i > 0;) { > + if (q.dequeue() != nullptr) { > + i--; > + } > + } > + consumerWatch.stop(); > + }); > + > + std::vector<std::thread> producers; > + > + Stopwatch producerWatch; > + producerWatch.start(); > + > + for (unsigned int t = 0; t < producerCount; t++) { > + producers.push_back(std::thread([messageCount, s, &q]() { > + for (int i = 0; i < messageCount; i++) { > + q.enqueue(s); > + } > + })); > + } > + > + for (std::thread& producer : producers) { > + producer.join(); > + } > + > + producerWatch.stop(); > + > + consumer.join(); > + > + Duration producerElapsed = producerWatch.elapsed(); > + Duration consumerElapsed = consumerWatch.elapsed(); > + > + double consumerThroughput = (double) totalCount / consumerElapsed.secs(); > + double producerThroughput = (double) totalCount / producerElapsed.secs(); > + double throughput = consumerThroughput + producerThroughput; > + > + cout << "Estimated producer throughput (" << producerCount << " threads): " > + << std::fixed << producerThroughput << " op/s" << endl; > + cout << "Estimated consumer throughput: " > + << std::fixed << consumerThroughput << " op/s" << endl; > + cout << "Estimated total throughput: " > + << std::fixed << throughput << " op/s" << endl; > +} > > http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp > ---------------------------------------------------------------------- > diff --git a/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp > b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp > 
new file mode 100644 > index 0000000..7699974 > --- /dev/null > +++ b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp > @@ -0,0 +1,104 @@ > +// Licensed under the Apache License, Version 2.0 (the "License"); > +// you may not use this file except in compliance with the License. > +// You may obtain a copy of the License at > +// > +// http://www.apache.org/licenses/LICENSE-2.0 > +// > +// Unless required by applicable law or agreed to in writing, software > +// distributed under the License is distributed on an "AS IS" BASIS, > +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > +// See the License for the specific language governing permissions and > +// limitations under the License > + > +#include <thread> > + > +#include <stout/gtest.hpp> > +#include <stout/stringify.hpp> > + > +#include "mpsc_linked_queue.hpp" > + > + > +TEST(MpscLinkedQueueTest, EnqueueDequeue) > +{ > + process::MpscLinkedQueue<std::string> q; > + std::string* s = new std::string("test"); > + q.enqueue(s); > + std::string* s2 = q.dequeue(); > + ASSERT_EQ(s, s2); > + delete s2; > +} > + > + > +TEST(MpscLinkedQueueTest, EnqueueDequeueMultiple) > +{ > + process::MpscLinkedQueue<std::string> q; > + for (int i = 0; i < 20; i++) { > + q.enqueue(new std::string(stringify(i))); > + } > + > + for (int i = 0; i < 20; i++) { > + std::string* s = q.dequeue(); > + ASSERT_EQ(*s, stringify(i)); > + delete s; > + } > +} > + > + > +TEST(MpscLinkedQueueTest, EnqueueDequeueMultithreaded) > +{ > + process::MpscLinkedQueue<std::string> q; > + std::vector<std::thread> threads; > + for (int t = 0; t < 5; t++) { > + threads.push_back( > + std::thread([t, &q]() { > + int start = t * 1000; > + int end = start + 1000; > + for (int i = start; i < end; i++) { > + q.enqueue(new std::string(stringify(i))); > + } > + })); > + } > + > + std::for_each(threads.begin(), threads.end(), [](std::thread& t) { > + t.join(); > + }); > + > + std::set<std::string> elements; > + > + std::string* s = 
nullptr; > + while ((s = q.dequeue()) != nullptr) { > + elements.insert(*s); > + } > + > + ASSERT_EQ(5000UL, elements.size()); > + > + for (int i = 0; i < 5000; i++) { > + ASSERT_NE(elements.end(), elements.find(stringify(i))); > + } > +} > + > + > +TEST(MpscLinkedQueueTest, ForEach) > +{ > + process::MpscLinkedQueue<std::string> q; > + for (int i = 0; i < 20; i++) { > + q.enqueue(new std::string(stringify(i))); > + } > + int i = 0; > + q.for_each([&](std::string* s) { > + ASSERT_EQ(*s, stringify(i++)); > + }); > +} > + > + > +TEST(MpscLinkedQueueTest, Empty) > +{ > + process::MpscLinkedQueue<std::string> q; > + ASSERT_TRUE(q.empty()); > + std::string* s = new std::string("test"); > + q.enqueue(s); > + ASSERT_FALSE(q.empty()); > + q.dequeue(); > + ASSERT_TRUE(q.empty()); > + delete s; > +} >