This is an automated email from the ASF dual-hosted git repository. aboda pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit bab6f93fa5b1e565cace7293bea57cd9372ba197 Author: Arpad Boda <[email protected]> AuthorDate: Wed Feb 12 16:36:25 2020 +0100 MINIFICPP-1158 - Event driven processors can starve each other Signed-off-by: Arpad Boda <[email protected]> Approved by szaszm and bakaid on GH This closes #735 --- extensions/coap/tests/CoapIntegrationBase.h | 2 +- .../integration/UpdateAttributeIntegrationTest.cpp | 30 +- .../standard-processors/processors/GetTCP.cpp | 3 +- extensions/standard-processors/processors/GetTCP.h | 4 +- libminifi/include/CronDrivenSchedulingAgent.h | 10 +- libminifi/include/EventDrivenSchedulingAgent.h | 21 +- libminifi/include/FlowController.h | 30 +- libminifi/include/SchedulingAgent.h | 82 +---- libminifi/include/ThreadedSchedulingAgent.h | 8 +- libminifi/include/TimerDrivenSchedulingAgent.h | 12 +- .../core/controller/ControllerServiceProvider.h | 5 +- .../controller/StandardControllerServiceProvider.h | 14 +- libminifi/include/core/state/UpdateController.h | 4 +- libminifi/include/properties/Configure.h | 1 + libminifi/include/utils/Monitors.h | 174 +++++++++ libminifi/include/utils/ThreadPool.h | 387 +++------------------ libminifi/src/CPPLINT.cfg | 1 - libminifi/src/Configure.cpp | 1 + libminifi/src/CronDrivenSchedulingAgent.cpp | 13 +- libminifi/src/EventDrivenSchedulingAgent.cpp | 38 +- libminifi/src/FlowController.cpp | 33 +- libminifi/src/SchedulingAgent.cpp | 29 +- libminifi/src/ThreadedSchedulingAgent.cpp | 22 +- libminifi/src/TimerDrivenSchedulingAgent.cpp | 13 +- libminifi/src/core/Processor.cpp | 1 + libminifi/src/utils/ThreadPool.cpp | 250 +++++++++++++ libminifi/test/resources/TestUpdateAttribute.yml | 18 +- libminifi/test/unit/BackTraceTests.cpp | 8 +- libminifi/test/unit/ThreadPoolTests.cpp | 8 +- 29 files changed, 630 insertions(+), 592 deletions(-) diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h index 85af8c6..83935bf 100644 --- a/extensions/coap/tests/CoapIntegrationBase.h +++ 
b/extensions/coap/tests/CoapIntegrationBase.h @@ -34,7 +34,7 @@ int ssl_enable(void *ssl_context, void *user_data) { class CoapIntegrationBase : public IntegrationBase { public: - CoapIntegrationBase(uint64_t waitTime = 60000) + CoapIntegrationBase(uint64_t waitTime = 5000) : IntegrationBase(waitTime), server(nullptr) { } diff --git a/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp b/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp index a525404..40da9e2 100644 --- a/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp +++ b/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp @@ -36,37 +36,25 @@ class TestHarness : public IntegrationBase { public: - TestHarness() { - log_entry_found = false; - } - - void testSetup() { + void testSetup() override { LogTestController::getInstance().setTrace<minifi::FlowController>(); - LogTestController::getInstance().setTrace<core::ProcessSession>(); - LogTestController::getInstance().setTrace<core::ProcessContextExpr>(); + LogTestController::getInstance().setTrace<core::ProcessSession>(); + LogTestController::getInstance().setTrace<core::ProcessContextExpr>(); LogTestController::getInstance().setInfo<processors::LogAttribute>(); } - void cleanup() { - } - - void runAssertions() { - assert(log_entry_found); - } + void cleanup() override {} - void waitToVerifyProcessor() { - // This test takes a while to complete -> wait at most 10 secs - log_entry_found = LogTestController::getInstance().contains("key:route_check_attr value:good", std::chrono::seconds(10)); - log_entry_found = LogTestController::getInstance().contains("key:variable_attribute value:replacement_value", std::chrono::seconds(10)); + void runAssertions() override { + assert(LogTestController::getInstance().contains("key:route_check_attr value:good")); + assert(LogTestController::getInstance().contains("key:variable_attribute 
value:replacement_value")); + assert(LogTestController::getInstance().contains("ProcessSession rollback", std::chrono::seconds(1)) == false); // No rollback happened } - void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) { + void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) override { // inject the variable into the context. configuration->set("nifi.variable.test", "replacement_value"); } - - protected: - bool log_entry_found; }; int main(int argc, char **argv) { diff --git a/extensions/standard-processors/processors/GetTCP.cpp b/extensions/standard-processors/processors/GetTCP.cpp index afa32dc..5a8ab19 100644 --- a/extensions/standard-processors/processors/GetTCP.cpp +++ b/extensions/standard-processors/processors/GetTCP.cpp @@ -225,8 +225,7 @@ void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co } } - utils::ThreadPool<int> pool = utils::ThreadPool<int>(concurrent_handlers_); - client_thread_pool_ = std::move(pool); + client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_); client_thread_pool_.start(); running_ = true; diff --git a/extensions/standard-processors/processors/GetTCP.h b/extensions/standard-processors/processors/GetTCP.h index 7b9e0a5..8796669 100644 --- a/extensions/standard-processors/processors/GetTCP.h +++ b/extensions/standard-processors/processors/GetTCP.h @@ -69,9 +69,9 @@ class SocketAfterExecute : public utils::AfterExecute<int> { return false; } - virtual int64_t wait_time() { + virtual std::chrono::milliseconds wait_time() { // wait 500ms - return 500; + return std::chrono::milliseconds(500); } protected: diff --git a/libminifi/include/CronDrivenSchedulingAgent.h b/libminifi/include/CronDrivenSchedulingAgent.h index 0943570..9abe3b5 100644 --- a/libminifi/include/CronDrivenSchedulingAgent.h +++ b/libminifi/include/CronDrivenSchedulingAgent.h @@ -41,16 +41,18 @@ class CronDrivenSchedulingAgent : public ThreadedSchedulingAgent { * Create a new event driven scheduling 
agent. */ CronDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration) - : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration) { + std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration, + utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool) + : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) { } // Destructor virtual ~CronDrivenSchedulingAgent() { } // Run function for the thread - uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory); + utils::TaskRescheduleInfo run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, + const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override; - virtual void stop() { + void stop() override { std::lock_guard<std::mutex> locK(mutex_); schedules_.clear(); last_exec_.clear(); diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h index b434de5..295becc 100644 --- a/libminifi/include/EventDrivenSchedulingAgent.h +++ b/libminifi/include/EventDrivenSchedulingAgent.h @@ -20,6 +20,8 @@ #ifndef __EVENT_DRIVEN_SCHEDULING_AGENT_H__ #define __EVENT_DRIVEN_SCHEDULING_AGENT_H__ +#define DEFAULT_TIME_SLICE_MS 500 + #include "core/logging/Logger.h" #include "core/Processor.h" #include "core/ProcessContext.h" @@ -39,14 +41,19 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent { * Create a new event driven scheduling agent. 
*/ EventDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration) - : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration) { - } - // Destructor - virtual ~EventDrivenSchedulingAgent() { + std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration, + utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool) + : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) { + int slice = configuration->getInt(Configure::nifi_flow_engine_event_driven_time_slice, DEFAULT_TIME_SLICE_MS); + if (slice < 10 || 1000 < slice) { + throw Exception(FLOW_EXCEPTION, std::string(Configure::nifi_flow_engine_event_driven_time_slice) + " is out of reasonable range!"); + } + time_slice_ = std::chrono::milliseconds(slice); } + // Run function for the thread - uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory); + utils::TaskRescheduleInfo run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, + const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override; private: // Prevent default copy constructor and assignment operation @@ -54,6 +61,8 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent { EventDrivenSchedulingAgent(const EventDrivenSchedulingAgent &parent); EventDrivenSchedulingAgent &operator=(const EventDrivenSchedulingAgent &parent); + std::chrono::milliseconds time_slice_; + }; } /* namespace minifi */ diff --git 
a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 2e40bb9..5602452 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -63,9 +63,6 @@ namespace minifi { */ class FlowController : public core::controller::ControllerServiceProvider, public state::StateManager { public: - static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10; - static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5; - /** * Flow controller constructor */ @@ -86,22 +83,6 @@ class FlowController : public core::controller::ControllerServiceProvider, publi // Destructor virtual ~FlowController(); - // Set MAX TimerDrivenThreads - virtual void setMaxTimerDrivenThreads(int number) { - max_timer_driven_threads_ = number; - } - // Get MAX TimerDrivenThreads - virtual int getMaxTimerDrivenThreads() { - return max_timer_driven_threads_; - } - // Set MAX EventDrivenThreads - virtual void setMaxEventDrivenThreads(int number) { - max_event_driven_threads_ = number; - } - // Get MAX EventDrivenThreads - virtual int getMaxEventDrivenThreads() { - return max_event_driven_threads_; - } // Get the provenance repository virtual std::shared_ptr<core::Repository> getProvenanceRepository() { return this->provenance_repo_; @@ -222,7 +203,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi * Enables the controller service services * @param serviceNode service node which will be disabled, along with linked services. 
*/ - virtual std::future<uint64_t> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); /** * Enables controller services @@ -234,7 +215,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi * Disables controller services * @param serviceNode service node which will be disabled, along with linked services. */ - virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); /** * Gets all controller services. @@ -355,11 +336,6 @@ class FlowController : public core::controller::ControllerServiceProvider, publi std::string properties_file_name_; // Root Process Group std::shared_ptr<core::ProcessGroup> root_; - // MAX Timer Driven Threads - int max_timer_driven_threads_; - // MAX Event Driven Threads - int max_event_driven_threads_; - // FlowFile Repo // Whether it is running std::atomic<bool> running_; std::atomic<bool> updating_; @@ -380,6 +356,8 @@ class FlowController : public core::controller::ControllerServiceProvider, publi std::shared_ptr<core::ContentRepository> content_repo_; + // Thread pool for schedulers + utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_; // Flow Engines // Flow Timer Scheduler std::shared_ptr<TimerDrivenSchedulingAgent> timer_scheduler_; diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index 6993302..13d7ded 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -28,6 +28,7 @@ #include <algorithm> #include <thread> #include "utils/CallBackTimer.h" +#include "utils/Monitors.h" #include "utils/TimeUtil.h" #include 
"utils/ThreadPool.h" #include "utils/BackTrace.h" @@ -49,67 +50,6 @@ namespace apache { namespace nifi { namespace minifi { -/** - * Uses the wait time for a given worker to determine if it is eligible to run - */ -class TimerAwareMonitor : public utils::AfterExecute<uint64_t> { - public: - TimerAwareMonitor(std::atomic<bool> *run_monitor) - : current_wait_(0), - run_monitor_(run_monitor) { - } - explicit TimerAwareMonitor(TimerAwareMonitor &&other) - : AfterExecute(std::move(other)), - run_monitor_(std::move(other.run_monitor_)) { - current_wait_.store(other.current_wait_.load()); - } - virtual bool isFinished(const uint64_t &result) { - current_wait_.store(result); - if (*run_monitor_) { - return false; - } - return true; - } - virtual bool isCancelled(const uint64_t &result) { - if (*run_monitor_) { - return false; - } - return true; - } - /** - * Time to wait before re-running this task if necessary - * @return milliseconds since epoch after which we are eligible to re-run this task. - */ - virtual int64_t wait_time() { - return current_wait_.load(); - } - protected: - - std::atomic<uint64_t> current_wait_; - std::atomic<bool> *run_monitor_; -}; - -class SingleRunMonitor : public TimerAwareMonitor { - public: - SingleRunMonitor(std::atomic<bool> *run_monitor) - : TimerAwareMonitor(run_monitor) { - } - explicit SingleRunMonitor(TimerAwareMonitor &&other) - : TimerAwareMonitor(std::move(other)) { - } - virtual bool isFinished(const uint64_t &result) { - if (result == 0) { - return true; - } else { - current_wait_.store(result); - if (*run_monitor_) { - return false; - } - return true; - } - } -}; - // SchedulingAgent Class class SchedulingAgent { public: @@ -118,25 +58,18 @@ class SchedulingAgent { * Create a new scheduling agent. 
*/ SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo, - std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration) - : admin_yield_duration_(0), + std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool) + : admin_yield_duration_(), bored_yield_duration_(0), configure_(configuration), content_repo_(content_repo), + thread_pool_(thread_pool), controller_service_provider_(controller_service_provider), logger_(logging::LoggerFactory<SchedulingAgent>::getLogger()), alert_time_(configuration->getInt(Configure::nifi_flow_engine_alert_period, SCHEDULING_WATCHDOG_DEFAULT_ALERT_PERIOD_MS)) { running_ = false; repo_ = repo; flow_repo_ = flow_repo; - /** - * To facilitate traces we cannot use daemon threads -- this could potentially cause blocking on I/O; however, it's a better path - * to be able to debug why an agent doesn't work and still allow a restart via updates in these cases. 
- */ - auto csThreads = configure_->getInt(Configure::nifi_flow_engine_threads, 2); - auto pool = utils::ThreadPool<uint64_t>(csThreads, false, controller_service_provider, "SchedulingAgent"); - thread_pool_ = std::move(pool); - thread_pool_.start(); if (alert_time_ > std::chrono::milliseconds(0)) { std::function<void(void)> f = std::bind(&SchedulingAgent::watchDogFunc, this); @@ -166,7 +99,6 @@ class SchedulingAgent { // stop virtual void stop() { running_ = false; - thread_pool_.shutdown(); } std::vector<BackTrace> getTraces() { @@ -175,8 +107,8 @@ class SchedulingAgent { void watchDogFunc(); - virtual std::future<uint64_t> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); - virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); + virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); // schedule, overwritten by different DrivenSchedulingAgent virtual void schedule(std::shared_ptr<core::Processor> processor) = 0; // unschedule, overwritten by different DrivenSchedulingAgent @@ -202,7 +134,7 @@ class SchedulingAgent { std::shared_ptr<core::ContentRepository> content_repo_; // thread pool for components. 
- utils::ThreadPool<uint64_t> thread_pool_; + utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_; // controller service provider reference std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider_; diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h index ba0998a..c530796 100644 --- a/libminifi/include/ThreadedSchedulingAgent.h +++ b/libminifi/include/ThreadedSchedulingAgent.h @@ -45,8 +45,8 @@ class ThreadedSchedulingAgent : public SchedulingAgent { * Create a new threaded scheduling agent. */ ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo, - std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration) - : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration), + std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool) + : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool), logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) { } // Destructor @@ -54,7 +54,7 @@ class ThreadedSchedulingAgent : public SchedulingAgent { } // Run function for the thread - virtual uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, + virtual utils::TaskRescheduleInfo run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) = 0; public: @@ -73,6 +73,8 @@ class ThreadedSchedulingAgent : public SchedulingAgent { ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent); ThreadedSchedulingAgent &operator=(const 
ThreadedSchedulingAgent &parent); std::shared_ptr<logging::Logger> logger_; + + std::set<std::string> processors_running_; // Set just for easy usage }; } /* namespace minifi */ diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h index 8398b3a..10aaf77 100644 --- a/libminifi/include/TimerDrivenSchedulingAgent.h +++ b/libminifi/include/TimerDrivenSchedulingAgent.h @@ -38,17 +38,17 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent { * Create a new processor */ TimerDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure) - : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure), + std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure, + utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool) + : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure, thread_pool), logger_(logging::LoggerFactory<TimerDrivenSchedulingAgent>::getLogger()) { } - // Destructor - virtual ~TimerDrivenSchedulingAgent() { - } + /** * Run function that accepts the processor, context and session factory. 
*/ - uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory); + utils::TaskRescheduleInfo run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, + const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override; private: // Prevent default copy constructor and assignment operation diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h index 18ac5de..6a147e1 100644 --- a/libminifi/include/core/controller/ControllerServiceProvider.h +++ b/libminifi/include/core/controller/ControllerServiceProvider.h @@ -26,6 +26,7 @@ #include "ControllerServiceNode.h" #include "ControllerServiceMap.h" #include "core/ClassLoader.h" +#include "utils/Monitors.h" namespace org { namespace apache { @@ -97,7 +98,7 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo * Enables the provided controller service * @param serviceNode controller service node. */ - virtual std::future<uint64_t> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) = 0; + virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) = 0; /** * Enables the provided controller service nodes @@ -109,7 +110,7 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo * Disables the provided controller service node * @param serviceNode controller service node. */ - virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0; + virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0; /** * Gets a list of all controller services. 
diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h index cc1d51e..6ce6651 100644 --- a/libminifi/include/core/controller/StandardControllerServiceProvider.h +++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h @@ -103,15 +103,12 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ } - std::future<uint64_t> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) { + std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) { if (serviceNode->canEnable()) { return agent_->enableControllerService(serviceNode); } else { - std::future<uint64_t> no_run = std::async(std::launch::async, []() { - uint64_t ret = 0; - return ret; - }); + std::future<utils::TaskRescheduleInfo> no_run = std::async(std::launch::deferred, utils::TaskRescheduleInfo::Done); return no_run; } } @@ -135,14 +132,11 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ } } - std::future<uint64_t> disableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) { + std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) { if (!IsNullOrEmpty(serviceNode.get()) && serviceNode->enabled()) { return agent_->disableControllerService(serviceNode); } else { - std::future<uint64_t> no_run = std::async(std::launch::async, []() { - uint64_t ret = 0; - return ret; - }); + std::future<utils::TaskRescheduleInfo> no_run = std::async(std::launch::deferred, utils::TaskRescheduleInfo::Done); return no_run; } } diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h index 4d70d99..8c76e58 100644 --- a/libminifi/include/core/state/UpdateController.h +++ b/libminifi/include/core/state/UpdateController.h @@ -150,14 +150,14 @@ class 
UpdateRunner : public utils::AfterExecute<Update> { return !*running_; } - virtual int64_t wait_time() { + virtual std::chrono::milliseconds wait_time() { return delay_; } protected: std::atomic<bool> *running_; - int64_t delay_; + std::chrono::milliseconds delay_; }; diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index 89dbe0e..9250b5b 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -47,6 +47,7 @@ class Configure : public Properties { static const char *nifi_flow_configuration_file_backup_update; static const char *nifi_flow_engine_threads; static const char *nifi_flow_engine_alert_period; + static const char *nifi_flow_engine_event_driven_time_slice; static const char *nifi_administrative_yield_duration; static const char *nifi_bored_yield_duration; static const char *nifi_graceful_shutdown_seconds; diff --git a/libminifi/include/utils/Monitors.h b/libminifi/include/utils/Monitors.h new file mode 100644 index 0000000..a9ff485 --- /dev/null +++ b/libminifi/include/utils/Monitors.h @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef NIFI_MINIFI_CPP_MONITORS_H +#define NIFI_MINIFI_CPP_MONITORS_H + +#include <chrono> +#include <atomic> +#if defined(WIN32) +#include <future> // This is required to work around a VS2017 bug, see the details below +#endif + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +/** + * Worker task helper that determines + * whether or not we will run + */ +template<typename T> +class AfterExecute { + public: + virtual ~AfterExecute() { + + } + + explicit AfterExecute() { + + } + + explicit AfterExecute(AfterExecute &&other) { + + } + virtual bool isFinished(const T &result) = 0; + virtual bool isCancelled(const T &result) = 0; + /** + * Time to wait before re-running this task if necessary + * @return milliseconds since epoch after which we are eligible to re-run this task. + */ + virtual std::chrono::milliseconds wait_time() = 0; +}; + +/** + * Uses the wait time for a given worker to determine if it is eligible to run + */ +class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> { + public: + TimerAwareMonitor(std::atomic<bool> *run_monitor) + : current_wait_(std::chrono::milliseconds(0)), + run_monitor_(run_monitor) { + } + virtual bool isFinished(const std::chrono::milliseconds &result) override { + current_wait_.store(result); + if (*run_monitor_) { + return false; + } + return true; + } + virtual bool isCancelled(const std::chrono::milliseconds &result) override { + if (*run_monitor_) { + return false; + } + return true; + } + /** + * Time to wait before re-running this task if necessary + * @return milliseconds since epoch after which we are eligible to re-run this task. 
+ */ + virtual std::chrono::milliseconds wait_time() override { + return current_wait_.load(); + } + protected: + + std::atomic<std::chrono::milliseconds> current_wait_; + std::atomic<bool> *run_monitor_; +}; + +class SingleRunMonitor : public utils::AfterExecute<bool>{ + public: + SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100)) + : retry_interval_(retry_interval) { + } + + virtual bool isFinished(const bool &result) override { + return result; + } + virtual bool isCancelled(const bool &result) override { + return false; + } + virtual std::chrono::milliseconds wait_time() override { + return retry_interval_; + } + protected: + const std::chrono::milliseconds retry_interval_; +}; + + +struct TaskRescheduleInfo { + TaskRescheduleInfo(bool result, std::chrono::milliseconds wait_time) + : finished_(result), wait_time_(wait_time){} + std::chrono::milliseconds wait_time_; + bool finished_; + + static TaskRescheduleInfo Done() { + return TaskRescheduleInfo(true, std::chrono::milliseconds(0)); + } + + static TaskRescheduleInfo RetryIn(std::chrono::milliseconds interval) { + return TaskRescheduleInfo(false, interval); + } + + static TaskRescheduleInfo RetryImmediately() { + return TaskRescheduleInfo(false, std::chrono::milliseconds(0)); + } + +#if defined(WIN32) + // https://developercommunity.visualstudio.com/content/problem/60897/c-shared-state-futuresstate-default-constructs-the.html + // Because of this bug we need to have this object default constructible, which makes no sense otherwise. Hack. 
+ private: + TaskRescheduleInfo() : wait_time_(std::chrono::milliseconds(0)), finished_(true) {} + friend class std::_Associated_state<TaskRescheduleInfo>; +#endif +}; + +class ComplexMonitor : public utils::AfterExecute<TaskRescheduleInfo> { + public: + ComplexMonitor() = default; + + virtual bool isFinished(const TaskRescheduleInfo &result) override { + if (result.finished_) { + return true; + } + current_wait_.store(result.wait_time_); + return false; + } + virtual bool isCancelled(const TaskRescheduleInfo &result) override { + return false; + } + /** + * Time to wait before re-running this task if necessary + * @return milliseconds since epoch after which we are eligible to re-run this task. + */ + virtual std::chrono::milliseconds wait_time() override { + return current_wait_.load(); + } + + private: + std::atomic<std::chrono::milliseconds> current_wait_ {std::chrono::milliseconds(0)}; +}; + +} /* namespace utils */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif //NIFI_MINIFI_CPP_MONITORS_H \ No newline at end of file diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 5bbd3f6..2554dc2 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -30,6 +30,7 @@ #include <functional> #include "BackTrace.h" +#include "Monitors.h" #include "core/expect.h" #include "controllers/ThreadManagementService.h" #include "concurrentqueue.h" @@ -42,33 +43,6 @@ namespace minifi { namespace utils { /** - * Worker task helper that determines - * whether or not we will run - */ -template<typename T> -class AfterExecute { - public: - virtual ~AfterExecute() { - - } - - explicit AfterExecute() { - - } - - explicit AfterExecute(AfterExecute &&other) { - - } - virtual bool isFinished(const T &result) = 0; - virtual bool isCancelled(const T &result) = 0; - /** - * Time to wait before re-running this task if necessary - * @return milliseconds 
since epoch after which we are eligible to re-run this task. - */ - virtual int64_t wait_time() = 0; -}; - -/** * Worker task * purpose: Provides a wrapper for the functor * and returns a future based on the template argument. @@ -78,7 +52,7 @@ class Worker { public: explicit Worker(std::function<T()> &task, const std::string &identifier, std::unique_ptr<AfterExecute<T>> run_determinant) : identifier_(identifier), - time_slice_(0), + next_exec_time_(std::chrono::steady_clock::now()), task(task), run_determinant_(std::move(run_determinant)) { promise = std::make_shared<std::promise<T>>(); @@ -86,7 +60,7 @@ class Worker { explicit Worker(std::function<T()> &task, const std::string &identifier) : identifier_(identifier), - time_slice_(0), + next_exec_time_(std::chrono::steady_clock::now()), task(task), run_determinant_(nullptr) { promise = std::make_shared<std::promise<T>>(); @@ -94,7 +68,7 @@ class Worker { explicit Worker(const std::string identifier = "") : identifier_(identifier), - time_slice_(0) { + next_exec_time_(std::chrono::steady_clock::now()) { } virtual ~Worker() { @@ -104,9 +78,9 @@ class Worker { /** * Move constructor for worker tasks */ - Worker(Worker &&other) + Worker (Worker &&other) noexcept : identifier_(std::move(other.identifier_)), - time_slice_(std::move(other.time_slice_)), + next_exec_time_(std::move(other.next_exec_time_)), task(std::move(other.task)), run_determinant_(std::move(other.run_determinant_)), promise(other.promise) { @@ -125,7 +99,7 @@ class Worker { promise->set_value(result); return false; } - time_slice_ = increment_time(run_determinant_->wait_time()); + next_exec_time_ += run_determinant_->wait_time(); return true; } @@ -133,59 +107,52 @@ class Worker { identifier_ = identifier; } - virtual uint64_t getTimeSlice() { - return time_slice_; + virtual std::chrono::time_point<std::chrono::steady_clock> getNextExecutionTime() const { + return next_exec_time_; } - virtual uint64_t getWaitTime() { + virtual 
std::chrono::milliseconds getWaitTime() const { return run_determinant_->wait_time(); } Worker<T>(const Worker<T>&) = delete; - Worker<T>& operator =(const Worker<T>&) = delete; + Worker<T>& operator= (const Worker<T>&) = delete; - Worker<T>& operator =(Worker<T> &&); + Worker<T>& operator= (Worker<T> &&) noexcept; - std::shared_ptr<std::promise<T>> getPromise(); + std::shared_ptr<std::promise<T>> getPromise() const; - const std::string &getIdentifier() { + const std::string &getIdentifier() const { return identifier_; } - protected: - - inline uint64_t increment_time(const uint64_t &time) { - std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now(); - auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count(); - return millis + time; - } - +protected: std::string identifier_; - uint64_t time_slice_; + std::chrono::time_point<std::chrono::steady_clock> next_exec_time_; std::function<T()> task; std::unique_ptr<AfterExecute<T>> run_determinant_; std::shared_ptr<std::promise<T>> promise; }; template<typename T> -class WorkerComparator { +class DelayedTaskComparator { public: bool operator()(Worker<T> &a, Worker<T> &b) { - return a.getTimeSlice() < b.getTimeSlice(); + return a.getNextExecutionTime() > b.getNextExecutionTime(); } }; template<typename T> -Worker<T>& Worker<T>::operator =(Worker<T> && other) { +Worker<T>& Worker<T>::operator =(Worker<T> && other) noexcept { task = std::move(other.task); promise = other.promise; - time_slice_ = std::move(other.time_slice_); + next_exec_time_ = std::move(other.next_exec_time_); identifier_ = std::move(other.identifier_); run_determinant_ = std::move(other.run_determinant_); return *this; } template<typename T> -std::shared_ptr<std::promise<T>> Worker<T>::getPromise() { +std::shared_ptr<std::promise<T>> Worker<T>::getPromise() const { return promise; } @@ -231,18 +198,10 @@ class ThreadPool { thread_manager_ = nullptr; } - ThreadPool(const 
ThreadPool<T> &&other) - : daemon_threads_(std::move(other.daemon_threads_)), - thread_reduction_count_(0), - max_worker_threads_(std::move(other.max_worker_threads_)), - adjust_threads_(false), - running_(false), - controller_service_provider_(std::move(other.controller_service_provider_)), - thread_manager_(std::move(other.thread_manager_)), - name_(std::move(other.name_)) { - current_workers_ = 0; - task_count_ = 0; - } + ThreadPool(const ThreadPool<T> &other) = delete; + ThreadPool<T>& operator=(const ThreadPool<T> &other) = delete; + ThreadPool(ThreadPool<T> &&other) = delete; + ThreadPool<T>& operator=(ThreadPool<T> &&other) = delete; ~ThreadPool() { shutdown(); @@ -268,8 +227,16 @@ class ThreadPool { /** * Returns true if a task is running. */ - bool isRunning(const std::string &identifier) { - return task_status_[identifier] == true; + bool isTaskRunning(const std::string &identifier) const { + try { + return task_status_.at(identifier) == true; + } catch (const std::out_of_range &e) { + return false; + } + } + + bool isRunning() const { + return running_.load(); } std::vector<BackTrace> getTraces() { @@ -304,44 +271,24 @@ class ThreadPool { */ void setMaxConcurrentTasks(uint16_t max) { std::lock_guard<std::recursive_mutex> lock(manager_mutex_); - if (running_) { + bool was_running = running_; + if (was_running) { shutdown(); } max_worker_threads_ = max; - if (!running_) + if (was_running) start(); } - ThreadPool<T> operator=(const ThreadPool<T> &other) = delete; - ThreadPool(const ThreadPool<T> &other) = delete; - - ThreadPool<T> &operator=(ThreadPool<T> &&other) { + void setControllerServiceProvider(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider) { std::lock_guard<std::recursive_mutex> lock(manager_mutex_); - if (other.running_) { - other.shutdown(); - } - if (running_) { + bool was_running = running_; + if (was_running) { shutdown(); } - max_worker_threads_ = std::move(other.max_worker_threads_); - 
daemon_threads_ = std::move(other.daemon_threads_); - current_workers_ = 0; - thread_reduction_count_ = 0; - - thread_queue_ = std::move(other.thread_queue_); - worker_queue_ = std::move(other.worker_queue_); - - controller_service_provider_ = std::move(other.controller_service_provider_); - thread_manager_ = std::move(other.thread_manager_); - - adjust_threads_ = false; - - if (!running_) { + controller_service_provider_ = controller_service_provider; + if (was_running) start(); - } - - name_ = other.name_; - return *this; } protected: @@ -372,6 +319,8 @@ class ThreadPool { std::vector<std::shared_ptr<WorkerThread>> thread_queue_; // manager thread std::thread manager_thread_; +// the thread responsible for putting delayed tasks to the worker queue when they had to be put + std::thread delayed_scheduler_thread_; // conditional that's used to adjust the threads std::atomic<bool> adjust_threads_; // atomic running boolean @@ -384,9 +333,11 @@ class ThreadPool { moodycamel::ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_; // worker queue of worker objects moodycamel::ConcurrentQueue<Worker<T>> worker_queue_; - std::priority_queue<Worker<T>, std::vector<Worker<T>>, WorkerComparator<T>> worker_priority_queue_; + std::priority_queue<Worker<T>, std::vector<Worker<T>>, DelayedTaskComparator<T>> delayed_worker_queue_; // notification for available work std::condition_variable tasks_available_; +// notification for new delayed tasks that's before the current ones + std::condition_variable delayed_task_available_; // map to identify if a task should be std::map<std::string, bool> task_status_; // manager mutex @@ -410,247 +361,9 @@ class ThreadPool { * Runs worker tasks */ void run_tasks(std::shared_ptr<WorkerThread> thread); -}; -template<typename T> -bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) { - { - std::unique_lock<std::mutex> lock(worker_queue_mutex_); - task_status_[task.getIdentifier()] = true; - } - future = 
std::move(task.getPromise()->get_future()); - bool enqueued = worker_queue_.enqueue(std::move(task)); - if (running_) { - tasks_available_.notify_one(); - } - - task_count_++; - - return enqueued; -} - -template<typename T> -void ThreadPool<T>::manageWorkers() { - for (int i = 0; i < max_worker_threads_; i++) { - std::stringstream thread_name; - thread_name << name_ << " #" << i; - auto worker_thread = std::make_shared<WorkerThread>(thread_name.str()); - worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread)); - thread_queue_.push_back(worker_thread); - current_workers_++; - } - - if (daemon_threads_) { - for (auto &thread : thread_queue_) { - thread->thread_.detach(); - } - } - -// likely don't have a thread manager - if (LIKELY(nullptr != thread_manager_)) { - while (running_) { - auto waitperiod = std::chrono::milliseconds(1) * 500; - { - if (thread_manager_->isAboveMax(current_workers_)) { - auto max = thread_manager_->getMaxConcurrentTasks(); - auto differential = current_workers_ - max; - thread_reduction_count_ += differential; - } else if (thread_manager_->shouldReduce()) { - if (current_workers_ > 1) - thread_reduction_count_++; - thread_manager_->reduce(); - } else if (thread_manager_->canIncrease() && max_worker_threads_ - current_workers_ > 0) { // increase slowly - std::unique_lock<std::mutex> lock(worker_queue_mutex_); - auto worker_thread = std::make_shared<WorkerThread>(); - worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread)); - if (daemon_threads_) { - worker_thread->thread_.detach(); - } - thread_queue_.push_back(worker_thread); - current_workers_++; - } - } - { - std::shared_ptr<WorkerThread> thread_ref; - while (deceased_thread_queue_.try_dequeue(thread_ref)) { - std::unique_lock<std::mutex> lock(worker_queue_mutex_); - if (thread_ref->thread_.joinable()) - thread_ref->thread_.join(); - thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), 
thread_ref), thread_queue_.end()); - } - } - std::this_thread::sleep_for(waitperiod); - } - } else { - for (auto &thread : thread_queue_) { - if (thread->thread_.joinable()) - thread->thread_.join(); - } - } -} -template<typename T> -void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) { - auto waitperiod = std::chrono::milliseconds(1) * 100; - thread->is_running_ = true; - uint64_t wait_decay_ = 0; - uint64_t yield_backoff = 10; // start at 10 ms - while (running_.load()) { - if (UNLIKELY(thread_reduction_count_ > 0)) { - if (--thread_reduction_count_ >= 0) { - deceased_thread_queue_.enqueue(thread); - thread->is_running_ = false; - break; - } else { - thread_reduction_count_++; - } - } - // if we exceed 500ms of wait due to not being able to run any tasks and there are tasks available, meaning - // they are eligible to run per the fact that the thread pool isn't shut down and the tasks are in a runnable state - // BUT they've been continually timesliced, we will lower the wait decay to 100ms and continue incrementing from - // there. This ensures we don't have arbitrarily long sleep cycles. - if (wait_decay_ > 500000000L) { - wait_decay_ = 100000000L; - } - // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible - // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially - // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should - // be more likely to run. This is intentional. - - if (wait_decay_ > 2000) { - std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_)); - } - - if (current_workers_ < max_worker_threads_) { - // we are in a reduced state. 
due to thread management - // let's institute a backoff up to 500ms - if (yield_backoff < 500) { - yield_backoff += 10; - } - std::this_thread::sleep_for(std::chrono::milliseconds(yield_backoff)); - } else { - yield_backoff = 10; - } - Worker<T> task; - - bool prioritized_task = false; - - if (!prioritized_task) { - if (!worker_queue_.try_dequeue(task)) { - std::unique_lock<std::mutex> lock(worker_queue_mutex_); - if (worker_priority_queue_.size() > 0) { - // this is safe as we are going to immediately pop the queue - while (!worker_priority_queue_.empty()) { - task = std::move(const_cast<Worker<T>&>(worker_priority_queue_.top())); - worker_priority_queue_.pop(); - worker_queue_.enqueue(std::move(task)); - continue; - } - - } - tasks_available_.wait_for(lock, waitperiod); - continue; - } else { - std::unique_lock<std::mutex> lock(worker_queue_mutex_); - if (!task_status_[task.getIdentifier()]) { - continue; - } - } - - bool wait_to_run = false; - if (task.getTimeSlice() > 1) { - double wt = (double) task.getWaitTime(); - auto now = std::chrono::system_clock::now().time_since_epoch(); - auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count(); - - // if our differential is < 10% of the wait time we will not put the task into a wait state - // since requeuing will break the time slice contract. - if ((double) task.getTimeSlice() > ms && ((double) (task.getTimeSlice() - ms)) > (wt * .10)) { - wait_to_run = true; - } - } - // if we have to wait we re-queue the worker. 
- if (wait_to_run) { - { - std::unique_lock<std::mutex> lock(worker_queue_mutex_); - if (!task_status_[task.getIdentifier()]) { - continue; - } - // put it on the priority queue - worker_priority_queue_.push(std::move(task)); - } - - wait_decay_ += 25; - continue; - } - } - const bool task_renew = task.run(); - wait_decay_ = 0; - if (task_renew) { - - if (UNLIKELY(task_count_ > current_workers_)) { - // even if we have more work to do we will not - std::unique_lock<std::mutex> lock(worker_queue_mutex_); - if (!task_status_[task.getIdentifier()]) { - continue; - } - - worker_priority_queue_.push(std::move(task)); - } else { - worker_queue_.enqueue(std::move(task)); - } - } - } - current_workers_--; -} -template<typename T> -void ThreadPool<T>::start() { - if (nullptr != controller_service_provider_) { - auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager"); - thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr; - } else { - thread_manager_ = nullptr; - } - std::lock_guard<std::recursive_mutex> lock(manager_mutex_); - if (!running_) { - running_ = true; - manager_thread_ = std::move(std::thread(&ThreadPool::manageWorkers, this)); - if (worker_queue_.size_approx() > 0) { - tasks_available_.notify_all(); - } - } -} - -template<typename T> -void ThreadPool<T>::stopTasks(const std::string &identifier) { - std::unique_lock<std::mutex> lock(worker_queue_mutex_); - task_status_[identifier] = false; -} - -template<typename T> -void ThreadPool<T>::shutdown() { - if (running_.load()) { - std::lock_guard<std::recursive_mutex> lock(manager_mutex_); - running_.store(false); - - drain(); - task_status_.clear(); - if (manager_thread_.joinable()) - manager_thread_.join(); - { - std::unique_lock<std::mutex> lock(worker_queue_mutex_); - for(const auto &thread : thread_queue_){ - if (thread->thread_.joinable()) - thread->thread_.join(); - } - thread_queue_.clear(); - 
current_workers_ = 0; - while (worker_queue_.size_approx() > 0) { - Worker<T> task; - worker_queue_.try_dequeue(task); - } - } - } -} + void manage_delayed_queue(); +}; } /* namespace utils */ } /* namespace minifi */ diff --git a/libminifi/src/CPPLINT.cfg b/libminifi/src/CPPLINT.cfg index 9205687..bba5060 100644 --- a/libminifi/src/CPPLINT.cfg +++ b/libminifi/src/CPPLINT.cfg @@ -1,3 +1,2 @@ -set noparent filter=-build/include_order,-build/include_alpha exclude_files=ResourceClaim.cpp diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index a874a93..38b9c19 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -29,6 +29,7 @@ const char *Configure::nifi_flow_configuration_file_exit_failure = "nifi.flow.co const char *Configure::nifi_flow_configuration_file_backup_update = "nifi.flow.configuration.backup.on.update"; const char *Configure::nifi_flow_engine_threads = "nifi.flow.engine.threads"; const char *Configure::nifi_flow_engine_alert_period = "nifi.flow.engine.alert.period"; +const char *Configure::nifi_flow_engine_event_driven_time_slice = "nifi.flow.engine.event.driven.time.slice"; const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration"; const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration"; const char *Configure::nifi_graceful_shutdown_seconds = "nifi.flowcontroller.graceful.shutdown.period"; diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp index 41ffa96..53c2522 100644 --- a/libminifi/src/CronDrivenSchedulingAgent.cpp +++ b/libminifi/src/CronDrivenSchedulingAgent.cpp @@ -32,7 +32,7 @@ namespace apache { namespace nifi { namespace minifi { -uint64_t CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, +utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, 
const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { if (this->running_ && processor->isRunning()) { std::chrono::system_clock::time_point leap_nanos; @@ -52,7 +52,7 @@ uint64_t CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> & // we may be woken up a little early so that we can honor our time. // in this case we can return the next time to run with the expectation // that the wakeup mechanism gets more granular. - return std::chrono::duration_cast<std::chrono::milliseconds>(result - from).count(); + return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(result - from)); } } else { Bosma::Cron schedule(processor->getCronPeriod()); @@ -67,16 +67,15 @@ uint64_t CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> & if (processor->isYield()) { // Honor the yield - return processor->getYieldTime(); + return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime())); } else if (shouldYield && this->bored_yield_duration_ > 0) { // No work to do or need to apply back pressure - return this->bored_yield_duration_; + return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(this->bored_yield_duration_)); } } - auto sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(result - from).count(); - return sleep_time; + return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(result - from)); } - return 0; + return utils::TaskRescheduleInfo::Done(); } } /* namespace minifi */ diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp index c56ac58..ef771b9 100644 --- a/libminifi/src/EventDrivenSchedulingAgent.cpp +++ b/libminifi/src/EventDrivenSchedulingAgent.cpp @@ -19,9 +19,6 @@ */ #include "EventDrivenSchedulingAgent.h" #include <chrono> -#include <memory> -#include <thread> -#include 
<iostream> #include "core/Processor.h" #include "core/ProcessContext.h" #include "core/ProcessSessionFactory.h" @@ -32,28 +29,25 @@ namespace apache { namespace nifi { namespace minifi { -uint64_t EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, +utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { - while (this->running_) { - bool shouldYield = this->onTrigger(processor, processContext, sessionFactory); - - if (processor->isYield()) { - // Honor the yield - return processor->getYieldTime(); - } else if (shouldYield && this->bored_yield_duration_ > 0) { - // No work to do or need to apply back pressure - return this->bored_yield_duration_; - } - - // Block until work is available - - processor->waitForWork(1000); - - if (!processor->isWorkAvailable()) { - return 1000; + if (this->running_) { + auto start_time = std::chrono::steady_clock::now(); + // trigger processor until it has work to do, but no more than half a sec + while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) { + bool shouldYield = this->onTrigger(processor, processContext, sessionFactory); + if (processor->isYield()) { + // Honor the yield + return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime())); + } else if (shouldYield) { + // No work to do or need to apply back pressure + return utils::TaskRescheduleInfo::RetryIn( + std::chrono::milliseconds((this->bored_yield_duration_ > 0) ? 
this->bored_yield_duration_ : 10)); // No work left to do, stand by + } } + return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available } - return 0; + return utils::TaskRescheduleInfo::Done(); } } /* namespace minifi */ diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 9de7822..31160c8 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -74,8 +74,6 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<core::ContentRepository> content_repo, const std::string name, bool headless_mode) : core::controller::ControllerServiceProvider(core::getClassName<FlowController>()), root_(nullptr), - max_timer_driven_threads_(0), - max_event_driven_threads_(0), running_(false), updating_(false), c2_enabled_(true), @@ -84,6 +82,7 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo flow_file_repo_(flow_file_repo), protocol_(0), controller_service_map_(std::make_shared<core::controller::ControllerServiceMap>()), + thread_pool_(2, false, nullptr, "Flowcontroller threadpool"), timer_scheduler_(nullptr), event_scheduler_(nullptr), cron_scheduler_(nullptr), @@ -101,14 +100,11 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo } id_generator_->generate(uuid_); setUUID(uuid_); - flow_update_ = false; // Setup the default values if (flow_configuration_ != nullptr) { configuration_filename_ = flow_configuration_->getConfigurationPath(); } - max_event_driven_threads_ = DEFAULT_MAX_EVENT_DRIVEN_THREAD; - max_timer_driven_threads_ = DEFAULT_MAX_TIMER_DRIVEN_THREAD; running_ = false; initialized_ = false; c2_initialized_ = false; @@ -247,6 +243,7 @@ int16_t FlowController::stop(bool force, uint64_t timeToWait) { this->timer_scheduler_->stop(); this->event_scheduler_->stop(); this->cron_scheduler_->stop(); 
+ this->thread_pool_.shutdown(); running_ = false; } return 0; @@ -313,21 +310,25 @@ void FlowController::load(const std::shared_ptr<core::ProcessGroup> &root, bool controller_service_provider_ = flow_configuration_->getControllerServiceProvider(); + auto base_shared_ptr = std::dynamic_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this()); + + if (!thread_pool_.isRunning() || reload) { + thread_pool_.shutdown(); + thread_pool_.setMaxConcurrentTasks(configuration_->getInt(Configure::nifi_flow_engine_threads, 2)); + thread_pool_.setControllerServiceProvider(base_shared_ptr); + thread_pool_.start(); + } + if (nullptr == timer_scheduler_ || reload) { - timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>( - std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_, - configuration_); + timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_); } + if (nullptr == event_scheduler_ || reload) { - event_scheduler_ = std::make_shared<EventDrivenSchedulingAgent>( - std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_, - configuration_); + event_scheduler_ = std::make_shared<EventDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_); } if (nullptr == cron_scheduler_ || reload) { - cron_scheduler_ = std::make_shared<CronDrivenSchedulingAgent>( - std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_, - configuration_); + cron_scheduler_ = std::make_shared<CronDrivenSchedulingAgent>(base_shared_ptr, 
provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_); } std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_); @@ -769,7 +770,7 @@ void FlowController::removeControllerService(const std::shared_ptr<core::control * Enables the controller service services * @param serviceNode service node which will be disabled, along with linked services. */ -std::future<uint64_t> FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { +std::future<utils::TaskRescheduleInfo> FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { return controller_service_provider_->enableControllerService(serviceNode); } @@ -784,7 +785,7 @@ void FlowController::enableControllerServices(std::vector<std::shared_ptr<core:: * Disables controller services * @param serviceNode service node which will be disabled, along with linked services. 
*/ -std::future<uint64_t> FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { +std::future<utils::TaskRescheduleInfo> FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { return controller_service_provider_->disableControllerService(serviceNode); } diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index a8684c3..65a79ae 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -26,6 +26,7 @@ #include "Exception.h" #include "core/Processor.h" #include "utils/ScopeGuard.h" +#include "utils/GeneralUtils.h" namespace org { namespace apache { @@ -40,41 +41,41 @@ bool SchedulingAgent::hasWorkToDo(std::shared_ptr<core::Processor> processor) { return false; } -std::future<uint64_t> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { +std::future<utils::TaskRescheduleInfo> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { logger_->log_info("Enabling CSN in SchedulingAgent %s", serviceNode->getName()); // reference the enable function from serviceNode - std::function<uint64_t()> f_ex = [serviceNode] { + std::function<utils::TaskRescheduleInfo()> f_ex = [serviceNode] { serviceNode->enable(); - return 0; + return utils::TaskRescheduleInfo::Done(); }; // only need to run this once. - std::unique_ptr<SingleRunMonitor> monitor = std::unique_ptr<SingleRunMonitor>(new SingleRunMonitor(&running_)); - utils::Worker<uint64_t> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor)); + auto monitor = utils::make_unique<utils::ComplexMonitor>(); + utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor)); // move the functor into the thread pool. While a future is returned // we aren't terribly concerned with the result. 
- std::future<uint64_t> future; + std::future<utils::TaskRescheduleInfo> future; thread_pool_.execute(std::move(functor), future); if (future.valid()) future.wait(); return future; } -std::future<uint64_t> SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { +std::future<utils::TaskRescheduleInfo> SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { logger_->log_info("Disabling CSN in SchedulingAgent %s", serviceNode->getName()); // reference the disable function from serviceNode - std::function<uint64_t()> f_ex = [serviceNode] { + std::function<utils::TaskRescheduleInfo()> f_ex = [serviceNode] { serviceNode->disable(); - return 0; + return utils::TaskRescheduleInfo::Done(); }; // only need to run this once. - std::unique_ptr<SingleRunMonitor> monitor = std::unique_ptr<SingleRunMonitor>(new SingleRunMonitor(&running_)); - utils::Worker<uint64_t> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor)); + auto monitor = utils::make_unique<utils::ComplexMonitor>(); + utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor)); // move the functor into the thread pool. While a future is returned // we aren't terribly concerned with the result. 
- std::future<uint64_t> future; + std::future<utils::TaskRescheduleInfo> future; thread_pool_.execute(std::move(functor), future); if (future.valid()) future.wait(); @@ -121,10 +122,6 @@ bool SchedulingAgent::onTrigger(const std::shared_ptr<core::Processor> &processo try { processor->onTrigger(processContext, sessionFactory); processor->decrementActiveTask(); - } catch (Exception &exception) { - // Normal exception - logger_->log_debug("Caught Exception %s", exception.what()); - processor->decrementActiveTask(); } catch (std::exception &exception) { logger_->log_debug("Caught Exception %s", exception.what()); processor->yield(admin_yield_duration_); diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp index 0b072d8..c096fbb 100644 --- a/libminifi/src/ThreadedSchedulingAgent.cpp +++ b/libminifi/src/ThreadedSchedulingAgent.cpp @@ -35,6 +35,7 @@ #include "core/ProcessContextBuilder.h" #include "core/ProcessSession.h" #include "core/ProcessSessionFactory.h" +#include "utils/GeneralUtils.h" namespace org { namespace apache { @@ -44,7 +45,7 @@ namespace minifi { void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processor) { std::lock_guard<std::mutex> lock(mutex_); - admin_yield_duration_ = 0; + admin_yield_duration_ = 100; // We should prevent burning CPU in case of rollbacks std::string yieldValue; if (configure_->get(Configure::nifi_administrative_yield_duration, yieldValue)) { @@ -67,7 +68,7 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo return; } - if (thread_pool_.isRunning(processor->getUUIDStr())) { + if (thread_pool_.isTaskRunning(processor->getUUIDStr())) { logger_->log_warn("Can not schedule threads for processor %s because there are existing threads running", processor->getName()); return; } @@ -92,25 +93,30 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo // reference the disable function from serviceNode 
processor->incrementActiveTasks(); - std::function<uint64_t()> f_ex = [agent, processor, processContext, sessionFactory] () { + std::function<utils::TaskRescheduleInfo()> f_ex = [agent, processor, processContext, sessionFactory] () { return agent->run(processor, processContext, sessionFactory); }; // create a functor that will be submitted to the thread pool. - std::unique_ptr<TimerAwareMonitor> monitor = std::unique_ptr<TimerAwareMonitor>(new TimerAwareMonitor(&running_)); - utils::Worker<uint64_t> functor(f_ex, processor->getUUIDStr(), std::move(monitor)); + auto monitor = utils::make_unique<utils::ComplexMonitor>(); + utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, processor->getUUIDStr(), std::move(monitor)); // move the functor into the thread pool. While a future is returned // we aren't terribly concerned with the result. - std::future<uint64_t> future; + std::future<utils::TaskRescheduleInfo> future; thread_pool_.execute(std::move(functor), future); } logger_->log_debug("Scheduled thread %d concurrent workers for for process %s", processor->getMaxConcurrentTasks(), processor->getName()); + processors_running_.insert(processor->getUUIDStr()); return; } void ThreadedSchedulingAgent::stop() { SchedulingAgent::stop(); - thread_pool_.shutdown(); + std::lock_guard<std::mutex> lock(mutex_); + for (const auto& p : processors_running_) { + logger_->log_error("SchedulingAgent is stopped before processor was unscheduled: %s", p); + thread_pool_.stopTasks(p); + } } void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> processor) { @@ -127,6 +133,8 @@ void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> proces processor->clearActiveTask(); processor->setScheduledState(core::STOPPED); + + processors_running_.erase(processor->getUUIDStr()); } } /* namespace minifi */ diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp index 13a3439..1b6b7f6 100644 --- 
a/libminifi/src/TimerDrivenSchedulingAgent.cpp +++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp @@ -29,20 +29,21 @@ namespace apache { namespace nifi { namespace minifi { -uint64_t TimerDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, +utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { - while (this->running_ && processor->isRunning()) { + if (this->running_ && processor->isRunning()) { bool shouldYield = this->onTrigger(processor, processContext, sessionFactory); if (processor->isYield()) { // Honor the yield - return processor->getYieldTime(); + return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime())); } else if (shouldYield && this->bored_yield_duration_ > 0) { // No work to do or need to apply back pressure - return this->bored_yield_duration_; + return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(this->bored_yield_duration_)); } - return processor->getSchedulingPeriodNano() / 1000000; + return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::nanoseconds(processor->getSchedulingPeriodNano()))); } - return processor->getSchedulingPeriodNano() / 1000000; + return utils::TaskRescheduleInfo::Done(); } } /* namespace minifi */ diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index d3e579f..46d0794 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -268,6 +268,7 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const bool Processor::isWorkAvailable() { // We have work if any incoming connection has work + std::lock_guard<std::mutex> lock(mutex_); bool hasWork = false; try { diff --git 
a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp new file mode 100644 index 0000000..039e136 --- /dev/null +++ b/libminifi/src/utils/ThreadPool.cpp @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "utils/ThreadPool.h" +#include "core/state/StateManager.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template<typename T> +void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) { + thread->is_running_ = true; + while (running_.load()) { + if (UNLIKELY(thread_reduction_count_ > 0)) { + if (--thread_reduction_count_ >= 0) { + deceased_thread_queue_.enqueue(thread); + thread->is_running_ = false; + break; + } else { + thread_reduction_count_++; + } + } + + Worker<T> task; + if (worker_queue_.try_dequeue(task)) { + { + std::unique_lock<std::mutex> lock(worker_queue_mutex_); + if (!task_status_[task.getIdentifier()]) { + continue; + } + } + if (task.run()) { + if (task.getNextExecutionTime() <= std::chrono::steady_clock::now()) { + // it can be rescheduled again as soon as there is a worker available + worker_queue_.enqueue(std::move(task)); + continue; + } + // Task will be put to the delayed queue as 
next exec time is in the future + std::unique_lock<std::mutex> lock(worker_queue_mutex_); + bool need_to_notify = + delayed_worker_queue_.empty() || + task.getNextExecutionTime() < delayed_worker_queue_.top().getNextExecutionTime(); + + delayed_worker_queue_.push(std::move(task)); + if (need_to_notify) { + delayed_task_available_.notify_all(); + } + } + } else { + std::unique_lock<std::mutex> lock(worker_queue_mutex_); + tasks_available_.wait(lock); + } + } + current_workers_--; +} + +template<typename T> +void ThreadPool<T>::manage_delayed_queue() { + while (running_) { + std::unique_lock<std::mutex> lock(worker_queue_mutex_); + + // Put the tasks ready to run in the worker queue + while (!delayed_worker_queue_.empty() && + delayed_worker_queue_.top().getNextExecutionTime() <= std::chrono::steady_clock::now()) { + // I'm very sorry for this - committee must have been seriously drunk when the interface of prio queue was submitted. + Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top())); + delayed_worker_queue_.pop(); + worker_queue_.enqueue(std::move(task)); + tasks_available_.notify_one(); + } + if (delayed_worker_queue_.empty()) { + delayed_task_available_.wait(lock); + } else { + auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>( + delayed_worker_queue_.top().getNextExecutionTime() - std::chrono::steady_clock::now()); + delayed_task_available_.wait_for(lock, (std::max)(wait_time, std::chrono::milliseconds(1))); + } + } +} + +template<typename T> +bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) { + { + std::unique_lock<std::mutex> lock(worker_queue_mutex_); + task_status_[task.getIdentifier()] = true; + } + future = std::move(task.getPromise()->get_future()); + bool enqueued = worker_queue_.enqueue(std::move(task)); + if (running_) { + tasks_available_.notify_one(); + } + + task_count_++; + + return enqueued; +} + +template<typename T> +void ThreadPool<T>::manageWorkers() { + for (int i = 0; i <
max_worker_threads_; i++) { + std::stringstream thread_name; + thread_name << name_ << " #" << i; + auto worker_thread = std::make_shared<WorkerThread>(thread_name.str()); + worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread)); + thread_queue_.push_back(worker_thread); + current_workers_++; + } + + if (daemon_threads_) { + for (auto &thread : thread_queue_) { + thread->thread_.detach(); + } + } + + if (nullptr != thread_manager_) { + while (running_) { + auto waitperiod = std::chrono::milliseconds(500); + { + std::unique_lock<std::recursive_mutex> lock(manager_mutex_, std::try_to_lock); + if (!lock.owns_lock()) { + // Threadpool is being stopped/started or config is being changed, better wait a bit + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + if (thread_manager_->isAboveMax(current_workers_)) { + auto max = thread_manager_->getMaxConcurrentTasks(); + auto differential = current_workers_ - max; + thread_reduction_count_ += differential; + } else if (thread_manager_->shouldReduce()) { + if (current_workers_ > 1) + thread_reduction_count_++; + thread_manager_->reduce(); + } else if (thread_manager_->canIncrease() && max_worker_threads_ > current_workers_) { // increase slowly + std::unique_lock<std::mutex> lock(worker_queue_mutex_); + auto worker_thread = std::make_shared<WorkerThread>(); + worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread)); + if (daemon_threads_) { + worker_thread->thread_.detach(); + } + thread_queue_.push_back(worker_thread); + current_workers_++; + } + std::shared_ptr<WorkerThread> thread_ref; + while (deceased_thread_queue_.try_dequeue(thread_ref)) { + std::unique_lock<std::mutex> lock(worker_queue_mutex_); + if (thread_ref->thread_.joinable()) + thread_ref->thread_.join(); + thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), thread_ref), thread_queue_.end()); + } + } + std::this_thread::sleep_for(waitperiod); + } + 
} else { + for (auto &thread : thread_queue_) { + if (thread->thread_.joinable()) + thread->thread_.join(); + } + } +} + +template<typename T> +void ThreadPool<T>::start() { + if (nullptr != controller_service_provider_) { + auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager"); + thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr; + } else { + thread_manager_ = nullptr; + } + + std::lock_guard<std::recursive_mutex> lock(manager_mutex_); + if (!running_) { + running_ = true; + manager_thread_ = std::move(std::thread(&ThreadPool::manageWorkers, this)); + if (worker_queue_.size_approx() > 0) { + tasks_available_.notify_all(); + } + + std::lock_guard<std::mutex> quee_lock(worker_queue_mutex_); + delayed_scheduler_thread_ = std::thread(&ThreadPool<T>::manage_delayed_queue, this); + } +} + +template<typename T> +void ThreadPool<T>::stopTasks(const std::string &identifier) { + std::unique_lock<std::mutex> lock(worker_queue_mutex_); + task_status_[identifier] = false; +} + +template<typename T> +void ThreadPool<T>::shutdown() { + if (running_.load()) { + std::lock_guard<std::recursive_mutex> lock(manager_mutex_); + running_.store(false); + + drain(); + + task_status_.clear(); + if (manager_thread_.joinable()) { + manager_thread_.join(); + } + + delayed_task_available_.notify_all(); + if (delayed_scheduler_thread_.joinable()) { + delayed_scheduler_thread_.join(); + } + + for (const auto &thread : thread_queue_) { + if (thread->thread_.joinable()) + thread->thread_.join(); + } + + thread_queue_.clear(); + current_workers_ = 0; + while (!delayed_worker_queue_.empty()) { + delayed_worker_queue_.pop(); + } + + while (worker_queue_.size_approx() > 0) { + Worker<T> task; + worker_queue_.try_dequeue(task); + } + } +} + +template class utils::ThreadPool<utils::TaskRescheduleInfo>; +template class utils::ThreadPool<int>; +template class utils::ThreadPool<bool>; +template 
class utils::ThreadPool<state::Update>; + +} /* namespace utils */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ diff --git a/libminifi/test/resources/TestUpdateAttribute.yml b/libminifi/test/resources/TestUpdateAttribute.yml index f741bfb..522484e 100644 --- a/libminifi/test/resources/TestUpdateAttribute.yml +++ b/libminifi/test/resources/TestUpdateAttribute.yml @@ -35,36 +35,30 @@ Processors: id: 2438e3c8-015a-1000-79ca-83af40ec1992 class: org.apache.nifi.processors.standard.UpdateAttribute max concurrent tasks: 1 - scheduling strategy: TIMER_DRIVEN - scheduling period: 1 sec + scheduling strategy: EVENT_DRIVEN penalization period: 30 sec yield period: 1 sec run duration nanos: 0 - auto-terminated relationships list: failure Properties: route_condition_attr: true - name: roa id: 2438e3c8-015a-1000-79ca-83af40ec1993 class: org.apache.nifi.processors.standard.RouteOnAttribute max concurrent tasks: 1 - scheduling strategy: TIMER_DRIVEN - scheduling period: 1 sec + scheduling strategy: EVENT_DRIVEN penalization period: 30 sec yield period: 1 sec run duration nanos: 0 - auto-terminated relationships list: failure Properties: route_matched: ${route_condition_attr} - name: up2 id: 2438e3c8-015a-1000-79ca-83af40ec1994 class: org.apache.nifi.processors.standard.UpdateAttribute max concurrent tasks: 1 - scheduling strategy: TIMER_DRIVEN - scheduling period: 1 sec + scheduling strategy: EVENT_DRIVEN penalization period: 30 sec yield period: 1 sec run duration nanos: 0 - auto-terminated relationships list: failure Properties: route_check_attr: good variable_attribute: ${nifi.variable.test} @@ -72,12 +66,12 @@ Processors: id: 2438e3c8-015a-1000-79ca-83af40ec1995 class: org.apache.nifi.processors.standard.LogAttribute max concurrent tasks: 1 - scheduling strategy: TIMER_DRIVEN - scheduling period: 1 sec + scheduling strategy: EVENT_DRIVEN penalization period: 30 sec yield period: 1 sec run duration nanos: 0 - auto-terminated 
relationships list: success + auto-terminated relationships list: + - success Properties: Connections: diff --git a/libminifi/test/unit/BackTraceTests.cpp b/libminifi/test/unit/BackTraceTests.cpp index 816ff63..806f95c 100644 --- a/libminifi/test/unit/BackTraceTests.cpp +++ b/libminifi/test/unit/BackTraceTests.cpp @@ -41,14 +41,14 @@ class WorkerNumberExecutions : public utils::AfterExecute<int> { ~WorkerNumberExecutions() { } - virtual bool isFinished(const int &result) { + bool isFinished(const int &result) override { if (result > 0 && ++runs < tasks) { return false; } else { return true; } } - virtual bool isCancelled(const int &result) { + bool isCancelled(const int &result) override { return false; } @@ -56,9 +56,9 @@ class WorkerNumberExecutions : public utils::AfterExecute<int> { return runs; } - virtual int64_t wait_time() { + std::chrono::milliseconds wait_time() override { // wait 50ms - return 50; + return std::chrono::milliseconds(50); } protected: diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp index 6849aa6..48301b1 100644 --- a/libminifi/test/unit/ThreadPoolTests.cpp +++ b/libminifi/test/unit/ThreadPoolTests.cpp @@ -41,14 +41,14 @@ class WorkerNumberExecutions : public utils::AfterExecute<int> { ~WorkerNumberExecutions() { } - virtual bool isFinished(const int &result) { + bool isFinished(const int &result) override { if (result > 0 && ++runs < tasks) { return false; } else { return true; } } - virtual bool isCancelled(const int &result) { + bool isCancelled(const int &result) override { return false; } @@ -56,9 +56,9 @@ class WorkerNumberExecutions : public utils::AfterExecute<int> { return runs; } - virtual int64_t wait_time() { + std::chrono::milliseconds wait_time() override { // wait 50ms - return 50; + return std::chrono::milliseconds(50); } protected:
