szaszm commented on a change in pull request #773:
URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r416553035
##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -21,14 +21,32 @@
 #include <deque>
 #include <mutex>
 #include <condition_variable>
+#include <utility>
 #include <stdexcept>
+#include <type_traits>

 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace utils {

+namespace detail {
+template<typename...>
+using void_t = void;
+
+template<typename /* FunType */, typename T, typename = void>
+struct TryMoveCall {
+    template<typename Fun>
+    static void call(Fun&& fun, T& elem) { std::forward<Fun>(fun)(elem); }
+};
+
+template<typename FunType, typename T>
+struct TryMoveCall<FunType, T, void_t<decltype(std::declval<FunType>()(std::declval<T>()))>> {

Review comment:
Please add a comment explaining that due to partial specialization rules this specialization takes precedence whenever the expression inside `void_t` is well-formed, i.e. whenever the function can take an rvalue ([xvalue or prvalue](http://eel.is/c++draft/basic.lval#1)). Something along the lines of "move or pass rvalue ref if possible, fall back to copy or pass by lvalue ref otherwise" is also fine IMO.

Metaprogramming magic is usually worth an explanatory comment, especially since not all of our API users are C++ experts.

##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -127,33 +147,58 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
   using ConcurrentQueue<T>::empty;
   using ConcurrentQueue<T>::clear;
-
   template <typename... Args>
   void enqueue(Args&&... args) {
     ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
     if (running_) {
       cv_.notify_one();
     }
   }
-
+
   bool dequeueWait(T& out) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
-    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  template<typename Functor>
+  bool dequeueApplyWait(Functor&& fun) {
+    if (!running_) {
+      return false;
+    }
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::dequeueApplyImpl(lck, std::forward<Functor>(fun));
   }

   template< class Rep, class Period >
   bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
     cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Wake up with timeout or in case there is something to do
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);

Review comment:
What loop? I think the problem Arpad refers to is the check without the mutex lock at the beginning. Unsynchronized accesses are undefined behavior [according to the standard](http://eel.is/c++draft/intro.races#21), even if it feels like it "should work".

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
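
As an illustration of the first review comment, here is a minimal sketch of the `TryMoveCall` pair carrying the kind of explanatory comment that was requested. The quoted hunk stops at the opening line of the specialization, so the body of the specialization, the `tryMoveCall` helper and the `Tracked` type are assumptions made for this example, not code taken from the PR.

```cpp
#include <type_traits>
#include <utility>

namespace sketch {

template<typename...>
using void_t = void;

// Primary template: used when FunType cannot be called with an rvalue T.
// Falls back to passing the element by lvalue reference.
template<typename /* FunType */, typename T, typename = void>
struct TryMoveCall {
  template<typename Fun>
  static void call(Fun&& fun, T& elem) { std::forward<Fun>(fun)(elem); }
};

// Partial specialization: more specialized than the primary template, so it
// takes precedence whenever the expression inside void_t is well-formed,
// i.e. whenever the functor accepts an rvalue. In short: move or pass an
// rvalue reference if possible, otherwise fall back to the lvalue call above.
// NOTE: this body is an assumption; the reviewed hunk does not show it.
template<typename FunType, typename T>
struct TryMoveCall<FunType, T,
                   void_t<decltype(std::declval<FunType>()(std::declval<T>()))>> {
  template<typename Fun>
  static void call(Fun&& fun, T& elem) { std::forward<Fun>(fun)(std::move(elem)); }
};

// Hypothetical convenience wrapper that deduces both template arguments.
template<typename Fun, typename T>
void tryMoveCall(Fun&& fun, T& elem) {
  TryMoveCall<Fun, T>::call(std::forward<Fun>(fun), elem);
}

// Hypothetical element type that records whether it has been moved from.
struct Tracked {
  bool moved_from = false;
  Tracked() = default;
  Tracked(const Tracked&) = default;
  Tracked(Tracked&& other) noexcept { other.moved_from = true; }
};

}  // namespace sketch
```

With these definitions, a functor taking `Tracked&` selects the primary template and leaves the element untouched, while a functor taking `Tracked` by value selects the specialization and receives the element through the move constructor.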
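
For the second review comment, one possible way to avoid the unsynchronized read is to take the lock before looking at `running_`. The following is only a sketch under that assumption: `ConditionQueue` and its members are hypothetical simplifications of the reviewed class, and the PR may resolve the issue differently, for example by making the flag a `std::atomic<bool>`.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

namespace sketch {

// Hypothetical, simplified stand-in for ConditionConcurrentQueue.
template<typename T>
class ConditionQueue {
 public:
  void enqueue(T value) {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      queue_.push_back(std::move(value));
    }
    cv_.notify_one();
  }

  bool dequeueWait(T& out) {
    // Lock first: every read and write of running_ happens under mtx_,
    // so the early-exit check below is not a data race.
    std::unique_lock<std::mutex> lck(mtx_);
    if (!running_) {
      return false;
    }
    // Only wake up if there is something to return or the queue was stopped.
    cv_.wait(lck, [this] { return !running_ || !queue_.empty(); });
    return tryDequeueLocked(out);
  }

  void stop() {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      running_ = false;
    }
    cv_.notify_all();  // release every waiter so it can observe the stop
  }

 private:
  // Caller must hold mtx_.
  bool tryDequeueLocked(T& out) {
    if (queue_.empty()) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

  std::mutex mtx_;
  std::condition_variable cv_;
  std::deque<T> queue_;
  bool running_ = true;  // guarded by mtx_
};

}  // namespace sketch
```

The cost of this variant is one extra lock acquisition on the stopped path; in exchange, the flag needs no atomic and the check, the wait predicate and the dequeue all see a consistent state.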