This is an automated email from the ASF dual-hosted git repository. aboda pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push: new 0e6088f MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool 0e6088f is described below commit 0e6088ffd1b2c7013117f47a3ea48d56bf944a51 Author: Arpad Boda <ab...@apache.org> AuthorDate: Mon Mar 23 15:12:55 2020 +0100 MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool Signed-off-by: Arpad Boda <ab...@apache.org> Approved by bakaid and szaszm on GH This closes #746 --- libminifi/include/utils/MinifiConcurrentQueue.h | 184 +++++++++++++++++++ libminifi/include/utils/ThreadPool.h | 18 +- libminifi/src/utils/ThreadPool.cpp | 28 ++- libminifi/test/CPPLINT.cfg | 2 +- libminifi/test/unit/MinifiConcurrentQueueTests.cpp | 201 +++++++++++++++++++++ 5 files changed, 407 insertions(+), 26 deletions(-) diff --git a/libminifi/include/utils/MinifiConcurrentQueue.h b/libminifi/include/utils/MinifiConcurrentQueue.h new file mode 100644 index 0000000..d0bdcab --- /dev/null +++ b/libminifi/include/utils/MinifiConcurrentQueue.h @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <condition_variable>
+#include <cstddef>
+#include <deque>
+#include <mutex>
+#include <stdexcept>
+#include <utility>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+
+// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers.
+// Guarantees elements to be dequeued in order of insertion.
+// Every public member function holds mtx_ while it touches the underlying container.
+template <typename T>
+class ConcurrentQueue {
+ public:
+  explicit ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+
+  // Takes over the elements of other. The lock_guard temporary keeps other.mtx_ locked
+  // until the delegated private constructor has finished moving the container.
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mtx_)) {}
+
+  // Exchanges the content of the two queues, so other ends up with the previous elements of *this.
+  // std::lock acquires both mutexes together to avoid a lock-order deadlock.
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  // Moves the oldest element into out and returns true; returns false (out untouched) if the queue is empty.
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeueImpl(lck, out);
+  }
+
+  // Returns true if no element is queued at the time of the call.
+  bool empty() const {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return emptyImpl(lck);  // emptyImpl is a member of this class, not of the underlying std::deque
+  }
+
+  // Returns the number of queued elements at the time of the call.
+  size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  // Drops every queued element.
+  void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  // Constructs a new element in place at the back of the queue from args.
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+  // Target of the delegating move constructor. The guard parameter only proves that other.mtx_ is held;
+  // it is a const reference so that it can bind to the temporary created by the public constructor.
+  ConcurrentQueue(ConcurrentQueue&& other, const std::lock_guard<std::mutex>&)
+    : queue_(std::move(other.queue_)) {}
+
+ protected:
+  // Throws std::logic_error if lck does not currently own a mutex.
+  void checkLock(std::unique_lock<std::mutex>& lck) const {
+    if (!lck.owns_lock()) {
+      throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!");
+    }
+  }
+
+  // Same contract as tryDequeue, for callers that already hold the lock.
+  bool tryDequeueImpl(std::unique_lock<std::mutex>& lck, T& out) {
+    checkLock(lck);
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+
+  // Same contract as empty, for callers that already hold the lock.
+  bool emptyImpl(std::unique_lock<std::mutex>& lck) const {
+    checkLock(lck);
+    return queue_.empty();
+  }
+
+  mutable std::mutex mtx_;
+ private:
+  std::deque<T> queue_;
+};
+
+
+// A ConcurrentQueue extended with a condition variable to be able to block and wait for incoming data
+// Stopping interrupts all consumers without a chance to consume remaining elements in the queue although elements can still be enqueued
+// Started means queued elements can be consumed/dequeued and dequeueWait* calls can block
+template <typename T>
+class ConditionConcurrentQueue : private ConcurrentQueue<T> {
+ public:
+  explicit ConditionConcurrentQueue(bool start = true) : ConcurrentQueue<T>{}, running_{start} {}
+
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+
+  using ConcurrentQueue<T>::size;
+  using ConcurrentQueue<T>::empty;
+  using ConcurrentQueue<T>::clear;
+
+
+  // Enqueues a new element and wakes up one waiting consumer.
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    // running_ is protected by mtx_, which is not held here, so it must not be read.
+    // Notifying unconditionally is harmless: while stopped, the dequeueWait* predicates
+    // are already satisfied, so no consumer is blocked on cv_.
+    cv_.notify_one();
+  }
+
+  // Blocks until an element is available or the queue is stopped.
+  // Returns true and fills out only if the queue is running and an element was dequeued.
+  bool dequeueWait(T& out) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
+    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  // Like dequeueWait, but gives up after the given time; returns false on timeout as well.
+  template< class Rep, class Period >
+  bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Wake up with timeout or in case there is something to do
+    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  // Non-blocking dequeue; fails if the queue is stopped or empty.
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  // Marks the queue as stopped and wakes up every blocked consumer; their dequeue calls return false.
+  void stop() {
+    std::lock_guard<std::mutex> guard(this->mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+
+  // Marks the queue as running, elements can be dequeued again.
+  void start() {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    running_ = true;
+  }
+
+  bool isRunning() const {
+    std::lock_guard<std::mutex> guard(this->mtx_);
+    return running_;  // In case it's not running, dequeueing fails instead of blocking to avoid hanging threads
+  }
+
+ private:
+  bool running_;  // guarded by mtx_
+  std::condition_variable cv_;
+};
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif  // LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 2554dc2..86e2abb 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -30,10 +30,10 @@
 #include <functional>
 #include "BackTrace.h"
+#include "MinifiConcurrentQueue.h"
 #include "Monitors.h"
 #include "core/expect.h"
 #include
"controllers/ThreadManagementService.h" -#include "concurrentqueue.h" #include "core/controller/ControllerService.h" #include "core/controller/ControllerServiceProvider.h" namespace org { @@ -303,8 +303,12 @@ class ThreadPool { * Drain will notify tasks to stop following notification */ void drain() { + worker_queue_.stop(); while (current_workers_ > 0) { - tasks_available_.notify_one(); + // The sleeping workers were waken up and stopped, but we have to wait + // the ones that actually worked on something when the queue was stopped. + // Stopping the queue guarantees that they don't get any new task. + std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } // determines if threads are detached @@ -330,20 +334,18 @@ class ThreadPool { // integrated power manager std::shared_ptr<controllers::ThreadManagementService> thread_manager_; // thread queue for the recently deceased threads. - moodycamel::ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_; + ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_; // worker queue of worker objects - moodycamel::ConcurrentQueue<Worker<T>> worker_queue_; + ConditionConcurrentQueue<Worker<T>> worker_queue_; std::priority_queue<Worker<T>, std::vector<Worker<T>>, DelayedTaskComparator<T>> delayed_worker_queue_; -// notification for available work - std::condition_variable tasks_available_; +// mutex to protect task status and delayed queue + std::mutex worker_queue_mutex_; // notification for new delayed tasks that's before the current ones std::condition_variable delayed_task_available_; // map to identify if a task should be std::map<std::string, bool> task_status_; // manager mutex std::recursive_mutex manager_mutex_; -// work queue mutex - std::mutex worker_queue_mutex_; // thread pool name std::string name_; diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp index 039e136..01651d8 100644 --- a/libminifi/src/utils/ThreadPool.cpp +++ 
b/libminifi/src/utils/ThreadPool.cpp @@ -39,7 +39,7 @@ void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) { } Worker<T> task; - if (worker_queue_.try_dequeue(task)) { + if (worker_queue_.dequeueWait(task)) { { std::unique_lock<std::mutex> lock(worker_queue_mutex_); if (!task_status_[task.getIdentifier()]) { @@ -64,8 +64,11 @@ void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) { } } } else { - std::unique_lock<std::mutex> lock(worker_queue_mutex_); - tasks_available_.wait(lock); + // This means that the threadpool is running, but the ConcurrentQueue is stopped -> shouldn't happen during normal conditions + // Might happen during startup or shutdown for a very short time + if (running_.load()) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } } } current_workers_--; @@ -83,7 +86,6 @@ void ThreadPool<T>::manage_delayed_queue() { Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top())); delayed_worker_queue_.pop(); worker_queue_.enqueue(std::move(task)); - tasks_available_.notify_one(); } if (delayed_worker_queue_.empty()) { delayed_task_available_.wait(lock); @@ -102,14 +104,11 @@ bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) { task_status_[task.getIdentifier()] = true; } future = std::move(task.getPromise()->get_future()); - bool enqueued = worker_queue_.enqueue(std::move(task)); - if (running_) { - tasks_available_.notify_one(); - } + worker_queue_.enqueue(std::move(task)); task_count_++; - return enqueued; + return true; } template<typename T> @@ -157,7 +156,7 @@ void ThreadPool<T>::manageWorkers() { current_workers_++; } std::shared_ptr<WorkerThread> thread_ref; - while (deceased_thread_queue_.try_dequeue(thread_ref)) { + while (deceased_thread_queue_.tryDequeue(thread_ref)) { std::unique_lock<std::mutex> lock(worker_queue_mutex_); if (thread_ref->thread_.joinable()) thread_ref->thread_.join(); @@ -186,10 +185,8 @@ void ThreadPool<T>::start() { 
std::lock_guard<std::recursive_mutex> lock(manager_mutex_); if (!running_) { running_ = true; + worker_queue_.start(); manager_thread_ = std::move(std::thread(&ThreadPool::manageWorkers, this)); - if (worker_queue_.size_approx() > 0) { - tasks_available_.notify_all(); - } std::lock_guard<std::mutex> quee_lock(worker_queue_mutex_); delayed_scheduler_thread_ = std::thread(&ThreadPool<T>::manage_delayed_queue, this); @@ -231,10 +228,7 @@ void ThreadPool<T>::shutdown() { delayed_worker_queue_.pop(); } - while (worker_queue_.size_approx() > 0) { - Worker<T> task; - worker_queue_.try_dequeue(task); - } + worker_queue_.clear(); } } diff --git a/libminifi/test/CPPLINT.cfg b/libminifi/test/CPPLINT.cfg index a59ff3b..3455f0f 100644 --- a/libminifi/test/CPPLINT.cfg +++ b/libminifi/test/CPPLINT.cfg @@ -1,4 +1,4 @@ -filter=-build/include_order,-build/include_alpha +filter=-build/include_order,-build/include_alpha,-build/namespaces exclude_files=Server.cpp exclude_files=TestBase.cpp exclude_files=RandomServerSocket.cpp diff --git a/libminifi/test/unit/MinifiConcurrentQueueTests.cpp b/libminifi/test/unit/MinifiConcurrentQueueTests.cpp new file mode 100644 index 0000000..aed2743 --- /dev/null +++ b/libminifi/test/unit/MinifiConcurrentQueueTests.cpp @@ -0,0 +1,201 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <chrono>
+#include <limits>
+#include <random>
+#include <string>
+#include <thread>
+#include <vector>
+#include <set>
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+namespace utils = org::apache::nifi::minifi::utils;
+
+/* A single producer enqueues three items, a single consumer polls them with tryDequeue.
+ * The items have to arrive in the order of insertion */
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue<std::string> queue;
+  std::vector<std::string> results;
+
+  std::thread producer([&queue]() {
+    queue.enqueue("ba");
+    std::this_thread::sleep_for(std::chrono::milliseconds(3));
+    queue.enqueue("dum");
+    std::this_thread::sleep_for(std::chrono::milliseconds(3));
+    queue.enqueue("tss");
+  });
+
+  std::thread consumer([&queue, &results]() {
+    while (results.size() < 3) {
+      std::string s;
+      if (queue.tryDequeue(s)) {
+        results.push_back(s);
+      } else {
+        // Nothing to consume yet, give the producer some time
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+      }
+    }
+  });
+
+  producer.join();
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+/* The same scenario with a blocking consumer: dequeueWait returns false once the queue is stopped,
+ * which terminates the consumer loop */
+TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") {
+  utils::ConditionConcurrentQueue<std::string> queue(true);
+  std::vector<std::string> results;
+
+  std::thread producer([&queue]() {
+    queue.enqueue("ba");
+    std::this_thread::sleep_for(std::chrono::milliseconds(3));
+    queue.enqueue("dum");
+    std::this_thread::sleep_for(std::chrono::milliseconds(3));
+    queue.enqueue("tss");
+  });
+
+  std::thread consumer([&queue, &results]() {
+    std::string s;
+    while (queue.dequeueWait(s)) {
+      results.push_back(s);
+    }
+  });
+
+  producer.join();
+
+  // After stop() every dequeue fails, even if elements are still queued.
+  // Let the consumer drain the queue first, otherwise the last item can get lost.
+  while (queue.size() > 0) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+  }
+
+  queue.stop();
+
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+/* In this testcase the consumer thread puts back all items to the queue to consume again
+ * Even in this case the ones inserted later by the producer should be consumed */
+TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") {
+  utils::ConcurrentQueue<std::string> queue;
+  std::set<std::string> results;
+
+  // Feeds three items with a short pause between them
+  std::thread producer([&queue]() {
+    const auto pause = std::chrono::milliseconds(3);
+    queue.enqueue("ba");
+    std::this_thread::sleep_for(pause);
+    queue.enqueue("dum");
+    std::this_thread::sleep_for(pause);
+    queue.enqueue("tss");
+  });
+
+  // Puts back everything it takes out, finishes when all three distinct items were seen
+  std::thread consumer([&queue, &results]() {
+    while (results.size() < 3) {
+      std::string item;
+      if (!queue.tryDequeue(item)) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        continue;
+      }
+      results.insert(item);
+      queue.enqueue(std::move(item));
+    }
+  });
+
+  producer.join();
+  consumer.join();
+
+  // std::set iterates in sorted order, which is "ba", "dum", "tss"
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+/* The same test as above, but covering the ConditionConcurrentQueue */
+TEST_CASE("TestConditionConqurrentQueue::testQueueWithReAdd", "[TestConditionQueueWithReAdd]") {
+  utils::ConditionConcurrentQueue<std::string> queue(true);
+  std::set<std::string> results;
+
+  std::thread producer([&queue]() {
+    const auto pause = std::chrono::milliseconds(3);
+    queue.enqueue("ba");
+    std::this_thread::sleep_for(pause);
+    queue.enqueue("dum");
+    std::this_thread::sleep_for(pause);
+    queue.enqueue("tss");
+  });
+
+  // Runs until the queue gets stopped, re-adding every item it consumed
+  std::thread consumer([&queue, &results]() {
+    std::string item;
+    for (;;) {
+      if (!queue.dequeueWait(item)) {
+        break;
+      }
+      results.insert(item);
+      queue.enqueue(std::move(item));
+    }
+  });
+
+  producer.join();
+
+  // The queue never gets empty because of the re-adding, so just leave some time for the consumer
+  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+  queue.stop();
+
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+/* Pushes a million random numbers through a ConcurrentQueue and a ConditionConcurrentQueue
+ * chained by a relay thread, the output has to match the input including the order */
+TEST_CASE("TestConruccentQueues::highLoad", "[TestConcurrentQueuesHighLoad]") {
+  std::random_device dev;
+  std::mt19937 rng(dev());
+  std::uniform_int_distribution<std::mt19937::result_type> dist(1, std::numeric_limits<int>::max());
+
+  std::vector<int> source(1000000);
+  std::vector<int> target;
+
+  for (int& value : source) {
+    value = dist(rng);
+  }
+
+  utils::ConcurrentQueue<int> queue;
+  utils::ConditionConcurrentQueue<int> cqueue(true);
+
+  std::thread producer([&source, &queue]() {
+    for (int value : source) {
+      queue.enqueue(value);
+    }
+  });
+
+  // Forwards the items from the plain queue to the condition queue
+  std::thread relay([&queue, &cqueue]() {
+    size_t forwarded = 0;
+    while (forwarded < 1000000) {
+      int value;
+      if (!queue.tryDequeue(value)) {
+        continue;
+      }
+      forwarded++;
+      cqueue.enqueue(value);
+    }
+  });
+
+  std::thread consumer([&cqueue, &target]() {
+    int value;
+    while (cqueue.dequeueWait(value)) {
+      target.push_back(value);
+    }
+  });
+
+  producer.join();
+  relay.join();
+
+  // Stopping drops the elements that are still queued, wait until the consumer took all of them
+  while (cqueue.size() != 0) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(0));
+  }
+
+  cqueue.stop();
+  consumer.join();
+
+  REQUIRE(source == target);
+}