arpadboda commented on a change in pull request #773: URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r416046058
########## File path: libminifi/include/utils/MinifiConcurrentQueue.h ########## @@ -127,33 +147,58 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> { using ConcurrentQueue<T>::empty; using ConcurrentQueue<T>::clear; - template <typename... Args> void enqueue(Args&&... args) { ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...); if (running_) { cv_.notify_one(); } } - + bool dequeueWait(T& out) { + if (!running_) { + return false; + } std::unique_lock<std::mutex> lck(this->mtx_); - cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Only wake up if there is something to return or stopped - return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out); + cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Only wake up if there is something to return or stopped + return ConcurrentQueue<T>::tryDequeueImpl(lck, out); + } + + template<typename Functor> + bool dequeueApplyWait(Functor&& fun) { + if (!running_) { + return false; + } + std::unique_lock<std::mutex> lck(this->mtx_); + cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Only wake up if there is something to return or stopped + return ConcurrentQueue<T>::dequeueApplyImpl(lck, std::forward<Functor>(fun)); } template< class Rep, class Period > bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) { + if (!running_) { + return false; + } std::unique_lock<std::mutex> lck(this->mtx_); cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Wake up with timeout or in case there is something to do - return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out); + return ConcurrentQueue<T>::tryDequeueImpl(lck, out); Review comment: As first look this wouldn't hurt, but I would prefer checking the running flag in scope of the locked mutex to make sure. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org