[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400061549
 
 

 ##
 File path: libminifi/src/utils/ThreadPool.cpp
 ##
 @@ -64,8 +64,10 @@ void ThreadPool::run_tasks(std::shared_ptr 
thread) {
 }
   }
 } else {
-  std::unique_lock lock(worker_queue_mutex_);
-  tasks_available_.wait(lock);
+  // This means that the threadpool is running, but the ConcurrentQueue is 
stopped -> shouldn't happen
+  if (running_.load()) {
+std::this_thread::sleep_for(std::chrono::milliseconds(1));
+  }
 
 Review comment:
   thx for the explanation. Feel free to close the thread.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400045447
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -29,11 +29,12 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
+
+// Provides a queue API and guarantees no race conditions in case of multiple 
producers and consumers.
 
 Review comment:
   I would add:
   - Guarantees that the elements are dequeued in the order of insertion


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400043152
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != ) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue 
should own the lock!");
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
 
 Review comment:
   It's fine and expected that we use it in libminifi. What I'd like is that we 
only support dependency on public members for libminifi API users and treat 
`protected`/`private` as "internal". The language doesn't provide tools for 
that (like package-private in Java), so that's an API documentation issue. We 
could also use pimpl, but that would mean a lot of boilerplate, an extra level 
of indirection and no inlining, so I'm against that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400060410
 
 

 ##
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
+
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue queue;
+  std::vector results;
+
+  std::thread producer([]() {
+  queue.enqueue("ba");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
 
 Review comment:
   I see your point, but we should cover data races as well. Can we have at 
least a handful of test cases with rapid insertions, i.e. with no sleep, to 
cover both problems?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400048134
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -81,25 +82,38 @@ class ConcurrentQueue {
 : queue_( std::move(other.queue_) ) {}
 
  protected:
-  bool tryDequeue(std::unique_lock& lck, T& out) {
+  void checkLock(std::unique_lock& lck) const {
 if (!lck.owns_lock()) {
-  throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue 
should own the lock!");
+  throw std::logic_error("Caller of protected functions of ConcurrentQueue 
should own the lock!"); 
 }
+  }
+
+  bool tryDequeueImpl(std::unique_lock& lck, T& out) {
+checkLock(lck);
 if (queue_.empty()) {
   return false;
 }
 out = std::move(queue_.front());
 queue_.pop_front();
 return true;
   }
 
 Review comment:
   I like that you used protected functions taking `std::unique_lock` to make 
it an error to use the non-locking functions without a lock.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400057782
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+
+// Provides a queue API and guarantees no race conditions in case of multiple 
producers and consumers.
+template 
+class ConcurrentQueue {
+ public:
+  explicit ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != ) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeueImpl(lck, out);
+  }
+
+  bool empty() const {
+std::unique_lock lck(mtx_);
+return queue_.emptyImpl(lck);
+  }
+
+  size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  void checkLock(std::unique_lock& lck) const {
+if (!lck.owns_lock()) {
+  throw std::logic_error("Caller of protected functions of ConcurrentQueue 
should own the lock!"); 
+}
+  }
+
+  bool tryDequeueImpl(std::unique_lock& lck, T& out) {
+checkLock(lck);
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+
+  bool emptyImpl(std::unique_lock& lck) const {
+checkLock(lck);
+return queue_.empty();
+  }
+
+  mutable std::mutex mtx_;
+ private:
+  std::deque queue_;
+};
+
+
+// A ConcurrentQueue extended with a condition variable to be able to block 
and wait for incoming data
+template 
+class ConditionConcurrentQueue : private ConcurrentQueue {
+ public:
+  explicit ConditionConcurrentQueue(bool start = true) : ConcurrentQueue{}, 
running_{start} {}
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = 
delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = 
delete;
+
+  using ConcurrentQueue::size;
+  using ConcurrentQueue::empty;
+  using ConcurrentQueue::clear;
+
+
+  template 
+  void enqueue(Args&&... args) {
+ConcurrentQueue::enqueue(std::forward(args)...);
+if (running_) {
+  cv_.notify_one();
+}
+  }
+  
+  bool dequeueWait(T& out) {
+std::unique_lock lck(this->mtx_);
+cv_.wait(lck, [this, ]{ return !running_ || !this->emptyImpl(lck); }); 
 // Only wake up if there is something to return or stopped 
+return running_ && ConcurrentQueue::tryDequeueImpl(lck, out);
+  }
+
+  template< class Rep, class Period >
+  bool dequeueWaitFor(T& out, const std::chrono::duration& time) {
+std::unique_lock lck(this->mtx_);
+cv_.wait_for(lck, time, [this, ]{ return !running_ || 
!this->emptyImpl(lck); });  // Wake up with timeout or in case there is 
something to do
+return running_ && ConcurrentQueue::tryDequeueImpl(lck, out);
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(this->mtx_);
+return running_ && 

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400051711
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -119,10 +133,22 @@ class ConditionConcurrentQueue : private 
ConcurrentQueue {
 }
   }
   
-  bool dequeue(T& out) {
+  bool dequeueWait(T& out) {
+std::unique_lock lck(this->mtx_);
+cv_.wait(lck, [this, ]{ return !running_ || !this->emptyImpl(lck); }); 
 // Only wake up if there is something to return or stopped 
+return running_ && ConcurrentQueue::tryDequeueImpl(lck, out);
+  }
+
+  template< class Rep, class Period >
+  bool dequeueWaitFor(T& out, const std::chrono::duration& time) {
 
 Review comment:
   I like that you used a template here, not just a concrete duration type like 
`milliseconds`, which is more common in the code base.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400056195
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+
+// Provides a queue API and guarantees no race conditions in case of multiple 
producers and consumers.
+template 
+class ConcurrentQueue {
+ public:
+  explicit ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != ) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeueImpl(lck, out);
+  }
+
+  bool empty() const {
+std::unique_lock lck(mtx_);
+return queue_.emptyImpl(lck);
+  }
+
+  size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  void checkLock(std::unique_lock& lck) const {
+if (!lck.owns_lock()) {
+  throw std::logic_error("Caller of protected functions of ConcurrentQueue 
should own the lock!"); 
+}
+  }
+
+  bool tryDequeueImpl(std::unique_lock& lck, T& out) {
+checkLock(lck);
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+
+  bool emptyImpl(std::unique_lock& lck) const {
+checkLock(lck);
+return queue_.empty();
+  }
+
+  mutable std::mutex mtx_;
+ private:
+  std::deque queue_;
+};
+
+
+// A ConcurrentQueue extended with a condition variable to be able to block 
and wait for incoming data
 
 Review comment:
   I would like to add:
   - `stop` interrupts all consumers without a chance to consume remaining 
elements in the queue
   - started means queue elements can be consumed/dequeued. 
   - It's possible to enqueue elements regardless of the running state.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400043883
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -29,11 +29,12 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
+
+// Provides a queue API and guarantees no race conditions in case of multiple 
producers and consumers.
 template 
 class ConcurrentQueue {
  public:
-  ConcurrentQueue() = default;
-  virtual ~ConcurrentQueue() = default;
+  explicit ConcurrentQueue() = default;
 
 Review comment:
   I don't see the need to make this explicit, although it doesn't hurt much. 
It prevents code like this:
   `ConcurrentQueue make_queue() { return {}; }`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-28 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399652688
 
 

 ##
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
+
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue queue;
+  std::vector results;
+
+  std::thread producer([]() {
+  queue.enqueue("ba");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
+  queue.enqueue("dum");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
+  queue.enqueue("tss");
+});
+
+  std::thread consumer([, ]() {
+ while (results.size() < 3) {
+   std::string s;
+   if (queue.tryDequeue(s)) {
+ results.push_back(s);
+   } else {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+   }
+ }
+});
+
+  producer.join();
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") {
+  utils::ConditionConcurrentQueue queue(true);
+  std::vector results;
+
+  std::thread producer([]() {
+queue.enqueue("ba");
+std::this_thread::sleep_for(std::chrono::milliseconds(3));
+queue.enqueue("dum");
+std::this_thread::sleep_for(std::chrono::milliseconds(3));
+queue.enqueue("tss");
+  });
+
+  std::thread consumer([, ]() {
+std::string s;
+while (queue.dequeue(s)) {
+  results.push_back(s);
+}
+  });
+
+  producer.join();
+
+  queue.stop();
+
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+/* In this testcase the consumer thread puts back all items to the queue to 
consume again
+ * Even in this case the ones inserted later by the producer  should be 
consumed */
+TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") {
+  utils::ConcurrentQueue queue;
+  std::set results;
+
+  std::thread producer([]() {
+  queue.enqueue("ba");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
+  queue.enqueue("dum");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
+  queue.enqueue("tss");
+});
 
 Review comment:
   there seems to be one extra level of indentation


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-28 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399648668
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != ) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
 
 Review comment:
   Just an interesting note: the compiler will optimize out the extra boolean 
of `std::unique_lock` (vs. `std::lock_guard`) if it's not used. The point is 
that the locked version's signature is fine, there's absolutely no reason to 
optimize.
   
   Proof: https://godbolt.org/z/ZgIf0f
   I'm only surprised that the compiler emits the same assembly twice instead 
of labeling the same code twice.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-28 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399644084
 
 

 ##
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
+
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue queue;
+  std::vector results;
+
+  std::thread producer([]() {
+  queue.enqueue("ba");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
+  queue.enqueue("dum");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
+  queue.enqueue("tss");
+});
+
+  std::thread consumer([, ]() {
+ while (results.size() < 3) {
+   std::string s;
+   if (queue.tryDequeue(s)) {
+ results.push_back(s);
+   } else {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+   }
+ }
+});
+
+  producer.join();
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") {
+  utils::ConditionConcurrentQueue queue(true);
+  std::vector results;
+
+  std::thread producer([]() {
+queue.enqueue("ba");
+std::this_thread::sleep_for(std::chrono::milliseconds(3));
+queue.enqueue("dum");
+std::this_thread::sleep_for(std::chrono::milliseconds(3));
+queue.enqueue("tss");
+  });
+
+  std::thread consumer([, ]() {
+std::string s;
+while (queue.dequeue(s)) {
+  results.push_back(s);
+}
+  });
+
+  producer.join();
+
+  queue.stop();
+
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+/* In this testcase the consumer thread puts back all items to the queue to 
consume again
+ * Even in this case the ones inserted later by the producer  should be 
consumed */
+TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") {
+  utils::ConcurrentQueue queue;
+  std::set results;
+
+  std::thread producer([]() {
+  queue.enqueue("ba");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
+  queue.enqueue("dum");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
+  queue.enqueue("tss");
+});
+
+  std::thread consumer([, ]() {
+while (results.size() < 3) {
+  std::string s;
+  if (queue.tryDequeue(s)) {
+results.insert(s);
+queue.enqueue(std::move(s));
+  } else {
+std::this_thread::sleep_for(std::chrono::milliseconds(1));
+  }
+}
+  });
+
+  producer.join();
+
+  // Give some time for the consumer to loop over the queue
+  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+  consumer.join();
 
 Review comment:
   The wait is redundant since `consumer.join()` will wait until the consumer 
finishes execution which is until it reaches the first "tss".


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-28 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399650366
 
 

 ##
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
 
 Review comment:
   You can either remove the `utils::` prefix from the symbol references from 
this namespace in this file or change this line to `namespace utils = 
org::apache::nifi::minifi::utils;`. I personally have no preference, but abseil 
(google) recommends no using-directives at all.
   
   Abseil: "Tip of the Week #153: Don't use using-directives" 
https://abseil.io/tips/153
   
   The C++ Core Guidelines allow this usage of using-directives:
   http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rs-using


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-28 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399639532
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
 
 Review comment:
   The virtual destructor is no longer needed with private inheritance.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-28 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399644887
 
 

 ##
 File path: libminifi/src/utils/ThreadPool.cpp
 ##
 @@ -64,8 +64,10 @@ void ThreadPool::run_tasks(std::shared_ptr 
thread) {
 }
   }
 } else {
-  std::unique_lock lock(worker_queue_mutex_);
-  tasks_available_.wait(lock);
+  // This means that the threadpool is running, but the ConcurrentQueue is 
stopped -> shouldn't happen
+  if (running_.load()) {
+std::this_thread::sleep_for(std::chrono::milliseconds(1));
+  }
 
 Review comment:
   If it shouldn't happen and we're handling it, then we should log the event 
and consider throw/abort and generating a core dump for debugging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-28 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399642420
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != ) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue 
should own the lock!");
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : private ConcurrentQueue {
 
 Review comment:
   It would be nice to have comments explaining the purpose of each class and 
the guarantees. This way the user doesn't have to read the code to use the 
classes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-28 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399640042
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != ) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue 
should own the lock!");
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
 
 Review comment:
   Can we mark this API private to extension developers? It would be nice to 
have the freedom to change the underlying container later, e.g. to a lock-free 
queue that keeps order, without breaking API.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-28 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399640583
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != ) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue 
should own the lock!");
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : private ConcurrentQueue {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), 
running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = 
delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = 
delete;
+
+  using ConcurrentQueue::size;
+  using ConcurrentQueue::empty;
+  using ConcurrentQueue::clear;
+
+
+  template 
+  void enqueue(Args&&... args) {
+ConcurrentQueue::enqueue(std::forward(args)...);
+if (running_) {
+  cv_.notify_one();
+}
+  }
+  
+  bool dequeue(T& out) {
 
 Review comment:
   I think a non-blocking `tryDequeue` would be valuable in this class. Also, 
it would be nice to be able to wait for a specified duration or until a given 
time point, using `cv_.wait_for()` and `cv_.wait_until()`.
   My idea of the naming is (not very creative, but obvious):
   - Indefinite blocking: `dequeue_wait`
   - Blocking for duration: `dequeue_wait_for`
   - Blocking until timestamp: `dequeue_wait_until`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-28 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399643197
 
 

 ##
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
+
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue queue;
+  std::vector results;
+
+  std::thread producer([]() {
+  queue.enqueue("ba");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
 
 Review comment:
   I suggest avoiding all sleeps in the producers to speed up the test cases 
and to have the chance to discover race conditions that would only result in 
data races with tighter timings.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-28 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r397293761
 
 

 ##
 File path: libminifi/include/utils/ThreadPool.h
 ##
 @@ -330,20 +331,18 @@ class ThreadPool {
 // integrated power manager
   std::shared_ptr thread_manager_;
   // thread queue for the recently deceased threads.
-  moodycamel::ConcurrentQueue> 
deceased_thread_queue_;
+  ConcurrentQueue> deceased_thread_queue_;
 // worker queue of worker objects
-  moodycamel::ConcurrentQueue> worker_queue_;
+  ConditionConcurrentQueue> worker_queue_;
   std::priority_queue, std::vector>, 
DelayedTaskComparator> delayed_worker_queue_;
-// notification for available work
-  std::condition_variable tasks_available_;
+// mutex to  protect task status and delayed queue   
+  std::mutex worker_queue_mutex_;
 
 Review comment:
   Thanks and nevermind, I mixed the queues.
   
   But there is something that can still be fixed here: whitespaces at the end 
of line 338


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-28 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399639717
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != ) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue 
should own the lock!");
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : private ConcurrentQueue {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), 
running_{start} {};
 
 Review comment:
   - This constructor should be `explicit`
   - redundant semicolon at the end of the line
   - inconsistency between initializers (direct-initialization vs 
direct-list-initialization), but this is not a big deal
   - I think `start = true` is a better default, because RAII and first start 
is initialization IMO, because the class is only useful when started.  If the 
thread pool class uses late initialization (start), it should override the 
default.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-24 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r397169633
 
 

 ##
 File path: libminifi/include/utils/ThreadPool.h
 ##
 @@ -303,8 +303,9 @@ class ThreadPool {
* Drain will notify tasks to stop following notification
*/
   void drain() {
+worker_queue_.stop();
 while (current_workers_ > 0) {
-  tasks_available_.notify_one();
+  std::this_thread::sleep_for(std::chrono::milliseconds(1));
 
 Review comment:
   Could you add a code comment explaining this, including the fact that this 
is a best-effort solution? It will be helpful to future readers of the code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-24 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r397167251
 
 

 ##
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != ) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  virtual size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  virtual void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  return false;
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : public ConcurrentQueue {
 
 Review comment:
   In the case of any "implemented in terms of" relationship, the new class 
would need to provide all of the required functionality and delegate to the 
inner class. This means boilerplate, as you pointed out, regardless of whether 
it's private inheritance or composition.
   
   One way to reduce boilerplate in the case of private inheritance and 
unmodified member functions is via using-declarations. It still requires 
iterating the member functions that are to be exposed without modification, but 
they are shorter than providing wrapper implementations.
   
   In my opinion, cleaner dependencies (in this case) add more value than 
avoided boilerplate, but this is subjective.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396522340
 
 

 ##
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != ) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  virtual size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  virtual void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  return false;
 
 Review comment:
   If we are in an unknown state, crash and restart by the service manager is 
better than throwing and/or logging something and then continuing somewhere 
somehow, IMHO.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396535599
 
 

 ##
 File path: libminifi/include/utils/ThreadPool.h
 ##
 @@ -303,8 +303,9 @@ class ThreadPool {
* Drain will notify tasks to stop following notification
*/
   void drain() {
+worker_queue_.stop();
 while (current_workers_ > 0) {
-  tasks_available_.notify_one();
+  std::this_thread::sleep_for(std::chrono::milliseconds(1));
 
 Review comment:
   What is the purpose of this sleep?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396521172
 
 

 ##
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != ) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  virtual size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  virtual void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  return false;
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : public ConcurrentQueue {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), 
running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = 
delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = 
delete;
+  
+  template 
+  void enqueue(Args&&... args) {
+ConcurrentQueue::enqueue(std::forward(args)...);
+if (running_) {
+  cv_.notify_one();
+}
+  }
+  
+  bool tryDequeue(T& out) override {
+std::unique_lock lck(this->mtx_);
+if (running_ && this->queue_.empty()) {
+  cv_.wait(lck, [this]{ return !running_ || !this->queue_.empty(); });  // 
Only wake up if there is something to return or stopped 
+}
+return running_ && ConcurrentQueue::tryDequeue(lck, out);
+  }
+  
+  void stop() {
+std::lock_guard guard(this->mtx_);
+running_ = false;
+cv_.notify_all();
+  }
+
+  void start() {
 
 Review comment:
   What does `start` mean? Why do I have to start my queue after creating it? 
Same for `isRunning`. If it serves some purpose, we need a comment explaining 
it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396515674
 
 

 ##
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != ) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  virtual size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  virtual void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  return false;
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : public ConcurrentQueue {
 
 Review comment:
   I think it's not a good design to allow dynamic polymorphism between these 
containers, despite the "is a" relationship being present. My intuition screams 
design issue but can't fully grasp what is the root cause. I'll try to make 
some points below.
   
   Performance: I prefer not to have virtual functions on containers, as most 
or all users will know their requirements against their container. This design 
violates the zero overhead principle by imposing virtual calls on users that 
don't need notification capabilities.
   
   Hierarchy: I think the relationship here is an added 
[aspect](https://en.wikipedia.org/wiki/Aspect-oriented_programming) rather than 
a hierarchy.
   
http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#c120-use-class-hierarchies-to-represent-concepts-with-inherent-hierarchical-structure-only
   
   Inheritance: 
   - The inheritance here is both interface and implementation inheritance.
   
http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#c129-when-designing-a-class-hierarchy-distinguish-between-implementation-inheritance-and-interface-inheritance
   - Leaving the class open for extension makes it possible for subclasses to 
violate the invariants of the base class, violating encapsulation.
   
   Do we really need runtime polymorphism? If yes, I'd make it possible through 
concept-based polymorphism (via type erasure) without affecting the 
implementation.
   
   In either case, I'd make ConcurrentQueue closed and ConditionConcurrentQueue 
a wrapper around ConcurrentQueue ("implemented in terms of") rather than a 
public subclass ("is a"). To access the mutex, I recommend this to be a private 
inheritance with only the mutex of the base class marked as protected, or some 
other way of leaking the mutex.
   
   I feel like my above arguments are weak, and my proposed design is not very 
sound. As always, I welcome discussion.


[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396518120
 
 

 ##
 File path: libminifi/include/utils/ThreadPool.h
 ##
 @@ -330,20 +331,18 @@ class ThreadPool {
 // integrated power manager
   std::shared_ptr thread_manager_;
   // thread queue for the recently deceased threads.
-  moodycamel::ConcurrentQueue> 
deceased_thread_queue_;
+  ConcurrentQueue> deceased_thread_queue_;
 // worker queue of worker objects
-  moodycamel::ConcurrentQueue> worker_queue_;
+  ConditionConcurrentQueue> worker_queue_;
   std::priority_queue, std::vector>, 
DelayedTaskComparator> delayed_worker_queue_;
-// notification for available work
-  std::condition_variable tasks_available_;
+// mutex to  protect task status and delayed queue   
+  std::mutex worker_queue_mutex_;
 
 Review comment:
   Is this mutex still used?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services