[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400205341
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue 
should own the lock!");
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
 
 Review comment:
   I think that topic is way more general than this PR.
   
   I would be happy to a see a doc about the interfaces we consider public API, 
but that doesn't exist yet.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400204455
 
 

 ##
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
+
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue queue;
+  std::vector results;
+
+  std::thread producer([&queue]() {
+  queue.enqueue("ba");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
 
 Review comment:
   Done, added a new testcases that deals with a lot of elements without 
sleeps. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400203410
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+
+// Provides a queue API and guarantees no race conditions in case of multiple 
producers and consumers.
+template 
+class ConcurrentQueue {
+ public:
+  explicit ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeueImpl(lck, out);
+  }
+
+  bool empty() const {
+std::unique_lock lck(mtx_);
+return queue_.emptyImpl(lck);
+  }
+
+  size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  void checkLock(std::unique_lock& lck) const {
+if (!lck.owns_lock()) {
+  throw std::logic_error("Caller of protected functions of ConcurrentQueue 
should own the lock!"); 
+}
+  }
+
+  bool tryDequeueImpl(std::unique_lock& lck, T& out) {
+checkLock(lck);
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+
+  bool emptyImpl(std::unique_lock& lck) const {
+checkLock(lck);
+return queue_.empty();
+  }
+
+  mutable std::mutex mtx_;
+ private:
+  std::deque queue_;
+};
+
+
+// A ConcurrentQueue extended with a condition variable to be able to block 
and wait for incoming data
 
 Review comment:
   Added


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400203336
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+
+// Provides a queue API and guarantees no race conditions in case of multiple 
producers and consumers.
+template 
+class ConcurrentQueue {
+ public:
+  explicit ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeueImpl(lck, out);
+  }
+
+  bool empty() const {
+std::unique_lock lck(mtx_);
+return queue_.emptyImpl(lck);
+  }
+
+  size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  void checkLock(std::unique_lock& lck) const {
+if (!lck.owns_lock()) {
+  throw std::logic_error("Caller of protected functions of ConcurrentQueue 
should own the lock!"); 
+}
+  }
+
+  bool tryDequeueImpl(std::unique_lock& lck, T& out) {
+checkLock(lck);
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+
+  bool emptyImpl(std::unique_lock& lck) const {
+checkLock(lck);
+return queue_.empty();
+  }
+
+  mutable std::mutex mtx_;
+ private:
+  std::deque queue_;
+};
+
+
+// A ConcurrentQueue extended with a condition variable to be able to block 
and wait for incoming data
+template 
+class ConditionConcurrentQueue : private ConcurrentQueue {
+ public:
+  explicit ConditionConcurrentQueue(bool start = true) : ConcurrentQueue{}, 
running_{start} {}
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = 
delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = 
delete;
+
+  using ConcurrentQueue::size;
+  using ConcurrentQueue::empty;
+  using ConcurrentQueue::clear;
+
+
+  template 
+  void enqueue(Args&&... args) {
+ConcurrentQueue::enqueue(std::forward(args)...);
+if (running_) {
+  cv_.notify_one();
+}
+  }
+  
+  bool dequeueWait(T& out) {
+std::unique_lock lck(this->mtx_);
+cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); 
 // Only wake up if there is something to return or stopped 
+return running_ && ConcurrentQueue::tryDequeueImpl(lck, out);
+  }
+
+  template< class Rep, class Period >
+  bool dequeueWaitFor(T& out, const std::chrono::duration& time) {
+std::unique_lock lck(this->mtx_);
+cv_.wait_for(lck, time, [this, &lck]{ return !running_ || 
!this->emptyImpl(lck); });  // Wake up with timeout or in case there is 
something to do
+return running_ && ConcurrentQueue::tryDequeueImpl(lck, out);
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(this->mtx_);
+return running_ && ConcurrentQ

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400203526
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -29,11 +29,12 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
+
+// Provides a queue API and guarantees no race conditions in case of multiple 
producers and consumers.
 
 Review comment:
   Added


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400041324
 
 

 ##
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
+
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue queue;
+  std::vector results;
+
+  std::thread producer([&queue]() {
+  queue.enqueue("ba");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
 
 Review comment:
   In case  there would be no sleeps here, producer most probably just inserts 
everything before the consumer is even started. 
   Which would also make the last testcases useless, where the goal is to prove 
that elements inserted later (when some are already looped in the queue) also 
get to a consumer. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400039598
 
 

 ##
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
 
 Review comment:
   Did the 2nd
   
   Btw I don't think using directives are any bad in test cpp files.
   In headers that get included to production code I would definitely avoid 
them as well. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400040217
 
 

 ##
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
+
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue queue;
+  std::vector results;
+
+  std::thread producer([&queue]() {
+  queue.enqueue("ba");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
+  queue.enqueue("dum");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
+  queue.enqueue("tss");
+});
+
+  std::thread consumer([&queue, &results]() {
+ while (results.size() < 3) {
+   std::string s;
+   if (queue.tryDequeue(s)) {
+ results.push_back(s);
+   } else {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+   }
+ }
+});
+
+  producer.join();
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") {
+  utils::ConditionConcurrentQueue queue(true);
+  std::vector results;
+
+  std::thread producer([&queue]() {
+queue.enqueue("ba");
+std::this_thread::sleep_for(std::chrono::milliseconds(3));
+queue.enqueue("dum");
+std::this_thread::sleep_for(std::chrono::milliseconds(3));
+queue.enqueue("tss");
+  });
+
+  std::thread consumer([&queue, &results]() {
+std::string s;
+while (queue.dequeue(s)) {
+  results.push_back(s);
+}
+  });
+
+  producer.join();
+
+  queue.stop();
+
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+/* In this testcase the consumer thread puts back all items to the queue to 
consume again
+ * Even in this case the ones inserted later by the producer  should be 
consumed */
+TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") {
+  utils::ConcurrentQueue queue;
+  std::set results;
+
+  std::thread producer([&queue]() {
+  queue.enqueue("ba");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
+  queue.enqueue("dum");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
+  queue.enqueue("tss");
+});
 
 Review comment:
   Fixed


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400039479
 
 

 ##
 File path: libminifi/src/utils/ThreadPool.cpp
 ##
 @@ -64,8 +64,10 @@ void ThreadPool::run_tasks(std::shared_ptr 
thread) {
 }
   }
 } else {
-  std::unique_lock lock(worker_queue_mutex_);
-  tasks_available_.wait(lock);
+  // This means that the threadpool is running, but the ConcurrentQueue is 
stopped -> shouldn't happen
+  if (running_.load()) {
+std::this_thread::sleep_for(std::chrono::milliseconds(1));
+  }
 
 Review comment:
   There is no logger here. 
   It's not a critical error that worth an exception/dump and it might happen 
during startup/shutdown for a very short time. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400039598
 
 

 ##
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
 
 Review comment:
   Did the 2nd


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400038906
 
 

 ##
 File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
 ##
 @@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;
+
+TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") {
+  utils::ConcurrentQueue queue;
+  std::vector results;
+
+  std::thread producer([&queue]() {
+  queue.enqueue("ba");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
+  queue.enqueue("dum");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
+  queue.enqueue("tss");
+});
+
+  std::thread consumer([&queue, &results]() {
+ while (results.size() < 3) {
+   std::string s;
+   if (queue.tryDequeue(s)) {
+ results.push_back(s);
+   } else {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+   }
+ }
+});
+
+  producer.join();
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") {
+  utils::ConditionConcurrentQueue queue(true);
+  std::vector results;
+
+  std::thread producer([&queue]() {
+queue.enqueue("ba");
+std::this_thread::sleep_for(std::chrono::milliseconds(3));
+queue.enqueue("dum");
+std::this_thread::sleep_for(std::chrono::milliseconds(3));
+queue.enqueue("tss");
+  });
+
+  std::thread consumer([&queue, &results]() {
+std::string s;
+while (queue.dequeue(s)) {
+  results.push_back(s);
+}
+  });
+
+  producer.join();
+
+  queue.stop();
+
+  consumer.join();
+
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
+
+
+/* In this testcase the consumer thread puts back all items to the queue to 
consume again
+ * Even in this case the ones inserted later by the producer  should be 
consumed */
+TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") {
+  utils::ConcurrentQueue queue;
+  std::set results;
+
+  std::thread producer([&queue]() {
+  queue.enqueue("ba");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
+  queue.enqueue("dum");
+  std::this_thread::sleep_for(std::chrono::milliseconds(3));
+  queue.enqueue("tss");
+});
+
+  std::thread consumer([&queue, &results]() {
+while (results.size() < 3) {
+  std::string s;
+  if (queue.tryDequeue(s)) {
+results.insert(s);
+queue.enqueue(std::move(s));
+  } else {
+std::this_thread::sleep_for(std::chrono::milliseconds(1));
+  }
+}
+  });
+
+  producer.join();
+
+  // Give some time for the consumer to loop over the queue
+  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+  consumer.join();
 
 Review comment:
   Good point, removed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400038743
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue 
should own the lock!");
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : private ConcurrentQueue {
 
 Review comment:
   Added comments to explain


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400038581
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue 
should own the lock!");
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : private ConcurrentQueue {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), 
running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = 
delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = 
delete;
+
+  using ConcurrentQueue::size;
+  using ConcurrentQueue::empty;
+  using ConcurrentQueue::clear;
+
+
+  template 
+  void enqueue(Args&&... args) {
+ConcurrentQueue::enqueue(std::forward(args)...);
+if (running_) {
+  cv_.notify_one();
+}
+  }
+  
+  bool dequeue(T& out) {
 
 Review comment:
   Added wait_for, so now it support:
   
   - tryDequeue
   - DequeueWait
   - DequeueWaitFor


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400038135
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue 
should own the lock!");
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
 
 Review comment:
   The container is now private, the mutex isn't as you have to acquire that in 
derived classes as well. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400037700
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue 
should own the lock!");
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : private ConcurrentQueue {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), 
running_{start} {};
 
 Review comment:
   Fixed them


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400037604
 
 

 ##
 File path: libminifi/include/utils/MinifiConcurrentQueue.h
 ##
 @@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
 
 Review comment:
   Removed


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-30 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400037378
 
 

 ##
 File path: libminifi/include/utils/ThreadPool.h
 ##
 @@ -330,20 +331,18 @@ class ThreadPool {
 // integrated power manager
   std::shared_ptr thread_manager_;
   // thread queue for the recently deceased threads.
-  moodycamel::ConcurrentQueue> 
deceased_thread_queue_;
+  ConcurrentQueue> deceased_thread_queue_;
 // worker queue of worker objects
-  moodycamel::ConcurrentQueue> worker_queue_;
+  ConditionConcurrentQueue> worker_queue_;
   std::priority_queue, std::vector>, 
DelayedTaskComparator> delayed_worker_queue_;
-// notification for available work
-  std::condition_variable tasks_available_;
+// mutex to  protect task status and delayed queue   
+  std::mutex worker_queue_mutex_;
 
 Review comment:
   Removed


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-27 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399540702
 
 

 ##
 File path: libminifi/include/utils/ThreadPool.h
 ##
 @@ -303,8 +303,9 @@ class ThreadPool {
* Drain will notify tasks to stop following notification
*/
   void drain() {
+worker_queue_.stop();
 while (current_workers_ > 0) {
-  tasks_available_.notify_one();
+  std::this_thread::sleep_for(std::chrono::milliseconds(1));
 
 Review comment:
   Added


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-27 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399540634
 
 

 ##
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  virtual size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  virtual void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  return false;
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : public ConcurrentQueue {
 
 Review comment:
   Changed to private inheritance


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396808517
 
 

 ##
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  virtual size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  virtual void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  return false;
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : public ConcurrentQueue {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), 
running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = 
delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = 
delete;
+  
+  template 
+  void enqueue(Args&&... args) {
+ConcurrentQueue::enqueue(std::forward(args)...);
+if (running_) {
+  cv_.notify_one();
+}
+  }
+  
+  bool tryDequeue(T& out) override {
+std::unique_lock lck(this->mtx_);
+if (running_ && this->queue_.empty()) {
 
 Review comment:
   I removed first and tests started to fail, so I reverted it. 
   
   Now I added some debug prints there to see what's going on, removed the ifs 
and everything worked. 
   It seems to work now properly, not sure what happened with my build. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396808517
 
 

 ##
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  virtual size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  virtual void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  return false;
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : public ConcurrentQueue {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), 
running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = 
delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = 
delete;
+  
+  template 
+  void enqueue(Args&&... args) {
+ConcurrentQueue::enqueue(std::forward(args)...);
+if (running_) {
+  cv_.notify_one();
+}
+  }
+  
+  bool tryDequeue(T& out) override {
+std::unique_lock lck(this->mtx_);
+if (running_ && this->queue_.empty()) {
 
 Review comment:
   I removed first and tests started to fail, so I reverted it. 
   
   Now I added some debug prints there to see what's going on, removed the if 
and everything worked. 
   It seems to work now properly, not sure what happened with my build. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396725596
 
 

 ##
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  virtual size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  virtual void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  return false;
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : public ConcurrentQueue {
 
 Review comment:
   When I was reading the private inheritance idea, my first though was "I love 
it". 
   
   Although I realised that private inheritance would made me rewrap or 
copy/paste all the functions (size(), clear()) I would like to inherit from the 
base class without any modification on them. 
   
   In case it can somehow be avoided (I found no way, but naturally open to 
suggestions), I absolutely favour the private inheritance. Otherwise - to avoid 
code duplication or needless wrappers - I would stay with the current 
implementation. 
   
   Virtual functions are removed in the meanwhile to address that part of your 
concerns. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396722653
 
 

 ##
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  virtual size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  virtual void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  return false;
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : public ConcurrentQueue {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), 
running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = 
delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = 
delete;
+  
+  template 
+  void enqueue(Args&&... args) {
+ConcurrentQueue::enqueue(std::forward(args)...);
+if (running_) {
+  cv_.notify_one();
+}
+  }
+  
+  bool tryDequeue(T& out) override {
+std::unique_lock lck(this->mtx_);
+if (running_ && this->queue_.empty()) {
 
 Review comment:
   Unfortunately not. 
   ```wait causes the current thread to block until the condition variable is 
notified or a spurious wakeup occurs, optionally looping until some predicate 
is satisfied.```
   I think this means the predicate can only be used to wait *again* for 
further notification in case it's not fulfilled (in case of spurious wakeup for 
eg), but cannot avoid waiting. 
   
   I gave it a try, but removing the if significantly changed the behaviour, 
calls block even if there are tasks in the queue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396722653
 
 

 ##
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  virtual size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  virtual void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  return false;
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : public ConcurrentQueue {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), 
running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = 
delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = 
delete;
+  
+  template 
+  void enqueue(Args&&... args) {
+ConcurrentQueue::enqueue(std::forward(args)...);
+if (running_) {
+  cv_.notify_one();
+}
+  }
+  
+  bool tryDequeue(T& out) override {
+std::unique_lock lck(this->mtx_);
+if (running_ && this->queue_.empty()) {
 
 Review comment:
   Unfortunately not. 
   ```wait causes the current thread to block until the condition variable is 
notified or a spurious wakeup occurs, optionally looping until some predicate 
is satisfied.```
   I think this means the predicate can only be used to wait *again* for 
further notification in case it's not fulfilled (in case of spurious wakeup for 
eg), but cannot avoid waiting. 
   
   I gave it a try, but removing the if significantly changed the behaviour, 
calls block even if there is are tasks in the queue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396716663
 
 

 ##
 File path: libminifi/src/utils/ThreadPool.cpp
 ##
 @@ -64,8 +64,10 @@ void ThreadPool::run_tasks(std::shared_ptr 
thread) {
 }
   }
 } else {
-  std::unique_lock lock(worker_queue_mutex_);
-  tasks_available_.wait(lock);
+  // This means that the threadpool is running, but the ConcurrentQueue is 
stopped -> shouldn't happen
+  if (running_.load()) {
+std::this_thread::sleep_for(std::chrono::milliseconds(0));
 
 Review comment:
   Meant to sleep for 1 msec, corrected that. 
   
   In case this somehow occurs in a transient state (during startup or 
shutdown), this prevent the worker threads to busy wait (dequeue fails 
immediately and the workers would retry without sleep).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396714686
 
 

 ##
 File path: libminifi/include/utils/ThreadPool.h
 ##
 @@ -303,8 +303,9 @@ class ThreadPool {
* Drain will notify tasks to stop following notification
*/
   void drain() {
+worker_queue_.stop();
 while (current_workers_ > 0) {
-  tasks_available_.notify_one();
+  std::this_thread::sleep_for(std::chrono::milliseconds(1));
 
 Review comment:
   @bakaid is right. 
   Stopping the queue wakes up all the worker threads actually doing nothing 
(waiting for work), but there can be worker threads doing some useful work 
(ontrigger calls, c2 heartbeats, whatever). These should end in a timely manner 
and stopping the queue guarantees that they don't pick up new tasks, but I 
found no better option to wait for that. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396714686
 
 

 ##
 File path: libminifi/include/utils/ThreadPool.h
 ##
 @@ -303,8 +303,9 @@ class ThreadPool {
* Drain will notify tasks to stop following notification
*/
   void drain() {
+worker_queue_.stop();
 while (current_workers_ > 0) {
-  tasks_available_.notify_one();
+  std::this_thread::sleep_for(std::chrono::milliseconds(1));
 
 Review comment:
   @bakaid is right. 
   Stopping the queue wakes up all the worker threads actually doing nothing 
(waiting for work), but there can be worker threads doing some useful work 
(ontrigger calls, c2 heartbeats, whatever). These should end in a timely 
manner, but I found no better option to wait for that. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396713099
 
 

 ##
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  virtual size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  virtual void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  return false;
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : public ConcurrentQueue {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), 
running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = 
delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = 
delete;
+  
+  template 
+  void enqueue(Args&&... args) {
+ConcurrentQueue::enqueue(std::forward(args)...);
+if (running_) {
+  cv_.notify_one();
+}
+  }
+  
+  bool tryDequeue(T& out) override {
+std::unique_lock lck(this->mtx_);
+if (running_ && this->queue_.empty()) {
+  cv_.wait(lck, [this]{ return !running_ || !this->queue_.empty(); });  // 
Only wake up if there is something to return or stopped 
+}
+return running_ && ConcurrentQueue::tryDequeue(lck, out);
+  }
+  
+  void stop() {
+std::lock_guard guard(this->mtx_);
+running_ = false;
+cv_.notify_all();
+  }
+
+  void start() {
 
 Review comment:
   You can construct in started state.
   
   Running means consumers are allowed to wait for notifications (blocking 
calls). 
   Stopping prevents it and wakes them up to make sure no threads hang when no 
further data is expected to arrive. 
   
   Added a comment to the bool member to explain the purpose. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396711286
 
 

 ##
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  virtual size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  virtual void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  return false;
 
 Review comment:
   Agreed, logic error is thrown here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396711671
 
 

 ##
 File path: libminifi/include/utils/ThreadPool.h
 ##
 @@ -330,20 +331,18 @@ class ThreadPool {
 // integrated power manager
   std::shared_ptr thread_manager_;
   // thread queue for the recently deceased threads.
-  moodycamel::ConcurrentQueue> 
deceased_thread_queue_;
+  ConcurrentQueue> deceased_thread_queue_;
 // worker queue of worker objects
-  moodycamel::ConcurrentQueue> worker_queue_;
+  ConditionConcurrentQueue> worker_queue_;
   std::priority_queue, std::vector>, 
DelayedTaskComparator> delayed_worker_queue_;
-// notification for available work
-  std::condition_variable tasks_available_;
+// mutex to  protect task status and delayed queue   
+  std::mutex worker_queue_mutex_;
 
 Review comment:
   Yes, still used to protect the delayed queue and the status of current 
tasks. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396711012
 
 

 ##
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  virtual size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  virtual void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  return false;
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : public ConcurrentQueue {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), 
running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = 
delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = 
delete;
+  
+  template 
+  void enqueue(Args&&... args) {
+ConcurrentQueue::enqueue(std::forward(args)...);
+if (running_) {
+  cv_.notify_one();
+}
+  }
+  
+  bool tryDequeue(T& out) override {
+std::unique_lock lck(this->mtx_);
+if (running_ && this->queue_.empty()) {
+  cv_.wait(lck, [this]{ return !running_ || !this->queue_.empty(); });  // 
Only wake up if there is something to return or stopped 
+}
+return running_ && ConcurrentQueue::tryDequeue(lck, out);
+  }
+  
+  void stop() {
+std::lock_guard guard(this->mtx_);
+running_ = false;
+cv_.notify_all();
+  }
+
+  void start() {
+std::lock_guard guard(this->mtx_);
+if (!running_) {
+  running_ = true;
+  if (!this->queue_.empty()) {
+   cv_.notify_all();
 
 Review comment:
   Thanks, fixed


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool

2020-03-23 Thread GitBox
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove 
moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396710845
 
 

 ##
 File path: libminifi/include/utils/ConcurrentQueue.h
 ##
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template 
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;
+
+  ConcurrentQueue(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
+  ConcurrentQueue(ConcurrentQueue&& other)
+: ConcurrentQueue(std::move(other), 
std::lock_guard(other.mutex_)) {}
+
+  ConcurrentQueue& operator=(ConcurrentQueue&& other) {
+if (this != &other) {
+  std::lock(mtx_, other.mtx_);
+  std::lock_guard lk1(mtx_, std::adopt_lock);
+  std::lock_guard lk2(other.mtx_, std::adopt_lock);
+  queue_.swap(other.queue_);
+}
+return *this;
+  }
+
+  virtual bool tryDequeue(T& out) {
+std::unique_lock lck(mtx_);
+return tryDequeue(lck, out);
+  }
+
+  virtual bool empty() const {
+std::lock_guard guard(mtx_);
+return queue_.empty();
+  }
+
+  virtual size_t size() const {
+std::lock_guard guard(mtx_);
+return queue_.size();
+  }
+
+  virtual void clear() {
+std::lock_guard guard(mtx_);
+queue_.clear();
+  }
+
+  template 
+  void enqueue(Args&&... args) {
+std::lock_guard guard(mtx_);
+queue_.emplace_back(std::forward(args)...);
+  }
+
+ private:
+   ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&)
+: queue_( std::move(other.queue_) ) {}
+
+ protected:
+  bool tryDequeue(std::unique_lock& lck, T& out) {
+if (!lck.owns_lock()) {
+  return false;
+}
+if (queue_.empty()) {
+  return false;
+}
+out = std::move(queue_.front());
+queue_.pop_front();
+return true;
+  }
+  std::deque queue_;
+  mutable std::mutex mtx_;
+};
+
+template 
+class ConditionConcurrentQueue : public ConcurrentQueue {
+ public:
+  ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), 
running_{start} {};
+  
+  ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
+  ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = 
delete;
+  ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
+  ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = 
delete;
+  
+  template 
+  void enqueue(Args&&... args) {
+ConcurrentQueue::enqueue(std::forward(args)...);
+if (running_) {
+  cv_.notify_one();
+}
+  }
+  
+  bool tryDequeue(T& out) override {
 
 Review comment:
   Renamed to "dequeue", tryDequeue is also available as a non-blocking 
alternative. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services