[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400205341 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; Review comment: I think that topic is way more general than this PR. I would be happy to a see a doc about the interfaces we consider public API, but that doesn't exist yet. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400204455 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "utils/MinifiConcurrentQueue.h" +#include "utils/StringUtils.h" + +using namespace org::apache::nifi::minifi::utils; + +TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") { + utils::ConcurrentQueue queue; + std::vector results; + + std::thread producer([&queue]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); Review comment: Done, added a new testcases that deals with a lot of elements without sleeps. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400203410 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + + +// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers. +template +class ConcurrentQueue { + public: + explicit ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeueImpl(lck, out); + } + + bool empty() const { +std::unique_lock lck(mtx_); +return queue_.emptyImpl(lck); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + void checkLock(std::unique_lock& lck) const { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!"); +} + } + + bool tryDequeueImpl(std::unique_lock& lck, T& out) { +checkLock(lck); +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + + bool emptyImpl(std::unique_lock& lck) const { +checkLock(lck); +return queue_.empty(); + } + + mutable std::mutex mtx_; + private: + std::deque queue_; +}; + + +// A ConcurrentQueue extended with a condition variable to be able to block and wait for incoming data Review comment: Added This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400203336 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + + +// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers. +template +class ConcurrentQueue { + public: + explicit ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeueImpl(lck, out); + } + + bool empty() const { +std::unique_lock lck(mtx_); +return queue_.emptyImpl(lck); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + void checkLock(std::unique_lock& lck) const { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!"); +} + } + + bool tryDequeueImpl(std::unique_lock& lck, T& out) { +checkLock(lck); +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + + bool emptyImpl(std::unique_lock& lck) const { +checkLock(lck); +return queue_.empty(); + } + + mutable std::mutex mtx_; + private: + std::deque queue_; +}; + + +// A ConcurrentQueue extended with a condition variable to be able to block and wait for incoming data +template +class ConditionConcurrentQueue : private ConcurrentQueue { + public: + explicit ConditionConcurrentQueue(bool start = true) : ConcurrentQueue{}, running_{start} {} + + ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete; + ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete; + + using ConcurrentQueue::size; + using ConcurrentQueue::empty; + using ConcurrentQueue::clear; + + + template + void enqueue(Args&&... args) { +ConcurrentQueue::enqueue(std::forward(args)...); +if (running_) { + cv_.notify_one(); +} + } + + bool dequeueWait(T& out) { +std::unique_lock lck(this->mtx_); +cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Only wake up if there is something to return or stopped +return running_ && ConcurrentQueue::tryDequeueImpl(lck, out); + } + + template< class Rep, class Period > + bool dequeueWaitFor(T& out, const std::chrono::duration& time) { +std::unique_lock lck(this->mtx_); +cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Wake up with timeout or in case there is something to do +return running_ && ConcurrentQueue::tryDequeueImpl(lck, out); + } + + bool tryDequeue(T& out) { +std::unique_lock lck(this->mtx_); +return running_ && ConcurrentQ
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400203526 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -29,11 +29,12 @@ namespace nifi { namespace minifi { namespace utils { + +// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers. Review comment: Added This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400041324 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "utils/MinifiConcurrentQueue.h" +#include "utils/StringUtils.h" + +using namespace org::apache::nifi::minifi::utils; + +TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") { + utils::ConcurrentQueue queue; + std::vector results; + + std::thread producer([&queue]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); Review comment: In case there would be no sleeps here, producer most probably just inserts everything before the consumer is even started. Which would also make the last testcases useless, where the goal is to prove that elements inserted later (when some are already looped in the queue) also get to a consumer. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400039598 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "utils/MinifiConcurrentQueue.h" +#include "utils/StringUtils.h" + +using namespace org::apache::nifi::minifi::utils; Review comment: Did the 2nd Btw I don't think using directives are any bad in test cpp files. In headers that get included to production code I would definitely avoid them as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400040217 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "utils/MinifiConcurrentQueue.h" +#include "utils/StringUtils.h" + +using namespace org::apache::nifi::minifi::utils; + +TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") { + utils::ConcurrentQueue queue; + std::vector results; + + std::thread producer([&queue]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("dum"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("tss"); +}); + + std::thread consumer([&queue, &results]() { + while (results.size() < 3) { + std::string s; + if (queue.tryDequeue(s)) { + results.push_back(s); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } +}); + + producer.join(); + consumer.join(); + + REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss"); +} + + +TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") { + utils::ConditionConcurrentQueue queue(true); + std::vector results; + + std::thread producer([&queue]() { +queue.enqueue("ba"); +std::this_thread::sleep_for(std::chrono::milliseconds(3)); +queue.enqueue("dum"); +std::this_thread::sleep_for(std::chrono::milliseconds(3)); +queue.enqueue("tss"); + }); + + std::thread consumer([&queue, &results]() { +std::string s; +while (queue.dequeue(s)) { + results.push_back(s); +} + }); + + producer.join(); + + queue.stop(); + + consumer.join(); + + REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss"); +} + + +/* In this testcase the consumer thread puts back all items to the queue to consume again + * Even in this case the ones inserted later by the producer should be consumed */ +TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") { + utils::ConcurrentQueue queue; + std::set results; + + std::thread producer([&queue]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("dum"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("tss"); +}); Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400039479 ## File path: libminifi/src/utils/ThreadPool.cpp ## @@ -64,8 +64,10 @@ void ThreadPool::run_tasks(std::shared_ptr thread) { } } } else { - std::unique_lock lock(worker_queue_mutex_); - tasks_available_.wait(lock); + // This means that the threadpool is running, but the ConcurrentQueue is stopped -> shouldn't happen + if (running_.load()) { +std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } Review comment: There is no logger here. It's not a critical error that worth an exception/dump and it might happen during startup/shutdown for a very short time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400039598 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "utils/MinifiConcurrentQueue.h" +#include "utils/StringUtils.h" + +using namespace org::apache::nifi::minifi::utils; Review comment: Did the 2nd This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400038906 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "utils/MinifiConcurrentQueue.h" +#include "utils/StringUtils.h" + +using namespace org::apache::nifi::minifi::utils; + +TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") { + utils::ConcurrentQueue queue; + std::vector results; + + std::thread producer([&queue]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("dum"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("tss"); +}); + + std::thread consumer([&queue, &results]() { + while (results.size() < 3) { + std::string s; + if (queue.tryDequeue(s)) { + results.push_back(s); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } +}); + + producer.join(); + consumer.join(); + + REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss"); +} + + +TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") { + utils::ConditionConcurrentQueue queue(true); + std::vector results; + + std::thread producer([&queue]() { +queue.enqueue("ba"); +std::this_thread::sleep_for(std::chrono::milliseconds(3)); +queue.enqueue("dum"); +std::this_thread::sleep_for(std::chrono::milliseconds(3)); +queue.enqueue("tss"); + }); + + std::thread consumer([&queue, &results]() { +std::string s; +while (queue.dequeue(s)) { + results.push_back(s); +} + }); + + producer.join(); + + queue.stop(); + + consumer.join(); + + REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss"); +} + + +/* In this testcase the consumer thread puts back all items to the queue to consume again + * Even in this case the ones inserted later by the producer should be consumed */ +TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") { + utils::ConcurrentQueue queue; + std::set results; + + std::thread producer([&queue]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("dum"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("tss"); +}); + + std::thread consumer([&queue, &results]() { +while (results.size() < 3) { + std::string s; + if (queue.tryDequeue(s)) { +results.insert(s); +queue.enqueue(std::move(s)); + } else { +std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } +} + }); + + producer.join(); + + // Give some time for the consumer to loop over the queue + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + consumer.join(); Review comment: Good point, removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400038743 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : private ConcurrentQueue { Review comment: Added comments to explain This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400038581 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : private ConcurrentQueue { + public: + ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), running_{start} {}; + + ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete; + ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete; + + using ConcurrentQueue::size; + using ConcurrentQueue::empty; + using ConcurrentQueue::clear; + + + template + void enqueue(Args&&... args) { +ConcurrentQueue::enqueue(std::forward(args)...); +if (running_) { + cv_.notify_one(); +} + } + + bool dequeue(T& out) { Review comment: Added wait_for, so now it support: - tryDequeue - DequeueWait - DequeueWaitFor This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400038135 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; Review comment: The container is now private, the mutex isn't as you have to acquire that in derived classes as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400037700 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : private ConcurrentQueue { + public: + ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), running_{start} {}; Review comment: Fixed them This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400037604 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; Review comment: Removed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400037378 ## File path: libminifi/include/utils/ThreadPool.h ## @@ -330,20 +331,18 @@ class ThreadPool { // integrated power manager std::shared_ptr thread_manager_; // thread queue for the recently deceased threads. - moodycamel::ConcurrentQueue> deceased_thread_queue_; + ConcurrentQueue> deceased_thread_queue_; // worker queue of worker objects - moodycamel::ConcurrentQueue> worker_queue_; + ConditionConcurrentQueue> worker_queue_; std::priority_queue, std::vector>, DelayedTaskComparator> delayed_worker_queue_; -// notification for available work - std::condition_variable tasks_available_; +// mutex to protect task status and delayed queue + std::mutex worker_queue_mutex_; Review comment: Removed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399540702 ## File path: libminifi/include/utils/ThreadPool.h ## @@ -303,8 +303,9 @@ class ThreadPool { * Drain will notify tasks to stop following notification */ void drain() { +worker_queue_.stop(); while (current_workers_ > 0) { - tasks_available_.notify_one(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); Review comment: Added This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r399540634 ## File path: libminifi/include/utils/ConcurrentQueue.h ## @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + virtual bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + virtual bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + virtual size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + virtual void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + return false; +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : public ConcurrentQueue { Review comment: Changed to private inheritance This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396808517 ## File path: libminifi/include/utils/ConcurrentQueue.h ## @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + virtual bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + virtual bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + virtual size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + virtual void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + return false; +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : public ConcurrentQueue { + public: + ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), running_{start} {}; + + ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete; + ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete; + + template + void enqueue(Args&&... args) { +ConcurrentQueue::enqueue(std::forward(args)...); +if (running_) { + cv_.notify_one(); +} + } + + bool tryDequeue(T& out) override { +std::unique_lock lck(this->mtx_); +if (running_ && this->queue_.empty()) { Review comment: I removed first and tests started to fail, so I reverted it. Now I added some debug prints there to see what's going on, removed the ifs and everything worked. It seems to work now properly, not sure what happened with my build. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396808517 ## File path: libminifi/include/utils/ConcurrentQueue.h ## @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + virtual bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + virtual bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + virtual size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + virtual void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + return false; +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : public ConcurrentQueue { + public: + ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), running_{start} {}; + + ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete; + ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete; + + template + void enqueue(Args&&... args) { +ConcurrentQueue::enqueue(std::forward(args)...); +if (running_) { + cv_.notify_one(); +} + } + + bool tryDequeue(T& out) override { +std::unique_lock lck(this->mtx_); +if (running_ && this->queue_.empty()) { Review comment: I removed first and tests started to fail, so I reverted it. Now I added some debug prints there to see what's going on, removed the if and everything worked. It seems to work now properly, not sure what happened with my build. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396725596 ## File path: libminifi/include/utils/ConcurrentQueue.h ## @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + virtual bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + virtual bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + virtual size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + virtual void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + return false; +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : public ConcurrentQueue { Review comment: When I was reading the private inheritance idea, my first though was "I love it". Although I realised that private inheritance would made me rewrap or copy/paste all the functions (size(), clear()) I would like to inherit from the base class without any modification on them. In case it can somehow be avoided (I found no way, but naturally open to suggestions), I absolutely favour the private inheritance. Otherwise - to avoid code duplication or needless wrappers - I would stay with the current implementation. Virtual functions are removed in the meanwhile to address that part of your concerns. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396722653 ## File path: libminifi/include/utils/ConcurrentQueue.h ## @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + virtual bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + virtual bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + virtual size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + virtual void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + return false; +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : public ConcurrentQueue { + public: + ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), running_{start} {}; + + ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete; + ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete; + + template + void enqueue(Args&&... args) { +ConcurrentQueue::enqueue(std::forward(args)...); +if (running_) { + cv_.notify_one(); +} + } + + bool tryDequeue(T& out) override { +std::unique_lock lck(this->mtx_); +if (running_ && this->queue_.empty()) { Review comment: Unfortunately not. ```wait causes the current thread to block until the condition variable is notified or a spurious wakeup occurs, optionally looping until some predicate is satisfied.``` I think this means the predicate can only be used to wait *again* for further notification in case it's not fulfilled (in case of spurious wakeup for eg), but cannot avoid waiting. I gave it a try, but removing the if significantly changed the behaviour, calls block even if there are tasks in the queue. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396722653 ## File path: libminifi/include/utils/ConcurrentQueue.h ## @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + virtual bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + virtual bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + virtual size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + virtual void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + return false; +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : public ConcurrentQueue { + public: + ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), running_{start} {}; + + ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete; + ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete; + + template + void enqueue(Args&&... args) { +ConcurrentQueue::enqueue(std::forward(args)...); +if (running_) { + cv_.notify_one(); +} + } + + bool tryDequeue(T& out) override { +std::unique_lock lck(this->mtx_); +if (running_ && this->queue_.empty()) { Review comment: Unfortunately not. ```wait causes the current thread to block until the condition variable is notified or a spurious wakeup occurs, optionally looping until some predicate is satisfied.``` I think this means the predicate can only be used to wait *again* for further notification in case it's not fulfilled (in case of spurious wakeup for eg), but cannot avoid waiting. I gave it a try, but removing the if significantly changed the behaviour, calls block even if there is are tasks in the queue. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396716663 ## File path: libminifi/src/utils/ThreadPool.cpp ## @@ -64,8 +64,10 @@ void ThreadPool::run_tasks(std::shared_ptr thread) { } } } else { - std::unique_lock lock(worker_queue_mutex_); - tasks_available_.wait(lock); + // This means that the threadpool is running, but the ConcurrentQueue is stopped -> shouldn't happen + if (running_.load()) { +std::this_thread::sleep_for(std::chrono::milliseconds(0)); Review comment: Meant to sleep for 1 msec, corrected that. In case this somehow occurs in a transient state (during startup or shutdown), this prevent the worker threads to busy wait (dequeue fails immediately and the workers would retry without sleep). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396714686 ## File path: libminifi/include/utils/ThreadPool.h ## @@ -303,8 +303,9 @@ class ThreadPool { * Drain will notify tasks to stop following notification */ void drain() { +worker_queue_.stop(); while (current_workers_ > 0) { - tasks_available_.notify_one(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); Review comment: @bakaid is right. Stopping the queue wakes up all the worker threads actually doing nothing (waiting for work), but there can be worker threads doing some useful work (ontrigger calls, c2 heartbeats, whatever). These should end in a timely manner and stopping the queue guarantees that they don't pick up new tasks, but I found no better option to wait for that. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396714686 ## File path: libminifi/include/utils/ThreadPool.h ## @@ -303,8 +303,9 @@ class ThreadPool { * Drain will notify tasks to stop following notification */ void drain() { +worker_queue_.stop(); while (current_workers_ > 0) { - tasks_available_.notify_one(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); Review comment: @bakaid is right. Stopping the queue wakes up all the worker threads actually doing nothing (waiting for work), but there can be worker threads doing some useful work (ontrigger calls, c2 heartbeats, whatever). These should end in a timely manner, but I found no better option to wait for that. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396713099 ## File path: libminifi/include/utils/ConcurrentQueue.h ## @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + virtual bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + virtual bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + virtual size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + virtual void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + return false; +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : public ConcurrentQueue { + public: + ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), running_{start} {}; + + ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete; + ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete; + + template + void enqueue(Args&&... args) { +ConcurrentQueue::enqueue(std::forward(args)...); +if (running_) { + cv_.notify_one(); +} + } + + bool tryDequeue(T& out) override { +std::unique_lock lck(this->mtx_); +if (running_ && this->queue_.empty()) { + cv_.wait(lck, [this]{ return !running_ || !this->queue_.empty(); }); // Only wake up if there is something to return or stopped +} +return running_ && ConcurrentQueue::tryDequeue(lck, out); + } + + void stop() { +std::lock_guard guard(this->mtx_); +running_ = false; +cv_.notify_all(); + } + + void start() { Review comment: You can construct in started state. Running means consumers are allowed to wait for notifications (blocking calls). Stopping prevents it and wakes them up to make sure no threads hang when no further data is expected to arrive. Added a comment to the bool member to explain the purpose. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396711286 ## File path: libminifi/include/utils/ConcurrentQueue.h ## @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + virtual bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + virtual bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + virtual size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + virtual void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + return false; Review comment: Agreed, logic error is thrown here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396711671 ## File path: libminifi/include/utils/ThreadPool.h ## @@ -330,20 +331,18 @@ class ThreadPool { // integrated power manager std::shared_ptr thread_manager_; // thread queue for the recently deceased threads. - moodycamel::ConcurrentQueue> deceased_thread_queue_; + ConcurrentQueue> deceased_thread_queue_; // worker queue of worker objects - moodycamel::ConcurrentQueue> worker_queue_; + ConditionConcurrentQueue> worker_queue_; std::priority_queue, std::vector>, DelayedTaskComparator> delayed_worker_queue_; -// notification for available work - std::condition_variable tasks_available_; +// mutex to protect task status and delayed queue + std::mutex worker_queue_mutex_; Review comment: Yes, still used to protect the delayed queue and the status of current tasks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396711012 ## File path: libminifi/include/utils/ConcurrentQueue.h ## @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + virtual bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + virtual bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + virtual size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + virtual void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + return false; +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : public ConcurrentQueue { + public: + ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), running_{start} {}; + + ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete; + ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete; + + template + void enqueue(Args&&... args) { +ConcurrentQueue::enqueue(std::forward(args)...); +if (running_) { + cv_.notify_one(); +} + } + + bool tryDequeue(T& out) override { +std::unique_lock lck(this->mtx_); +if (running_ && this->queue_.empty()) { + cv_.wait(lck, [this]{ return !running_ || !this->queue_.empty(); }); // Only wake up if there is something to return or stopped +} +return running_ && ConcurrentQueue::tryDequeue(lck, out); + } + + void stop() { +std::lock_guard guard(this->mtx_); +running_ = false; +cv_.notify_all(); + } + + void start() { +std::lock_guard guard(this->mtx_); +if (!running_) { + running_ = true; + if (!this->queue_.empty()) { + cv_.notify_all(); Review comment: Thanks, fixed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r396710845 ## File path: libminifi/include/utils/ConcurrentQueue.h ## @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + virtual bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + virtual bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + virtual size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + virtual void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + return false; +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : public ConcurrentQueue { + public: + ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), running_{start} {}; + + ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete; + ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete; + + template + void enqueue(Args&&... args) { +ConcurrentQueue::enqueue(std::forward(args)...); +if (running_) { + cv_.notify_one(); +} + } + + bool tryDequeue(T& out) override { Review comment: Renamed to "dequeue", tryDequeue is also available as a non-blocking alternative. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services