[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400061549 ## File path: libminifi/src/utils/ThreadPool.cpp ## @@ -64,8 +64,10 @@ void ThreadPool::run_tasks(std::shared_ptr thread) { } } } else { - std::unique_lock lock(worker_queue_mutex_); - tasks_available_.wait(lock); + // This means that the threadpool is running, but the ConcurrentQueue is stopped -> shouldn't happen + if (running_.load()) { +std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } Review comment: thx for the explanation. Feel free to close the thread. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400045447 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -29,11 +29,12 @@ namespace nifi { namespace minifi { namespace utils { + +// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers. Review comment: I would add: - Guarantees that the elements are dequeued in the order of insertion This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400043152 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != ) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; Review comment: It's fine and expected that we use it in libminifi. What I'd like is that we only support dependency on public members for libminifi API users and treat `protected`/`private` as "internal". The language doesn't provide tools for that (like package-private in Java), so that's an API documentation issue. We could also use pimpl, but that would mean a lot of boilerplate, an extra level of indirection and no inlining, so I'm against that. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400060410 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "utils/MinifiConcurrentQueue.h" +#include "utils/StringUtils.h" + +using namespace org::apache::nifi::minifi::utils; + +TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") { + utils::ConcurrentQueue queue; + std::vector results; + + std::thread producer([]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); Review comment: I see your point, but we should cover data races as well. Can we have at least a handful of test cases with rapid insertions, i.e. with no sleep, to cover both problems? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400048134 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -81,25 +82,38 @@ class ConcurrentQueue { : queue_( std::move(other.queue_) ) {} protected: - bool tryDequeue(std::unique_lock& lck, T& out) { + void checkLock(std::unique_lock& lck) const { if (!lck.owns_lock()) { - throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); + throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!"); } + } + + bool tryDequeueImpl(std::unique_lock& lck, T& out) { +checkLock(lck); if (queue_.empty()) { return false; } out = std::move(queue_.front()); queue_.pop_front(); return true; } Review comment: I like that you used protected functions taking `std::unique_lock` to make it an error to use the non-locking functions without a lock. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400057782 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + + +// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers. +template +class ConcurrentQueue { + public: + explicit ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != ) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeueImpl(lck, out); + } + + bool empty() const { +std::unique_lock lck(mtx_); +return queue_.emptyImpl(lck); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + void checkLock(std::unique_lock& lck) const { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!"); +} + } + + bool tryDequeueImpl(std::unique_lock& lck, T& out) { +checkLock(lck); +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + + bool emptyImpl(std::unique_lock& lck) const { +checkLock(lck); +return queue_.empty(); + } + + mutable std::mutex mtx_; + private: + std::deque queue_; +}; + + +// A ConcurrentQueue extended with a condition variable to be able to block and wait for incoming data +template +class ConditionConcurrentQueue : private ConcurrentQueue { + public: + explicit ConditionConcurrentQueue(bool start = true) : ConcurrentQueue{}, running_{start} {} + + ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete; + ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete; + + using ConcurrentQueue::size; + using ConcurrentQueue::empty; + using ConcurrentQueue::clear; + + + template + void enqueue(Args&&... args) { +ConcurrentQueue::enqueue(std::forward(args)...); +if (running_) { + cv_.notify_one(); +} + } + + bool dequeueWait(T& out) { +std::unique_lock lck(this->mtx_); +cv_.wait(lck, [this, ]{ return !running_ || !this->emptyImpl(lck); }); // Only wake up if there is something to return or stopped +return running_ && ConcurrentQueue::tryDequeueImpl(lck, out); + } + + template< class Rep, class Period > + bool dequeueWaitFor(T& out, const std::chrono::duration& time) { +std::unique_lock lck(this->mtx_); +cv_.wait_for(lck, time, [this, ]{ return !running_ || !this->emptyImpl(lck); }); // Wake up with timeout or in case there is something to do +return running_ && ConcurrentQueue::tryDequeueImpl(lck, out); + } + + bool tryDequeue(T& out) { +std::unique_lock lck(this->mtx_); +return running_ &&
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400051711 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -119,10 +133,22 @@ class ConditionConcurrentQueue : private ConcurrentQueue { } } - bool dequeue(T& out) { + bool dequeueWait(T& out) { +std::unique_lock lck(this->mtx_); +cv_.wait(lck, [this, ]{ return !running_ || !this->emptyImpl(lck); }); // Only wake up if there is something to return or stopped +return running_ && ConcurrentQueue::tryDequeueImpl(lck, out); + } + + template< class Rep, class Period > + bool dequeueWaitFor(T& out, const std::chrono::duration& time) { Review comment: I like that you used a template here, not just a concrete duration type like `milliseconds`, which is more common in the code base. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400056195 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + + +// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers. +template +class ConcurrentQueue { + public: + explicit ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != ) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeueImpl(lck, out); + } + + bool empty() const { +std::unique_lock lck(mtx_); +return queue_.emptyImpl(lck); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + void checkLock(std::unique_lock& lck) const { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!"); +} + } + + bool tryDequeueImpl(std::unique_lock& lck, T& out) { +checkLock(lck); +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + + bool emptyImpl(std::unique_lock& lck) const { +checkLock(lck); +return queue_.empty(); + } + + mutable std::mutex mtx_; + private: + std::deque queue_; +}; + + +// A ConcurrentQueue extended with a condition variable to be able to block and wait for incoming data Review comment: I would like to add: - `stop` interrupts all consumers without a chance to consume remaining elements in the queue - started means queue elements can be consumed/dequeued. - It's possible to enqueue elements regardless of the running state. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400043883 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -29,11 +29,12 @@ namespace nifi { namespace minifi { namespace utils { + +// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers. template class ConcurrentQueue { public: - ConcurrentQueue() = default; - virtual ~ConcurrentQueue() = default; + explicit ConcurrentQueue() = default; Review comment: I don't see the need to make this explicit, although it doesn't hurt much. It prevents code like this: `ConcurrentQueue make_queue() { return {}; }` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399652688 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "utils/MinifiConcurrentQueue.h" +#include "utils/StringUtils.h" + +using namespace org::apache::nifi::minifi::utils; + +TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") { + utils::ConcurrentQueue queue; + std::vector results; + + std::thread producer([]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("dum"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("tss"); +}); + + std::thread consumer([, ]() { + while (results.size() < 3) { + std::string s; + if (queue.tryDequeue(s)) { + results.push_back(s); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } +}); + + producer.join(); + consumer.join(); + + REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss"); +} + + +TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") { + utils::ConditionConcurrentQueue queue(true); + std::vector results; + + std::thread producer([]() { +queue.enqueue("ba"); +std::this_thread::sleep_for(std::chrono::milliseconds(3)); +queue.enqueue("dum"); +std::this_thread::sleep_for(std::chrono::milliseconds(3)); +queue.enqueue("tss"); + }); + + std::thread consumer([, ]() { +std::string s; +while (queue.dequeue(s)) { + results.push_back(s); +} + }); + + producer.join(); + + queue.stop(); + + consumer.join(); + + REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss"); +} + + +/* In this testcase the consumer thread puts back all items to the queue to consume again + * Even in this case the ones inserted later by the producer should be consumed */ +TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") { + utils::ConcurrentQueue queue; + std::set results; + + std::thread producer([]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("dum"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("tss"); +}); Review comment: there seems to be one extra level of indentation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399648668 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != ) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } Review comment: Just an interesting note: the compiler will optimize out the extra boolean of `std::unique_lock` (vs. `std::lock_guard`) if it's not used. The point is that the locked version's signature is fine, there's absolutely no reason to optimize. Proof: https://godbolt.org/z/ZgIf0f I'm only surprised that the compiler emits the same assembly twice instead of labeling the same code twice. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399644084 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "utils/MinifiConcurrentQueue.h" +#include "utils/StringUtils.h" + +using namespace org::apache::nifi::minifi::utils; + +TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") { + utils::ConcurrentQueue queue; + std::vector results; + + std::thread producer([]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("dum"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("tss"); +}); + + std::thread consumer([, ]() { + while (results.size() < 3) { + std::string s; + if (queue.tryDequeue(s)) { + results.push_back(s); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } +}); + + producer.join(); + consumer.join(); + + REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss"); +} + + +TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") { + utils::ConditionConcurrentQueue queue(true); + std::vector results; + + std::thread producer([]() { +queue.enqueue("ba"); +std::this_thread::sleep_for(std::chrono::milliseconds(3)); +queue.enqueue("dum"); +std::this_thread::sleep_for(std::chrono::milliseconds(3)); +queue.enqueue("tss"); + }); + + std::thread consumer([, ]() { +std::string s; +while (queue.dequeue(s)) { + results.push_back(s); +} + }); + + producer.join(); + + queue.stop(); + + consumer.join(); + + REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss"); +} + + +/* In this testcase the consumer thread puts back all items to the queue to consume again + * Even in this case the ones inserted later by the producer should be consumed */ +TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") { + utils::ConcurrentQueue queue; + std::set results; + + std::thread producer([]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("dum"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("tss"); +}); + + std::thread consumer([, ]() { +while (results.size() < 3) { + std::string s; + if (queue.tryDequeue(s)) { +results.insert(s); +queue.enqueue(std::move(s)); + } else { +std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } +} + }); + + producer.join(); + + // Give some time for the consumer to loop over the queue + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + consumer.join(); Review comment: The wait is redundant since `consumer.join()` will wait until the consumer finishes execution which is until it reaches the first "tss". This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399650366 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "utils/MinifiConcurrentQueue.h" +#include "utils/StringUtils.h" + +using namespace org::apache::nifi::minifi::utils; Review comment: You can either remove the `utils::` prefix from the symbol references from this namespace in this file or change this line to `namespace utils = org::apache::nifi::minifi::utils;`. I personally have no preference, but abseil (google) recommends no using-directives at all. Abseil: "Tip of the Week #153: Don't use using-directives" https://abseil.io/tips/153 The C++ Core Guidelines allow this usage of using-directives: http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rs-using This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399639532 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; Review comment: The virtual destructor is no longer needed with private inheritance. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399644887 ## File path: libminifi/src/utils/ThreadPool.cpp ## @@ -64,8 +64,10 @@ void ThreadPool::run_tasks(std::shared_ptr thread) { } } } else { - std::unique_lock lock(worker_queue_mutex_); - tasks_available_.wait(lock); + // This means that the threadpool is running, but the ConcurrentQueue is stopped -> shouldn't happen + if (running_.load()) { +std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } Review comment: If it shouldn't happen and we're handling it, then we should log the event and consider throw/abort and generating a core dump for debugging. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399642420 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != ) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : private ConcurrentQueue { Review comment: It would be nice to have comments explaining the purpose of each class and the guarantees. This way the user doesn't have to read the code to use the classes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399640042 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != ) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; Review comment: Can we mark this API private to extension developers? It would be nice to have the freedom to change the underlying container later, e.g. to a lock-free queue that keeps order, without breaking API. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399640583 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != ) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : private ConcurrentQueue { + public: + ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), running_{start} {}; + + ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete; + ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete; + + using ConcurrentQueue::size; + using ConcurrentQueue::empty; + using ConcurrentQueue::clear; + + + template + void enqueue(Args&&... args) { +ConcurrentQueue::enqueue(std::forward(args)...); +if (running_) { + cv_.notify_one(); +} + } + + bool dequeue(T& out) { Review comment: I think a non-blocking `tryDequeue` would be valuable in this class. Also, it would be nice to be able to wait for a specified duration or until a given time point, using `cv_.wait_for()` and `cv_.wait_until()`. My idea of the naming is (not very creative, but obvious): - Indefinite blocking: `dequeue_wait` - Blocking for duration: `dequeue_wait_for` - Blocking until timestamp: `dequeue_wait_until` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399643197 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "utils/MinifiConcurrentQueue.h" +#include "utils/StringUtils.h" + +using namespace org::apache::nifi::minifi::utils; + +TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") { + utils::ConcurrentQueue queue; + std::vector results; + + std::thread producer([]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); Review comment: I suggest avoiding all sleeps in the producers to speed up the test cases and to have the chance to discover race conditions that would only result in data races with tighter timings. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r397293761 ## File path: libminifi/include/utils/ThreadPool.h ## @@ -330,20 +331,18 @@ class ThreadPool { // integrated power manager std::shared_ptr thread_manager_; // thread queue for the recently deceased threads. - moodycamel::ConcurrentQueue> deceased_thread_queue_; + ConcurrentQueue> deceased_thread_queue_; // worker queue of worker objects - moodycamel::ConcurrentQueue> worker_queue_; + ConditionConcurrentQueue> worker_queue_; std::priority_queue, std::vector>, DelayedTaskComparator> delayed_worker_queue_; -// notification for available work - std::condition_variable tasks_available_; +// mutex to protect task status and delayed queue + std::mutex worker_queue_mutex_; Review comment: Thanks and nevermind, I mixed the queues. But there is something that can still be fixed here: whitespaces at the end of line 338 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399639717 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != ) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : private ConcurrentQueue { + public: + ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), running_{start} {}; Review comment: - This constructor should be `explicit` - redundant semicolon at the end of the line - inconsistency between initializers (direct-initialization vs direct-list-initialization), but this is not a big deal - I think `start = true` is a better default, because RAII and first start is initialization IMO, because the class is only useful when started. If the thread pool class uses late initialization (start), it should override the default. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r397169633 ## File path: libminifi/include/utils/ThreadPool.h ## @@ -303,8 +303,9 @@ class ThreadPool { * Drain will notify tasks to stop following notification */ void drain() { +worker_queue_.stop(); while (current_workers_ > 0) { - tasks_available_.notify_one(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); Review comment: Could you add a code comment explaining this, including the fact that this is a best-effort solution? It will be helpful to future readers of the code. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r397167251 ## File path: libminifi/include/utils/ConcurrentQueue.h ## @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != ) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + virtual bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + virtual bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + virtual size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + virtual void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + return false; +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : public ConcurrentQueue { Review comment: In the case of any "implemented in terms of" relationship, the new class would need to provide all of the required functionality and delegate to the inner class. This means boilerplate, as you pointed out, regardless of whether it's private inheritance or composition. One way to reduce boilerplate in the case of private inheritance and unmodified member functions is via using-declarations. It still requires iterating the member functions that are to be exposed without modification, but they are shorter than providing wrapper implementations. In my opinion, cleaner dependencies (in this case) add more value than avoided boilerplate, but this is subjective. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396522340 ## File path: libminifi/include/utils/ConcurrentQueue.h ## @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != ) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + virtual bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + virtual bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + virtual size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + virtual void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + return false; Review comment: If we are in an unknown state, crash and restart by the service manager is better than throwing and/or logging something and then continuing somewhere somehow, IMHO. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396535599 ## File path: libminifi/include/utils/ThreadPool.h ## @@ -303,8 +303,9 @@ class ThreadPool { * Drain will notify tasks to stop following notification */ void drain() { +worker_queue_.stop(); while (current_workers_ > 0) { - tasks_available_.notify_one(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); Review comment: What is the purpose of this sleep? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396521172 ## File path: libminifi/include/utils/ConcurrentQueue.h ## @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != ) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + virtual bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + virtual bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + virtual size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + virtual void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + return false; +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : public ConcurrentQueue { + public: + ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), running_{start} {}; + + ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete; + ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete; + + template + void enqueue(Args&&... args) { +ConcurrentQueue::enqueue(std::forward(args)...); +if (running_) { + cv_.notify_one(); +} + } + + bool tryDequeue(T& out) override { +std::unique_lock lck(this->mtx_); +if (running_ && this->queue_.empty()) { + cv_.wait(lck, [this]{ return !running_ || !this->queue_.empty(); }); // Only wake up if there is something to return or stopped +} +return running_ && ConcurrentQueue::tryDequeue(lck, out); + } + + void stop() { +std::lock_guard guard(this->mtx_); +running_ = false; +cv_.notify_all(); + } + + void start() { Review comment: What does `start` mean? Why do I have to start my queue after creating it? Same for `isRunning`. If it serves some purpose, we need a comment explaining it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396515674 ## File path: libminifi/include/utils/ConcurrentQueue.h ## @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != ) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + virtual bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + virtual bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + virtual size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + virtual void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + return false; +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : public ConcurrentQueue { Review comment: I think it's not a good design to allow dynamic polymorphism between these containers, despite the "is a" relationship being present. My intuition screams design issue but can't fully grasp what is the root cause. I'll try to make some points below. Performance: I prefer not to have virtual functions on containers, as most or all users will know their requirements against their container. This design violates the zero overhead principle by imposing virtual calls on users that don't need notification capabilities. Hierarchy: I think the relationship here is an added [aspect](https://en.wikipedia.org/wiki/Aspect-oriented_programming) rather than a hierarchy. http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#c120-use-class-hierarchies-to-represent-concepts-with-inherent-hierarchical-structure-only Inheritance: - The inheritance here is both interface and implementation inheritance. http://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#c129-when-designing-a-class-hierarchy-distinguish-between-implementation-inheritance-and-interface-inheritance - Leaving the class open for extension makes it possible for subclasses to violate the invariants of the base class, violating encapsulation. Do we really need runtime polymorphism? If yes, I'd make it possible through concept-based polymorphism (via type erasure) without affecting the implementation. In either case, I'd make ConcurrentQueue closed and ConditionConcurrentQueue a wrapper around ConcurrentQueue ("implemented in terms of") rather than a public subclass ("is a"). To access the mutex, I recommend this to be a private inheritance with only the mutex of the base class marked as protected, or some other way of leaking the mutex. I feel like my above arguments are weak, and my proposed design is not very sound. As always, I welcome discussion.
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396518120 ## File path: libminifi/include/utils/ThreadPool.h ## @@ -330,20 +331,18 @@ class ThreadPool { // integrated power manager std::shared_ptr thread_manager_; // thread queue for the recently deceased threads. - moodycamel::ConcurrentQueue> deceased_thread_queue_; + ConcurrentQueue> deceased_thread_queue_; // worker queue of worker objects - moodycamel::ConcurrentQueue> worker_queue_; + ConditionConcurrentQueue> worker_queue_; std::priority_queue, std::vector>, DelayedTaskComparator> delayed_worker_queue_; -// notification for available work - std::condition_variable tasks_available_; +// mutex to protect task status and delayed queue + std::mutex worker_queue_mutex_; Review comment: Is this mutex still used? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services