szaszm commented on a change in pull request #773: URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r415924891
##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -32,7 +33,7 @@ namespace utils {
 // Provides a queue API and guarantees no race conditions in case of multiple producers and consumers.
 // Guarantees elements to be dequeued in order of insertion
-template <typename T>
+template <typename T, typename = typename std::enable_if<std::is_nothrow_move_constructible<T>::value>::type>

Review comment:
A `static_assert` instead of `enable_if` would be clearer.

We may also want to restrict the requirements to the affected member function templates only, by placing the dependent `static_assert` declarations inside the appropriate member function template bodies instead of preventing class instantiation. This would make it possible to make restricted use of the queue even for types that satisfy one requirement but not the other.

Needed restrictions:
- `tryDequeueImpl` should require `std::is_nothrow_move_assignable` or fall back to copy (`std::move_if_noexcept`) to keep exception safety. This issue is not introduced by your changes.
- `dequeueApplyImpl` should require `std::is_nothrow_move_constructible` or fall back to copy in a similar fashion.
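The two options in the comment above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: `SketchQueue` and `ThrowingMove` are hypothetical names, locking is left out entirely, and the member names only loosely mirror the reviewed class. The first member uses a `static_assert` that is checked only when that member is actually used; the second falls back to a copy via `std::move_if_noexcept`.

```cpp
#include <deque>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical, unsynchronized stand-in for the reviewed queue.
template <typename T>
class SketchQueue {
 public:
  template <typename... Args>
  void enqueue(Args&&... args) {
    queue_.emplace_back(std::forward<Args>(args)...);
  }

  // Option 1: the requirement lives in the member body, so it is only
  // enforced if tryDequeue is instantiated for a given T.
  bool tryDequeue(T& out) {
    static_assert(std::is_nothrow_move_assignable<T>::value,
                  "tryDequeue requires a nothrow move-assignable T");
    if (queue_.empty()) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

  // Option 2: copy instead of move when T's move constructor may throw,
  // so the front element is left intact if construction fails.
  template <typename Functor>
  bool dequeueApply(Functor&& fun) {
    if (queue_.empty()) {
      return false;
    }
    T elem = std::move_if_noexcept(queue_.front());
    queue_.pop_front();
    std::forward<Functor>(fun)(elem);
    return true;
  }

 private:
  std::deque<T> queue_;
};

// Hypothetical element type: copyable, with a potentially throwing move
// constructor and no usable assignment operator.
struct ThrowingMove {
  bool was_copied = false;
  ThrowingMove() = default;
  ThrowingMove(const ThrowingMove&) : was_copied(true) {}
  ThrowingMove(ThrowingMove&&) noexcept(false) {}
};
```

With this layout, `SketchQueue<ThrowingMove>` can still be instantiated and used through `dequeueApply`; only a call to `tryDequeue` would trip the assertion at compile time.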
##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -57,6 +58,12 @@ class ConcurrentQueue {
     return tryDequeueImpl(lck, out);
   }
+  template<typename Functor>
+  bool dequeueApply(Functor&& fun) {

Review comment:
Simpler name suggestion: `consume`
This naming is also used by boost lockfree queue: https://www.boost.org/doc/libs/1_66_0/doc/html/boost/lockfree/queue.html#id-1_3_22_6_3_1_1_1_11_12-bb

##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -99,6 +106,19 @@ class ConcurrentQueue {
     return true;
   }
+  template<typename Functor>
+  bool dequeueApplyImpl(std::unique_lock<std::mutex>& lck, Functor&& fun) {
+    checkLock(lck);
+    if (queue_.empty()) {
+      return false;
+    }
+    T elem = std::move(queue_.front());
+    queue_.pop_front();
+    lck.unlock();
+    fun(elem);

Review comment:
We should forward the function object to its call operator.
`std::forward<Functor>(fun)(elem);`

##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -127,33 +147,58 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
   using ConcurrentQueue<T>::empty;
   using ConcurrentQueue<T>::clear;
-
   template <typename... Args>
   void enqueue(Args&&... args) {
     ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
     if (running_) {
       cv_.notify_one();
     }
   }
-
+
   bool dequeueWait(T& out) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
-    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Only wake up if there is something to return or stopped
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  template<typename Functor>
+  bool dequeueApplyWait(Functor&& fun) {
+    if (!running_) {
+      return false;
+    }
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::dequeueApplyImpl(lck, std::forward<Functor>(fun));
   }
   template< class Rep, class Period >
   bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
     cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Wake up with timeout or in case there is something to do
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);

Review comment:
The new code tries to dequeue anyway when interrupted (stopped) while the old code fails all in-progress operations. Did you intend to change this behavior?

Changing it is possible without breaking API as we did not yet release this class. It, however, requires serious thought. If the change is intended, I'd like to ask @arpadboda for review as he designed and implemented this class and knows the related code well.
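To make the two points on this file concrete (forwarding the functor, and what happens to a waiting dequeue once the queue is stopped), here is a small sketch under stated assumptions. `SketchConditionQueue`, the `OnStop` switch and `CountingVisitor` are hypothetical and exist only for illustration; the real class has no policy parameter. The early `!running_` check added in the diff is deliberately left out so that the "stopped, but an element is available" state can be reached from a single thread.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical switch between the old and the new post-wait behavior.
enum class OnStop { FailPending, DrainPending };

template <typename T>
class SketchConditionQueue {
 public:
  void enqueue(T value) {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      queue_.push_back(std::move(value));
    }
    cv_.notify_one();
  }

  void stop() {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      running_ = false;
    }
    cv_.notify_all();
  }

  template <typename Functor>
  bool dequeueApplyWait(Functor&& fun, OnStop policy) {
    std::unique_lock<std::mutex> lck(mtx_);
    cv_.wait(lck, [this] { return !running_ || !queue_.empty(); });
    // FailPending mirrors `return running_ && ...` from the old code;
    // DrainPending mirrors the new code, which dequeues even after stop.
    if (queue_.empty() || (!running_ && policy == OnStop::FailPending)) {
      return false;
    }
    T elem = std::move(queue_.front());
    queue_.pop_front();
    lck.unlock();  // the lock is released before the callback runs
    // Forwarding preserves the value category of the functor, so a
    // ref-qualified call operator is selected as the caller intended.
    std::forward<Functor>(fun)(elem);
    return true;
  }

 private:
  std::mutex mtx_;
  std::condition_variable cv_;
  std::deque<T> queue_;
  bool running_ = true;
};

// Hypothetical functor whose call operator is overloaded on value category.
struct CountingVisitor {
  int* lvalue_calls;
  int* rvalue_calls;
  void operator()(const std::string&) & { ++*lvalue_calls; }
  void operator()(const std::string&) && { ++*rvalue_calls; }
};
```

Calling `fun(elem)` directly would always pick the lvalue overload, because a named parameter is an lvalue inside the function; that is the difference the forwarding suggestion addresses.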
##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -99,6 +106,19 @@ class ConcurrentQueue {
     return true;
   }
+  template<typename Functor>
+  bool dequeueApplyImpl(std::unique_lock<std::mutex>& lck, Functor&& fun) {
+    checkLock(lck);
+    if (queue_.empty()) {
+      return false;
+    }
+    T elem = std::move(queue_.front());
+    queue_.pop_front();
+    lck.unlock();

Review comment:
I think the postcondition of an unlocked `lck` is worth documenting in a member function comment block.

##########
File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp
##########
@@ -123,37 +234,79 @@ TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") {
 }
 /* The same test as above, but covering the ConditionConcurrentQueue */
-TEST_CASE("TestConditionConqurrentQueue::testQueueWithReAdd", "[TestConditionQueueWithReAdd]") {
+TEST_CASE("TestConditionConqurrentQueue::testConditionQueueWithReAdd", "[TestConditionQueueWithInfReAdd]") {
+  using namespace MinifiConcurrentQueueTestProducersConsumers;
   utils::ConditionConcurrentQueue<std::string> queue(true);
   std::set<std::string> results;
-  std::thread producer([&queue]() {
-    queue.enqueue("ba");
-    std::this_thread::sleep_for(std::chrono::milliseconds(3));
-    queue.enqueue("dum");
-    std::this_thread::sleep_for(std::chrono::milliseconds(3));
-    queue.enqueue("tss");
-  });
+  std::thread producer{ getSimpleProducerThread(queue) };
+  std::thread consumer{ getInfiniteReaddingDequeueConsumerThread(queue, results) };
+  producer.join();
+  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+  queue.stop();
+  consumer.join();
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
-  std::thread consumer([&queue, &results]() {
-    std::string s;
-    while (queue.dequeueWait(s)) {
-      results.insert(s);
-      queue.enqueue(std::move(s));
-    }
-  });
+TEST_CASE("TestConditionConqurrentQueue::testConditionQueueDequeueWaitForWithSignal", "[testConditionQueueDequeueWaitForWithSignal]") {
+  using namespace MinifiConcurrentQueueTestProducersConsumers;
+  utils::ConditionConcurrentQueue<std::string> queue(true);
+  std::set<std::string> results;
+
+  std::thread producer{ getSimpleProducerThread(queue) };
+  std::thread consumer{ getDequeueWaitForConsumerThread(queue, results) };
   producer.join();
+  consumer.join();
-  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+  REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
+}
-  queue.stop();
+TEST_CASE("TestConditionConqurrentQueue::testConditionQueueDequeueApplyWaitForWithSignal", "[testConditionQueueDequeueApplyWaitForWithSignal]") {
+  using namespace MinifiConcurrentQueueTestProducersConsumers;
+  utils::ConditionConcurrentQueue<std::string> queue(true);
+  std::set<std::string> results;
+  std::thread producer{ getSimpleProducerThread(queue) };
+  std::thread consumer{ getDequeueWaitForConsumerThread(queue, results) };
+  producer.join();
   consumer.join();
   REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss");
 }
+TEST_CASE("TestConditionConqurrentQueue::testConditionQueueDequeueWaitForNoSignal", "[testConditionQueueDequeueWaitForNoSignal]") {
+  using namespace MinifiConcurrentQueueTestProducersConsumers;
+  utils::ConditionConcurrentQueue<std::string> queue(true);
+  std::set<std::string> results;
+  std::mutex mutex;
+  std::unique_lock<std::mutex> lock(mutex);
+
+  std::thread producer{ getBlockedProducerThread(queue, mutex) };
+  std::thread consumer{ getDequeueWaitForConsumerThread(queue, results) };
+  consumer.join();
+  lock.unlock();
+  producer.join();
+
+  REQUIRE(0 == results.size());
+}
+
+TEST_CASE("TestConditionConqurrentQueue::testConditionQueueDequeueApplyWaitForNoSignal", "[testConditionQueueDequeueApplyWaitForNoSignal]") {
+  using namespace MinifiConcurrentQueueTestProducersConsumers;
+  utils::ConditionConcurrentQueue<std::string> queue(true);
+  std::set<std::string> results;
+  std::mutex mutex;
+  std::unique_lock<std::mutex> lock(mutex);
+
+  std::thread producer{ getBlockedProducerThread(queue, mutex) };
+  std::thread consumer{ getDequeueWaitForConsumerThread(queue, results) };

Review comment:
There are some copy-paste errors with the wrong functions being called. Could you double-check them? This test is identical to the above but its name suggests that it shouldn't be.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org