This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e6088f  MINIFICPP-1185 - Remove moodycamel::concurrentqueue from 
threadpool
0e6088f is described below

commit 0e6088ffd1b2c7013117f47a3ea48d56bf944a51
Author: Arpad Boda <ab...@apache.org>
AuthorDate: Mon Mar 23 15:12:55 2020 +0100

    MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    Approved by bakaid and szaszm on GH
    
    This closes #746
---
 libminifi/include/utils/MinifiConcurrentQueue.h    | 184 +++++++++++++++++++
 libminifi/include/utils/ThreadPool.h               |  18 +-
 libminifi/src/utils/ThreadPool.cpp                 |  28 ++-
 libminifi/test/CPPLINT.cfg                         |   2 +-
 libminifi/test/unit/MinifiConcurrentQueueTests.cpp | 201 +++++++++++++++++++++
 5 files changed, 407 insertions(+), 26 deletions(-)

diff --git a/libminifi/include/utils/MinifiConcurrentQueue.h 
b/libminifi/include/utils/MinifiConcurrentQueue.h
new file mode 100644
index 0000000..d0bdcab
--- /dev/null
+++ b/libminifi/include/utils/MinifiConcurrentQueue.h
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include <chrono>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#include <stdexcept>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+
+// Thread-safe FIFO queue that can be shared by multiple producers and consumers.
+// Every public operation locks the internal mutex, so concurrent calls do not race with each other.
+// Elements are dequeued in order of insertion.
+template <typename T>
+class ConcurrentQueue {
+ public:
+  explicit ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+
+  // Takes over the elements of other while holding other's mutex.
+  // The temporary lock_guard is destroyed at the end of the delegating initializer,
+  // i.e. only after the private constructor has finished moving the container.
+  ConcurrentQueue(ConcurrentQueue&& other)
+    : ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mtx_)) {}
+
+  // Swaps the stored elements with other. Both mutexes are acquired via std::lock,
+  // so two queues that are move-assigned to each other concurrently cannot deadlock.
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+    if (this != &other) {
+      std::lock(mtx_, other.mtx_);
+      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
+      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
+      queue_.swap(other.queue_);
+    }
+    return *this;
+  }
+
+  // Moves the oldest element into out and returns true.
+  // Returns false and leaves out untouched if the queue is empty.
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return tryDequeueImpl(lck, out);
+  }
+
+  // Returns true if the queue held no elements at the time of the call.
+  bool empty() const {
+    std::unique_lock<std::mutex> lck(mtx_);
+    return emptyImpl(lck);
+  }
+
+  // Returns the number of elements held at the time of the call.
+  size_t size() const {
+    std::lock_guard<std::mutex> guard(mtx_);
+    return queue_.size();
+  }
+
+  // Drops every queued element.
+  void clear() {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.clear();
+  }
+
+  // Constructs a new element in place at the back of the queue from args.
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    std::lock_guard<std::mutex> guard(mtx_);
+    queue_.emplace_back(std::forward<Args>(args)...);
+  }
+
+ private:
+  // Helper of the move constructor: the second argument only proves that other's mutex is held.
+  // It is a const reference so that the temporary guard created by the caller can bind to it.
+  ConcurrentQueue(ConcurrentQueue&& other, const std::lock_guard<std::mutex>&)
+    : queue_(std::move(other.queue_)) {}
+
+ protected:
+  // Throws std::logic_error unless lck currently owns this queue's mutex.
+  void checkLock(std::unique_lock<std::mutex>& lck) const {
+    if (!lck.owns_lock() || lck.mutex() != &mtx_) {
+      throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!");
+    }
+  }
+
+  // Same contract as tryDequeue(), for callers that already hold mtx_ through lck.
+  bool tryDequeueImpl(std::unique_lock<std::mutex>& lck, T& out) {
+    checkLock(lck);
+    if (queue_.empty()) {
+      return false;
+    }
+    out = std::move(queue_.front());
+    queue_.pop_front();
+    return true;
+  }
+
+  // Same contract as empty(), for callers that already hold mtx_ through lck.
+  bool emptyImpl(std::unique_lock<std::mutex>& lck) const {
+    checkLock(lck);
+    return queue_.empty();
+  }
+
+  // Guards queue_; mutable so that const observers can lock it.
+  mutable std::mutex mtx_;
+ private:
+  std::deque<T> queue_;
+};
+
+
+// A ConcurrentQueue extended with a condition variable to be able to block and wait for incoming data.
+// Stopping interrupts all consumers without a chance to consume remaining elements in the queue,
+// although elements can still be enqueued.
+// Started means queued elements can be consumed/dequeued and dequeueWait* calls can block.
+template <typename T>
+class ConditionConcurrentQueue : private ConcurrentQueue<T> {
+ public:
+  // start decides whether the queue is created in the running (true) or stopped (false) state.
+  explicit ConditionConcurrentQueue(bool start = true) : ConcurrentQueue<T>{}, running_{start} {}
+
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
+
+  using ConcurrentQueue<T>::size;
+  using ConcurrentQueue<T>::empty;
+  using ConcurrentQueue<T>::clear;
+
+  // Constructs a new element in place at the back of the queue and wakes up one waiting consumer.
+  template <typename... Args>
+  void enqueue(Args&&... args) {
+    ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
+    // running_ is guarded by mtx_, which is not held here, so it must not be read.
+    // Notifying unconditionally is safe: waiters re-check their predicate after waking up,
+    // and a stopped queue makes them return instead of blocking anyway.
+    cv_.notify_one();
+  }
+
+  // Blocks until an element is available or the queue is stopped.
+  // Returns true and moves the oldest element into out, or returns false if the queue is stopped.
+  bool dequeueWait(T& out) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    // Only wake up if there is something to return or the queue is stopped
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });
+    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  // Like dequeueWait(), but gives up after time has elapsed.
+  // Returns false if the queue is stopped or still empty when the wait ends.
+  template< class Rep, class Period >
+  bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    // Wake up on timeout, when stopped, or in case there is something to do
+    cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });
+    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  // Non-blocking dequeue: returns false if the queue is stopped or empty.
+  bool tryDequeue(T& out) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  // Switches to the stopped state and wakes up every blocked consumer so that it can return.
+  void stop() {
+    std::lock_guard<std::mutex> guard(this->mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+
+  // Switches to the running state. No notification is needed: a stopped queue has no blocked
+  // consumers, because their wait predicate is satisfied as long as running_ is false.
+  void start() {
+    std::lock_guard<std::mutex> guard(this->mtx_);
+    running_ = true;
+  }
+
+  // While the queue is not running, dequeueing fails instead of blocking to avoid hanging threads.
+  bool isRunning() const {
+    std::lock_guard<std::mutex> guard(this->mtx_);
+    return running_;
+  }
+
+ private:
+  bool running_;  // guarded by mtx_ of the base class
+  std::condition_variable cv_;
+};
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif  // LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
diff --git a/libminifi/include/utils/ThreadPool.h 
b/libminifi/include/utils/ThreadPool.h
index 2554dc2..86e2abb 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -30,10 +30,10 @@
 #include <functional>
 
 #include "BackTrace.h"
+#include "MinifiConcurrentQueue.h"
 #include "Monitors.h"
 #include "core/expect.h"
 #include "controllers/ThreadManagementService.h"
-#include "concurrentqueue.h"
 #include "core/controller/ControllerService.h"
 #include "core/controller/ControllerServiceProvider.h"
 namespace org {
@@ -303,8 +303,12 @@ class ThreadPool {
    * Drain will notify tasks to stop following notification
    */
   void drain() {
+    worker_queue_.stop();
     while (current_workers_ > 0) {
-      tasks_available_.notify_one();
+      // The sleeping workers were woken up and stopped, but we still have to wait for
+      // the ones that were actually working on something when the queue was stopped.
+      // Stopping the queue guarantees that they don't get any new task.
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
     }
   }
 // determines if threads are detached
@@ -330,20 +334,18 @@ class ThreadPool {
 // integrated power manager
   std::shared_ptr<controllers::ThreadManagementService> thread_manager_;
   // thread queue for the recently deceased threads.
-  moodycamel::ConcurrentQueue<std::shared_ptr<WorkerThread>> 
deceased_thread_queue_;
+  ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
 // worker queue of worker objects
-  moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
+  ConditionConcurrentQueue<Worker<T>> worker_queue_;
   std::priority_queue<Worker<T>, std::vector<Worker<T>>, 
DelayedTaskComparator<T>> delayed_worker_queue_;
-// notification for available work
-  std::condition_variable tasks_available_;
+// mutex to protect task status and delayed queue
+  std::mutex worker_queue_mutex_;
 // notification for new delayed tasks that's before the current ones
   std::condition_variable delayed_task_available_;
 // map to identify if a task should be
   std::map<std::string, bool> task_status_;
 // manager mutex
   std::recursive_mutex manager_mutex_;
-// work queue mutex
-  std::mutex worker_queue_mutex_;
   // thread pool name
   std::string name_;
 
diff --git a/libminifi/src/utils/ThreadPool.cpp 
b/libminifi/src/utils/ThreadPool.cpp
index 039e136..01651d8 100644
--- a/libminifi/src/utils/ThreadPool.cpp
+++ b/libminifi/src/utils/ThreadPool.cpp
@@ -39,7 +39,7 @@ void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> 
thread) {
     }
 
     Worker<T> task;
-    if (worker_queue_.try_dequeue(task)) {
+    if (worker_queue_.dequeueWait(task)) {
       {
         std::unique_lock<std::mutex> lock(worker_queue_mutex_);
         if (!task_status_[task.getIdentifier()]) {
@@ -64,8 +64,11 @@ void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> 
thread) {
         }
       }
     } else {
-      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-      tasks_available_.wait(lock);
+      // This means that the threadpool is running, but the ConcurrentQueue is 
stopped -> shouldn't happen during normal conditions
+      // Might happen during startup or shutdown for a very short time
+      if (running_.load()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+      }
     }
   }
   current_workers_--;
@@ -83,7 +86,6 @@ void ThreadPool<T>::manage_delayed_queue() {
       Worker<T> task = 
std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
       delayed_worker_queue_.pop();
       worker_queue_.enqueue(std::move(task));
-      tasks_available_.notify_one();
     }
     if (delayed_worker_queue_.empty()) {
       delayed_task_available_.wait(lock);
@@ -102,14 +104,11 @@ bool ThreadPool<T>::execute(Worker<T> &&task, 
std::future<T> &future) {
     task_status_[task.getIdentifier()] = true;
   }
   future = std::move(task.getPromise()->get_future());
-  bool enqueued = worker_queue_.enqueue(std::move(task));
-  if (running_) {
-    tasks_available_.notify_one();
-  }
+  worker_queue_.enqueue(std::move(task));
 
   task_count_++;
 
-  return enqueued;
+  return true;
 }
 
 template<typename T>
@@ -157,7 +156,7 @@ void ThreadPool<T>::manageWorkers() {
           current_workers_++;
         }
         std::shared_ptr<WorkerThread> thread_ref;
-        while (deceased_thread_queue_.try_dequeue(thread_ref)) {
+        while (deceased_thread_queue_.tryDequeue(thread_ref)) {
           std::unique_lock<std::mutex> lock(worker_queue_mutex_);
           if (thread_ref->thread_.joinable())
             thread_ref->thread_.join();
@@ -186,10 +185,8 @@ void ThreadPool<T>::start() {
   std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
   if (!running_) {
     running_ = true;
+    worker_queue_.start();
     manager_thread_ = std::move(std::thread(&ThreadPool::manageWorkers, this));
-    if (worker_queue_.size_approx() > 0) {
-      tasks_available_.notify_all();
-    }
 
     std::lock_guard<std::mutex> quee_lock(worker_queue_mutex_);
     delayed_scheduler_thread_ = 
std::thread(&ThreadPool<T>::manage_delayed_queue, this);
@@ -231,10 +228,7 @@ void ThreadPool<T>::shutdown() {
       delayed_worker_queue_.pop();
     }
 
-    while (worker_queue_.size_approx() > 0) {
-      Worker<T> task;
-      worker_queue_.try_dequeue(task);
-    }
+    worker_queue_.clear();
   }
 }
 
diff --git a/libminifi/test/CPPLINT.cfg b/libminifi/test/CPPLINT.cfg
index a59ff3b..3455f0f 100644
--- a/libminifi/test/CPPLINT.cfg
+++ b/libminifi/test/CPPLINT.cfg
@@ -1,4 +1,4 @@
-filter=-build/include_order,-build/include_alpha
+filter=-build/include_order,-build/include_alpha,-build/namespaces
 exclude_files=Server.cpp
 exclude_files=TestBase.cpp
 exclude_files=RandomServerSocket.cpp
diff --git a/libminifi/test/unit/MinifiConcurrentQueueTests.cpp 
b/libminifi/test/unit/MinifiConcurrentQueueTests.cpp
new file mode 100644
index 0000000..aed2743
--- /dev/null
+++ b/libminifi/test/unit/MinifiConcurrentQueueTests.cpp
@@ -0,0 +1,201 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <chrono>
+#include <limits>
+#include <random>
+#include <set>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+namespace utils = org::apache::nifi::minifi::utils;
+
+// A single producer and a single polling consumer: the consumer must see the elements in insertion order.
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue<std::string> queue;
+  std::vector<std::string> results;
+
+  // Pushes three words with a short pause between them.
+  std::thread producer([&queue]() {
+    const auto pause = std::chrono::milliseconds(3);
+    queue.enqueue("ba");
+    std::this_thread::sleep_for(pause);
+    queue.enqueue("dum");
+    std::this_thread::sleep_for(pause);
+    queue.enqueue("tss");
+  });
+
+  // Polls the queue until all three words have been collected, backing off while it is empty.
+  std::thread consumer([&queue, &results]() {
+    while (results.size() < 3) {
+      std::string item;
+      if (!queue.tryDequeue(item)) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        continue;
+      }
+      results.push_back(item);
+    }
+  });
+
+  producer.join();
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+// A single producer and a single blocking consumer: the consumer must see the elements in insertion order.
+TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") {
+  utils::ConditionConcurrentQueue<std::string> queue(true);
+  std::vector<std::string> results;
+
+  std::thread producer([&queue]() {
+    queue.enqueue("ba");
+    std::this_thread::sleep_for(std::chrono::milliseconds(3));
+    queue.enqueue("dum");
+    std::this_thread::sleep_for(std::chrono::milliseconds(3));
+    queue.enqueue("tss");
+  });
+
+  // Blocks in dequeueWait() until an element arrives; leaves the loop once the queue is stopped.
+  std::thread consumer([&queue, &results]() {
+    std::string s;
+    while (queue.dequeueWait(s)) {
+      results.push_back(s);
+    }
+  });
+
+  producer.join();
+
+  // A stopped queue refuses to hand out elements that are still queued, so wait until the
+  // consumer has taken the last one. Stopping right after the producer finished could drop
+  // "tss" and make the test fail intermittently.
+  while (queue.size() > 0) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+  }
+
+  queue.stop();
+
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+/* In this testcase the consumer thread puts every item back into the queue to consume it again.
+ * Even so, the items inserted later by the producer should still be consumed. */
+TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") {
+  utils::ConcurrentQueue<std::string> queue;
+  std::set<std::string> results;
+
+  std::thread producer([&queue]() {
+    const auto pause = std::chrono::milliseconds(3);
+    queue.enqueue("ba");
+    std::this_thread::sleep_for(pause);
+    queue.enqueue("dum");
+    std::this_thread::sleep_for(pause);
+    queue.enqueue("tss");
+  });
+
+  // Runs until three distinct items have been seen; each dequeued item is recorded and re-added.
+  std::thread consumer([&queue, &results]() {
+    while (results.size() < 3) {
+      std::string item;
+      if (!queue.tryDequeue(item)) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        continue;
+      }
+      results.insert(item);
+      queue.enqueue(std::move(item));
+    }
+  });
+
+  producer.join();
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+/* Re-adding consumer scenario for the ConditionConcurrentQueue: every dequeued item is put back,
+ * and the items inserted later by the producer should still be consumed. */
+TEST_CASE("TestConditionConqurrentQueue::testQueueWithReAdd", "[TestConditionQueueWithReAdd]") {
+  utils::ConditionConcurrentQueue<std::string> queue(true);
+  std::set<std::string> results;
+
+  std::thread producer([&queue]() {
+    const auto pause = std::chrono::milliseconds(3);
+    queue.enqueue("ba");
+    std::this_thread::sleep_for(pause);
+    queue.enqueue("dum");
+    std::this_thread::sleep_for(pause);
+    queue.enqueue("tss");
+  });
+
+  // Keeps cycling items through the queue until dequeueWait() reports that the queue was stopped.
+  std::thread consumer([&queue, &results]() {
+    std::string item;
+    for (;;) {
+      if (!queue.dequeueWait(item)) {
+        break;
+      }
+      results.insert(item);
+      queue.enqueue(std::move(item));
+    }
+  });
+
+  producer.join();
+
+  // The queue never becomes empty here, so give the consumer a fixed amount of time to see the last item.
+  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+  queue.stop();
+
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+/* Pushes a large amount of random numbers through a ConcurrentQueue and a ConditionConcurrentQueue
+ * chained by a relay thread, and verifies that nothing is lost, duplicated or reordered. */
+TEST_CASE("TestConruccentQueues::highLoad", "[TestConcurrentQueuesHighLoad]") {
+  const size_t element_count = 1000000;
+
+  std::random_device dev;
+  std::mt19937 rng(dev());
+  std::uniform_int_distribution<std::mt19937::result_type> dist(1, std::numeric_limits<int>::max());
+
+  std::vector<int> source(element_count);
+  std::vector<int> target;
+  target.reserve(element_count);
+
+  std::generate(source.begin(), source.end(), [&rng, &dist](){ return dist(rng); });
+
+  utils::ConcurrentQueue<int> queue;
+  utils::ConditionConcurrentQueue<int> cqueue(true);
+
+  std::thread producer([&source, &queue]()  {
+    for (int i : source) { queue.enqueue(i); }
+  });
+
+  // Moves every element from the plain queue to the condition queue, polling until all have passed through.
+  std::thread relay([&queue, &cqueue, element_count]() {
+    size_t cnt = 0;
+    while (cnt < element_count) {
+      int i;
+      if (queue.tryDequeue(i)) {
+        cnt++;
+        cqueue.enqueue(i);
+      }
+    }
+  });
+
+  std::thread consumer([&cqueue, &target]() {
+    int i;
+    while (cqueue.dequeueWait(i)) {
+      target.push_back(i);
+    }
+  });
+
+  producer.join();
+  relay.join();
+
+  // A stopped queue does not hand out remaining elements, so let the consumer drain it first.
+  while (cqueue.size() > 0) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(0));
+  }
+
+  cqueue.stop();
+  consumer.join();
+
+  REQUIRE(source == target);
+}

Reply via email to