szaszm commented on a change in pull request #773:
URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r416553035



##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -21,14 +21,32 @@
 #include <deque>
 #include <mutex>
 #include <condition_variable>
+#include <utility>
 #include <stdexcept>
+#include <type_traits>
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace utils {
 
+namespace detail {
+template<typename...>
+using void_t = void;
+
+template<typename /* FunType */, typename T, typename = void>
+struct TryMoveCall {
+    template<typename Fun>
+    static void call(Fun&& fun, T& elem) { std::forward<Fun>(fun)(elem); }
+};
+
+template<typename FunType, typename T>
+struct TryMoveCall<FunType, T, void_t<decltype(std::declval<FunType>()(std::declval<T>()))>> {

Review comment:
       Please add a comment explaining that due to partial specialization rules 
this specialization takes precedence whenever the expression inside `void_t` is 
well-formed, i.e. whenever the function can take an rvalue ([xvalue or 
prvalue](http://eel.is/c++draft/basic.lval#1)).
   
   Something along the lines of "move or pass rvalue ref if possible, fall back 
to copy or pass by lvalue ref otherwise" is also fine IMO.
   
   Metaprogramming magic is usually worth an explanatory comment, especially 
since not all of our API users are C++ experts.
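   
   For readers following along, a minimal, self-contained sketch of the detection idiom under discussion, carrying the kind of explanatory comment requested above. This is not the MiNiFi header itself; the names mirror the diff, and the `main()` usage is illustrative only.
   
   ```cpp
   #include <iostream>
   #include <string>
   #include <utility>
   
   namespace detail {
   template<typename...>
   using void_t = void;
   
   // Primary template: the callable cannot take an rvalue, so fall back to passing
   // the element by lvalue reference (which may mean a copy inside the callable).
   template<typename /* FunType */, typename T, typename = void>
   struct TryMoveCall {
     template<typename Fun>
     static void call(Fun&& fun, T& elem) { std::forward<Fun>(fun)(elem); }
   };
   
   // Partial specialization: by the partial ordering rules this is preferred whenever
   // the expression inside void_t is well-formed, i.e. whenever the callable accepts
   // an rvalue (xvalue or prvalue). In that case the element is moved into the call.
   template<typename FunType, typename T>
   struct TryMoveCall<FunType, T, void_t<decltype(std::declval<FunType>()(std::declval<T>()))>> {
     template<typename Fun>
     static void call(Fun&& fun, T& elem) { std::forward<Fun>(fun)(std::move(elem)); }
   };
   }  // namespace detail
   
   int main() {
     std::string s = "moved";
     auto takes_rvalue = [](std::string&& v) { std::cout << v << '\n'; };
     // The callable accepts an rvalue, so the specialization is chosen and s is moved from.
     detail::TryMoveCall<decltype(takes_rvalue), std::string>::call(takes_rvalue, s);
   
     std::string t = "passed by lvalue reference";
     auto takes_lvalue = [](std::string& v) { std::cout << v << '\n'; };
     // The callable only accepts a non-const lvalue reference, so the primary template is chosen.
     detail::TryMoveCall<decltype(takes_lvalue), std::string>::call(takes_lvalue, t);
   }
   ```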

##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -127,33 +147,58 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
   using ConcurrentQueue<T>::empty;
   using ConcurrentQueue<T>::clear;
 
-
   template <typename... Args>
   void enqueue(Args&&... args) {
     ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
     if (running_) {
       cv_.notify_one();
     }
   }
-  
+
   bool dequeueWait(T& out) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
-    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  template<typename Functor>
+  bool dequeueApplyWait(Functor&& fun) {
+    if (!running_) {
+      return false;
+    }
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::dequeueApplyImpl(lck, std::forward<Functor>(fun));
   }
 
   template< class Rep, class Period >
   bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
     cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Wake up with timeout or in case there is something to do
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);

Review comment:
       What loop?
   
   I think the problem Arpad refers to is the check without the mutex lock at 
the beginning. Unsynchronized accesses are undefined behavior [according to the 
standard](http://eel.is/c++draft/intro.races#21), even if it feels like it 
"should work".



