szaszm commented on a change in pull request #773:
URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r415924891



##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -32,7 +33,7 @@ namespace utils {
 
 // Provides a queue API and guarantees no race conditions in case of multiple 
producers and consumers.
 // Guarantees elements to be dequeued in order of insertion
-template <typename T>
+template <typename T, typename = typename 
std::enable_if<std::is_nothrow_move_constructible<T>::value>::type>

Review comment:
       A `static_assert` instead of `enable_if` would be more clear. We may 
also want to restrict the requirements to the affected member functions 
templates only by placing the dependent `static_assert` declarations inside the 
appropriate member function template bodies, instead of preventing class 
instantiation. This would make it possible to make restricted use of the queue 
even for types that satisfy one requirement but not the other.
   
   Needed restrictions:
   - `tryDequeueImpl` should require `std::is_nothrow_move_assignable` or fall 
back to copy (`std::move_if_noexcept`) to keep exception safety. This issue is 
not introduced by your changes.
   - `dequeueApplyImpl` should require `std::is_nothrow_move_constructible` or 
fall back to copy in a similar fashion.

##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -57,6 +58,12 @@ class ConcurrentQueue {
     return tryDequeueImpl(lck, out);
   }
 
+  template<typename Functor>
+  bool dequeueApply(Functor&& fun) {

Review comment:
       Simpler name suggestion: `consume`
   
   This naming is also used by boost lockfree queue: 
https://www.boost.org/doc/libs/1_66_0/doc/html/boost/lockfree/queue.html#id-1_3_22_6_3_1_1_1_11_12-bb

##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -99,6 +106,19 @@ class ConcurrentQueue {
     return true;
   }
 
+  template<typename Functor>
+  bool dequeueApplyImpl(std::unique_lock<std::mutex>& lck, Functor&& fun) {
+    checkLock(lck);
+    if (queue_.empty()) {
+      return false;
+    }
+    T elem = std::move(queue_.front());
+    queue_.pop_front();
+    lck.unlock();
+    fun(elem);

Review comment:
       We should forward the function object to its call operator. 
`std::forward<Functor>(fun)(elem);`

##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -127,33 +147,58 @@ class ConditionConcurrentQueue : private 
ConcurrentQueue<T> {
   using ConcurrentQueue<T>::empty;
   using ConcurrentQueue<T>::clear;
 
-
   template <typename... Args>
   void enqueue(Args&&... args) {
     ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
     if (running_) {
       cv_.notify_one();
     }
   }
-  
+
   bool dequeueWait(T& out) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
-    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); 
 // Only wake up if there is something to return or stopped 
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); 
 // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  template<typename Functor>
+  bool dequeueApplyWait(Functor&& fun) {
+    if (!running_) {
+      return false;
+    }
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); 
 // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::dequeueApplyImpl(lck, 
std::forward<Functor>(fun));
   }
 
   template< class Rep, class Period >
   bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
     cv_.wait_for(lck, time, [this, &lck]{ return !running_ || 
!this->emptyImpl(lck); });  // Wake up with timeout or in case there is 
something to do
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);

Review comment:
       The new code tries to dequeue anyway when interrupted (stopped) while 
the old code fails all in-progress operations. Did you intend to change this 
behavior?
   
   Changing it is possible without breaking API as we did not yet release this 
class. It, however, requires serious thought. If the change is intended, I'd 
like to ask @arpadboda for review as he designed and implemented this class and 
knows the related code well.

##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -99,6 +106,19 @@ class ConcurrentQueue {
     return true;
   }
 
+  template<typename Functor>
+  bool dequeueApplyImpl(std::unique_lock<std::mutex>& lck, Functor&& fun) {
+    checkLock(lck);
+    if (queue_.empty()) {
+      return false;
+    }
+    T elem = std::move(queue_.front());
+    queue_.pop_front();
+    lck.unlock();

Review comment:
       I think the postcondition of an unlocked `lck` is worth documenting in a 
member function comment block.

##########
File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
##########
@@ -123,37 +234,79 @@ TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", 
"[TestQueueWithReAdd]") {
 }
 
 /* The same test as above, but covering the ConditionConcurrentQueue */
-TEST_CASE("TestConditionConqurrentQueue::testQueueWithReAdd", 
"[TestConditionQueueWithReAdd]") {
+TEST_CASE("TestConditionConqurrentQueue::testConditionQueueWithReAdd", 
"[TestConditionQueueWithInfReAdd]") {
+  using namespace MinifiConcurrentQueueTestProducersConsumers;
   utils::ConditionConcurrentQueue<std::string> queue(true);
   std::set<std::string> results;
 
-  std::thread producer([&queue]()  {
-    queue.enqueue("ba");
-    std::this_thread::sleep_for(std::chrono::milliseconds(3));
-    queue.enqueue("dum");
-    std::this_thread::sleep_for(std::chrono::milliseconds(3));
-    queue.enqueue("tss");
-  });
+  std::thread producer{ getSimpleProducerThread(queue) };
+  std::thread consumer{ getInfiniteReaddingDequeueConsumerThread(queue, 
results) };
+  producer.join();
+  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+  queue.stop();
+  consumer.join();
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
 
-  std::thread consumer([&queue, &results]() {
-    std::string s;
-    while (queue.dequeueWait(s)) {
-      results.insert(s);
-      queue.enqueue(std::move(s));
-    }
-  });
+TEST_CASE("TestConditionConqurrentQueue::testConditionQueueDequeueWaitForWithSignal",
 "[testConditionQueueDequeueWaitForWithSignal]") {
+  using namespace MinifiConcurrentQueueTestProducersConsumers;
+  utils::ConditionConcurrentQueue<std::string> queue(true);
+  std::set<std::string> results;
+
+  std::thread producer{ getSimpleProducerThread(queue) };
+  std::thread consumer{ getDequeueWaitForConsumerThread(queue, results) };
 
   producer.join();
+  consumer.join();
 
-  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
 
-  queue.stop();
+TEST_CASE("TestConditionConqurrentQueue::testConditionQueueDequeueApplyWaitForWithSignal",
 "[testConditionQueueDequeueApplyWaitForWithSignal]") {
+  using namespace MinifiConcurrentQueueTestProducersConsumers;
+  utils::ConditionConcurrentQueue<std::string> queue(true);
+  std::set<std::string> results;
 
+  std::thread producer{ getSimpleProducerThread(queue) };
+  std::thread consumer{ getDequeueWaitForConsumerThread(queue, results) };
+  producer.join();
   consumer.join();
 
   REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
 }
 
+TEST_CASE("TestConditionConqurrentQueue::testConditionQueueDequeueWaitForNoSignal",
 "[testConditionQueueDequeueWaitForNoSignal]") {
+  using namespace MinifiConcurrentQueueTestProducersConsumers;
+  utils::ConditionConcurrentQueue<std::string> queue(true);
+  std::set<std::string> results;
+  std::mutex mutex;
+  std::unique_lock<std::mutex> lock(mutex);
+
+  std::thread producer{ getBlockedProducerThread(queue, mutex) };
+  std::thread consumer{ getDequeueWaitForConsumerThread(queue, results) };
+  consumer.join();
+  lock.unlock();
+  producer.join();
+
+  REQUIRE(0 == results.size());
+}
+
+TEST_CASE("TestConditionConqurrentQueue::testConditionQueueDequeueApplyWaitForNoSignal",
 "[testConditionQueueDequeueApplyWaitForNoSignal]") {
+  using namespace MinifiConcurrentQueueTestProducersConsumers;
+  utils::ConditionConcurrentQueue<std::string> queue(true);
+  std::set<std::string> results;
+  std::mutex mutex;
+  std::unique_lock<std::mutex> lock(mutex);
+
+  std::thread producer{ getBlockedProducerThread(queue, mutex) };
+  std::thread consumer{ getDequeueWaitForConsumerThread(queue, results) };

Review comment:
       There are some copy-paste errors with the wrong functions being called. 
Could you double-check them?
   
   This test is identical to the above but its name suggests that it shouldn't 
be.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to