This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit deab43e4419fa85a9d15ca0fbfdf23467142bf37
Author: Martin Zink <martinz...@apache.org>
AuthorDate: Thu May 25 12:12:58 2023 +0200

    MINIFICPP-2129 Refactor threadpool
    
    Signed-off-by: Gabor Gyimesi <gamezb...@gmail.com>
    
    This closes #1606
---
 extensions/rocksdb-repos/FlowFileLoader.cpp    |  6 +-
 extensions/rocksdb-repos/FlowFileLoader.h      |  2 +-
 libminifi/include/CronDrivenSchedulingAgent.h  |  2 +-
 libminifi/include/EventDrivenSchedulingAgent.h |  2 +-
 libminifi/include/FlowController.h             |  2 +-
 libminifi/include/SchedulingAgent.h            |  4 +-
 libminifi/include/ThreadedSchedulingAgent.h    |  2 +-
 libminifi/include/TimerDrivenSchedulingAgent.h |  2 +-
 libminifi/include/c2/C2Agent.h                 |  2 +-
 libminifi/include/core/Processor.h             |  1 +
 libminifi/include/utils/Monitors.h             | 82 ++++++---------------
 libminifi/include/utils/ThreadPool.h           | 98 +++++++-------------------
 libminifi/src/CronDrivenSchedulingAgent.cpp    |  3 +-
 libminifi/src/EventDrivenSchedulingAgent.cpp   |  2 +-
 libminifi/src/FlowController.cpp               |  2 +-
 libminifi/src/ThreadedSchedulingAgent.cpp      | 17 +----
 libminifi/src/TimerDrivenSchedulingAgent.cpp   |  2 +-
 libminifi/src/c2/C2Agent.cpp                   |  5 +-
 libminifi/src/utils/ThreadPool.cpp             | 80 ++++++++-------------
 libminifi/test/unit/BackTraceTests.cpp         | 97 +++++++++++--------------
 libminifi/test/unit/SchedulingAgentTests.cpp   | 57 ++++++++-------
 libminifi/test/unit/SocketTests.cpp            | 46 +++++-------
 libminifi/test/unit/ThreadPoolTests.cpp        | 64 ++++++-----------
 nanofi/include/cxx/Instance.h                  |  5 +-
 24 files changed, 210 insertions(+), 375 deletions(-)
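
For orientation, the call sites changed in this commit suggest the refactored, non-templated thread pool is used roughly as follows. This is a minimal sketch assembled from the new signatures in the diff below, not code taken from the commit; the pool name, the task id, the retry counter and the main() wrapper are illustrative only.

    #include <chrono>
    #include <future>

    #include "utils/ThreadPool.h"  // utils::ThreadPool, utils::Worker
    #include "utils/Monitors.h"    // utils::TaskRescheduleInfo

    namespace utils = org::apache::nifi::minifi::utils;

    int main() {
      // The pool no longer takes a result-type template parameter or the removed
      // daemon_threads flag: (max worker threads, controller service provider, name).
      utils::ThreadPool pool(2, nullptr, "ExamplePool");
      pool.start();

      // Workers always return TaskRescheduleInfo; the AfterExecute/ComplexMonitor
      // run determinant is gone, and rescheduling is driven by the returned value.
      int remaining = 3;  // illustrative counter, not part of the commit
      utils::Worker task{[&remaining]() -> utils::TaskRescheduleInfo {
        if (--remaining == 0) {
          return utils::TaskRescheduleInfo::Done();
        }
        return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(100));
      }, "example-task-id"};

      std::future<utils::TaskRescheduleInfo> future;
      pool.execute(std::move(task), future);

      future.wait();  // fulfilled once the worker reports Done()
      return 0;
    }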

diff --git a/extensions/rocksdb-repos/FlowFileLoader.cpp b/extensions/rocksdb-repos/FlowFileLoader.cpp
index 983cae449..133f7ecda 100644
--- a/extensions/rocksdb-repos/FlowFileLoader.cpp
+++ b/extensions/rocksdb-repos/FlowFileLoader.cpp
@@ -41,11 +41,11 @@ FlowFileLoader::~FlowFileLoader() {
 std::future<FlowFileLoader::FlowFilePtrVec> FlowFileLoader::load(std::vector<SwappedFlowFile> flow_files) {
   auto promise = std::make_shared<std::promise<FlowFilePtrVec>>();
   std::future<FlowFilePtrVec> future = promise->get_future();
-  utils::Worker<utils::TaskRescheduleInfo> task{[this, flow_files = std::move(flow_files), promise = std::move(promise)] {
+  utils::Worker task{[this, flow_files = std::move(flow_files), promise = std::move(promise)] {
       return loadImpl(flow_files, promise);
     },
-    "",  // doesn't matter that tasks alias by name, as we never actually query their status or stop a single task
-    std::make_unique<utils::ComplexMonitor>()};
+    ""};  // doesn't matter that tasks alias by name, as we never actually query their status or stop a single task
+
   // the dummy_future is for the return value of the Worker's lambda, rerunning this lambda
   // depends on run_determinant + result
   // we could create a custom run_determinant to instead determine if/when it should be rerun
diff --git a/extensions/rocksdb-repos/FlowFileLoader.h b/extensions/rocksdb-repos/FlowFileLoader.h
index 9c7ee6d51..2287c7ddf 100644
--- a/extensions/rocksdb-repos/FlowFileLoader.h
+++ b/extensions/rocksdb-repos/FlowFileLoader.h
@@ -52,7 +52,7 @@ class FlowFileLoader {
  private:
   utils::TaskRescheduleInfo loadImpl(const std::vector<SwappedFlowFile>& 
flow_files, const std::shared_ptr<std::promise<FlowFilePtrVec>>& output);
 
-  utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_{thread_count_, 
false, nullptr, "FlowFileLoaderThreadPool"};
+  utils::ThreadPool thread_pool_{thread_count_, nullptr, 
"FlowFileLoaderThreadPool"};
 
   gsl::not_null<minifi::internal::RocksDatabase*> db_;
 
diff --git a/libminifi/include/CronDrivenSchedulingAgent.h b/libminifi/include/CronDrivenSchedulingAgent.h
index 88ee42e6b..7d422e785 100644
--- a/libminifi/include/CronDrivenSchedulingAgent.h
+++ b/libminifi/include/CronDrivenSchedulingAgent.h
@@ -41,7 +41,7 @@ class CronDrivenSchedulingAgent : public 
ThreadedSchedulingAgent {
                             std::shared_ptr<core::Repository> flow_repo,
                             std::shared_ptr<core::ContentRepository> 
content_repo,
                             std::shared_ptr<Configure> configuration,
-                            utils::ThreadPool<utils::TaskRescheduleInfo>& 
thread_pool)
+                            utils::ThreadPool& thread_pool)
       : ThreadedSchedulingAgent(controller_service_provider, std::move(repo), 
std::move(flow_repo), std::move(content_repo), std::move(configuration), 
thread_pool) {
   }
 
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
index 07a173059..f8664cc25 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -37,7 +37,7 @@ class EventDrivenSchedulingAgent : public 
ThreadedSchedulingAgent {
  public:
   EventDrivenSchedulingAgent(const 
gsl::not_null<core::controller::ControllerServiceProvider*> 
controller_service_provider, std::shared_ptr<core::Repository> repo,
                              std::shared_ptr<core::Repository> flow_repo, 
std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<Configure> configuration,
-                             utils::ThreadPool<utils::TaskRescheduleInfo> 
&thread_pool)
+                             utils::ThreadPool &thread_pool)
       : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, 
content_repo, configuration, thread_pool) {
     using namespace std::literals::chrono_literals;
 
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 1d3cbf9e5..84724920f 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -200,7 +200,7 @@ class FlowController : public 
core::controller::ForwardingControllerServiceProvi
   std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<FlowController>::getLogger();
 
   // Thread pool for schedulers
-  utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_;
+  utils::ThreadPool thread_pool_;
 };
 
 }  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index fec5e9598..4911f5e90 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -53,7 +53,7 @@ namespace org::apache::nifi::minifi {
 class SchedulingAgent {
  public:
   SchedulingAgent(const 
gsl::not_null<core::controller::ControllerServiceProvider*> 
controller_service_provider, std::shared_ptr<core::Repository> repo, 
std::shared_ptr<core::Repository> flow_repo,
-                  std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<Configure> configuration, 
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
+                  std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<Configure> configuration, utils::ThreadPool& thread_pool)
       : admin_yield_duration_(),
         bored_yield_duration_(0),
         configure_(configuration),
@@ -122,7 +122,7 @@ class SchedulingAgent {
   std::shared_ptr<core::Repository> flow_repo_;
 
   std::shared_ptr<core::ContentRepository> content_repo_;
-  utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_;
+  utils::ThreadPool& thread_pool_;
   gsl::not_null<core::controller::ControllerServiceProvider*> 
controller_service_provider_;
 
  private:
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index e51279154..14a703948 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -40,7 +40,7 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
  public:
   ThreadedSchedulingAgent(const 
gsl::not_null<core::controller::ControllerServiceProvider*> 
controller_service_provider, std::shared_ptr<core::Repository> repo,
         std::shared_ptr<core::Repository> flow_repo, 
std::shared_ptr<core::ContentRepository> content_repo,
-        std::shared_ptr<Configure> configuration,  
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
+        std::shared_ptr<Configure> configuration,  utils::ThreadPool 
&thread_pool)
       : SchedulingAgent(controller_service_provider, repo, flow_repo, 
content_repo, configuration, thread_pool) {
   }
   ~ThreadedSchedulingAgent() override = default;
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index 9e74fec2d..b91522f96 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -32,7 +32,7 @@ class TimerDrivenSchedulingAgent : public 
ThreadedSchedulingAgent {
  public:
   TimerDrivenSchedulingAgent(const 
gsl::not_null<core::controller::ControllerServiceProvider*> 
controller_service_provider, std::shared_ptr<core::Repository> repo,
                              std::shared_ptr<core::Repository> flow_repo, 
std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<Configure> configure,
-                             utils::ThreadPool<utils::TaskRescheduleInfo> 
&thread_pool)
+                             utils::ThreadPool &thread_pool)
       : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, 
content_repo, configure, thread_pool) {
   }
 
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 1c2e27b51..92d86afb3 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -227,7 +227,7 @@ class C2Agent : public state::UpdateController {
 
   std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<C2Agent>::getLogger();
 
-  utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_;
+  utils::ThreadPool thread_pool_;
 
   std::vector<utils::Identifier> task_ids_;
 
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index f2266ea4b..ee3141423 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -176,6 +176,7 @@ class Processor : public Connectable, public 
ConfigurableComponent, public state
 
   void clearYield();
 
+  std::chrono::steady_clock::time_point getYieldExpirationTime() const { 
return yield_expiration_; }
   std::chrono::steady_clock::duration getYieldTime() const;
   // Whether flow file queue full in any of the outgoing connection
   bool flowFilesOutGoingFull() const;
diff --git a/libminifi/include/utils/Monitors.h b/libminifi/include/utils/Monitors.h
index aed00ce2d..10e7411c4 100644
--- a/libminifi/include/utils/Monitors.h
+++ b/libminifi/include/utils/Monitors.h
@@ -23,90 +23,50 @@
 #if defined(WIN32)
 #include <future>  // This is required to work around a VS2017 bug, see the 
details below
 #endif
-#include "utils/gsl.h"
 
 namespace org::apache::nifi::minifi::utils {
 
-/**
- * Worker task helper that determines
- * whether or not we will run
- */
-template<typename T>
-class AfterExecute {
+class TaskRescheduleInfo {
  public:
-  virtual ~AfterExecute() = default;
-
-  AfterExecute() = default;
-  AfterExecute(AfterExecute&& /*other*/)  noexcept = default;
-  virtual bool isFinished(const T &result) = 0;
-  virtual bool isCancelled(const T &result) = 0;
-  /**
-   * Time to wait before re-running this task if necessary
-   * @return milliseconds since epoch after which we are eligible to re-run 
this task.
-   */
-  virtual std::chrono::steady_clock::duration wait_time() = 0;
-};
-
-/**
- * Uses the wait time for a given worker to determine if it is eligible to run
- */
+  TaskRescheduleInfo(bool result, std::chrono::steady_clock::time_point 
next_execution_time)
+    : next_execution_time_(next_execution_time), finished_(result) {}
 
-struct TaskRescheduleInfo {
-  TaskRescheduleInfo(bool result, std::chrono::steady_clock::duration 
wait_time)
-    : wait_time_(wait_time), finished_(result) {
-    gsl_Expects(wait_time >= std::chrono::milliseconds(0));
+  static TaskRescheduleInfo Done() {
+    return {true, std::chrono::steady_clock::time_point::min()};
   }
 
-  std::chrono::steady_clock::duration wait_time_;
-  bool finished_;
+  static TaskRescheduleInfo RetryAfter(std::chrono::steady_clock::time_point 
next_execution_time) {
+    return {false, next_execution_time};
+  }
 
-  static TaskRescheduleInfo Done() {
-    return {true, std::chrono::steady_clock::duration(0)};
+  static TaskRescheduleInfo RetryIn(std::chrono::steady_clock::duration 
duration) {
+    return {false, std::chrono::steady_clock::now()+duration};
   }
 
-  static TaskRescheduleInfo RetryIn(std::chrono::steady_clock::duration 
interval) {
-    return {false, interval};
+  static TaskRescheduleInfo RetryImmediately() {
+    return {false, std::chrono::steady_clock::time_point::min()};
   }
 
-  static TaskRescheduleInfo RetryAfter(std::chrono::steady_clock::time_point 
time_point) {
-    auto interval = std::max(time_point - std::chrono::steady_clock::now(), 
std::chrono::steady_clock::duration(0));
-    return {false, interval};
+  std::chrono::steady_clock::time_point getNextExecutionTime() const {
+    return next_execution_time_;
   }
 
-  static TaskRescheduleInfo RetryImmediately() {
-    return {false, std::chrono::steady_clock::duration(0)};
+  bool isFinished() const {
+    return finished_;
   }
 
+ private:
+  std::chrono::steady_clock::time_point next_execution_time_;
+  bool finished_;
+
 #if defined(WIN32)
 // 
https://developercommunity.visualstudio.com/content/problem/60897/c-shared-state-futuresstate-default-constructs-the.html
 // Because of this bug we need to have this object default constructible, 
which makes no sense otherwise. Hack.
  private:
-  TaskRescheduleInfo() : wait_time_(std::chrono::steady_clock::duration(0)), 
finished_(true) {}
+  TaskRescheduleInfo() : 
next_execution_time_(std::chrono::steady_clock::time_point::min()), 
finished_(true) {}
   friend class std::_Associated_state<TaskRescheduleInfo>;
 #endif
 };
 
-class ComplexMonitor : public utils::AfterExecute<TaskRescheduleInfo> {
- public:
-  ComplexMonitor() = default;
-
-  bool isFinished(const TaskRescheduleInfo &result) override {
-    if (result.finished_) {
-      return true;
-    }
-    current_wait_.store(result.wait_time_);
-    return false;
-  }
-  bool isCancelled(const TaskRescheduleInfo& /*result*/) override {
-    return false;
-  }
-
-  std::chrono::steady_clock::duration wait_time() override {
-    return current_wait_.load();
-  }
-
- private:
-  std::atomic<std::chrono::steady_clock::duration> current_wait_ 
{std::chrono::steady_clock::duration(0)};
-};
 
 }  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index dde867909..cafcb2b94 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -50,27 +50,17 @@ using TaskId = std::string;
  * purpose: Provides a wrapper for the functor
  * and returns a future based on the template argument.
  */
-template<typename T>
 class Worker {
  public:
-  explicit Worker(const std::function<T()> &task, const TaskId &identifier, 
std::unique_ptr<AfterExecute<T>> run_determinant)
-      : identifier_(identifier),
+  explicit Worker(const std::function<TaskRescheduleInfo()> &task, TaskId 
identifier)
+      : identifier_(std::move(identifier)),
         next_exec_time_(std::chrono::steady_clock::now()),
-        task(task),
-        run_determinant_(std::move(run_determinant)) {
-    promise = std::make_shared<std::promise<T>>();
+        task(task) {
+    promise = std::make_shared<std::promise<TaskRescheduleInfo>>();
   }
 
-  explicit Worker(const std::function<T()> &task, const TaskId &identifier)
-      : identifier_(identifier),
-        next_exec_time_(std::chrono::steady_clock::now()),
-        task(task),
-        run_determinant_(nullptr) {
-    promise = std::make_shared<std::promise<T>>();
-  }
-
-  explicit Worker(const TaskId& identifier = {})
-      : identifier_(identifier),
+  explicit Worker(TaskId  identifier = {})
+      : identifier_(std::move(identifier)),
         next_exec_time_(std::chrono::steady_clock::now()) {
   }
 
@@ -89,50 +79,40 @@ class Worker {
    *   true == run again
    */
   virtual bool run() {
-    T result = task();
-    if (run_determinant_ == nullptr || (run_determinant_->isFinished(result) 
|| run_determinant_->isCancelled(result))) {
+    TaskRescheduleInfo result = task();
+    if (result.isFinished()) {
       promise->set_value(result);
       return false;
     }
-    next_exec_time_ = std::max(next_exec_time_, 
std::chrono::steady_clock::now() + run_determinant_->wait_time());
-    return true;
-  }
 
-  virtual void setIdentifier(const TaskId& identifier) {
-    identifier_ = identifier;
+    next_exec_time_ = result.getNextExecutionTime();
+    return true;
   }
 
-  virtual std::chrono::steady_clock::time_point getNextExecutionTime() const {
+  [[nodiscard]] virtual std::chrono::steady_clock::time_point 
getNextExecutionTime() const {
     return next_exec_time_;
   }
 
-  std::shared_ptr<std::promise<T>> getPromise() const;
+  [[nodiscard]] std::shared_ptr<std::promise<TaskRescheduleInfo>> getPromise() 
const { return promise; }
 
-  const TaskId &getIdentifier() const {
+  [[nodiscard]] const TaskId &getIdentifier() const {
     return identifier_;
   }
 
  protected:
   TaskId identifier_;
   std::chrono::steady_clock::time_point next_exec_time_;
-  std::function<T()> task;
-  std::unique_ptr<AfterExecute<T>> run_determinant_;
-  std::shared_ptr<std::promise<T>> promise;
+  std::function<TaskRescheduleInfo()> task;
+  std::shared_ptr<std::promise<TaskRescheduleInfo>> promise;
 };
 
-template<typename T>
 class DelayedTaskComparator {
  public:
-  bool operator()(Worker<T> &a, Worker<T> &b) {
+  bool operator()(Worker &a, Worker &b) {
     return a.getNextExecutionTime() > b.getNextExecutionTime();
   }
 };
 
-template<typename T>
-std::shared_ptr<std::promise<T>> Worker<T>::getPromise() const {
-  return promise;
-}
-
 class WorkerThread {
  public:
   explicit WorkerThread(std::thread thread, const std::string &name = 
"NamelessWorker")
@@ -155,16 +135,15 @@ class WorkerThread {
  * ThreadPoolExecutor
  * Design: Locked control over a manager thread that controls the worker 
threads
  */
-template<typename T>
 class ThreadPool {
  public:
-  ThreadPool(int max_worker_threads = 2, bool daemon_threads = false,
+  ThreadPool(int max_worker_threads = 2,
              core::controller::ControllerServiceProvider* 
controller_service_provider = nullptr, std::string name = "NamelessPool");
 
-  ThreadPool(const ThreadPool<T> &other) = delete;
-  ThreadPool<T>& operator=(const ThreadPool<T> &other) = delete;
-  ThreadPool(ThreadPool<T> &&other) = delete;
-  ThreadPool<T>& operator=(ThreadPool<T> &&other) = delete;
+  ThreadPool(const ThreadPool &other) = delete;
+  ThreadPool& operator=(const ThreadPool &other) = delete;
+  ThreadPool(ThreadPool &&other) = delete;
+  ThreadPool& operator=(ThreadPool &&other) = delete;
 
   ~ThreadPool() {
     shutdown();
@@ -177,7 +156,7 @@ class ThreadPool {
    * the worker task
    * @param future future to move new promise to
    */
-  void execute(Worker<T> &&task, std::future<T> &future);
+  void execute(Worker &&task, std::future<TaskRescheduleInfo> &future);
 
   /**
    * attempts to stop tasks with the provided identifier.
@@ -286,59 +265,32 @@ class ThreadPool {
     }
   }
 
-// determines if threads are detached
-  bool daemon_threads_;
   std::atomic<int> thread_reduction_count_;
-// max worker threads
   int max_worker_threads_;
-// current worker tasks.
   std::atomic<int> current_workers_;
-// thread queue
   std::vector<std::shared_ptr<WorkerThread>> thread_queue_;
-// manager thread
   std::thread manager_thread_;
-// the thread responsible for putting delayed tasks to the worker queue when 
they had to be put
   std::thread delayed_scheduler_thread_;
-// conditional that's used to adjust the threads
   std::atomic<bool> adjust_threads_;
-// atomic running boolean
   std::atomic<bool> running_;
-// controller service provider
   core::controller::ControllerServiceProvider* controller_service_provider_;
-// integrated power manager
   std::shared_ptr<controllers::ThreadManagementService> thread_manager_;
-  // thread queue for the recently deceased threads.
   ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
-// worker queue of worker objects
-  ConditionConcurrentQueue<Worker<T>> worker_queue_;
-  std::priority_queue<Worker<T>, std::vector<Worker<T>>, 
DelayedTaskComparator<T>> delayed_worker_queue_;
-// mutex to  protect task status and delayed queue
+  ConditionConcurrentQueue<Worker> worker_queue_;
+  std::priority_queue<Worker, std::vector<Worker>, DelayedTaskComparator> 
delayed_worker_queue_;
   std::mutex worker_queue_mutex_;
-// notification for new delayed tasks that's before the current ones
   std::condition_variable delayed_task_available_;
-// map to identify if a task should be
   std::map<TaskId, bool> task_status_;
-// manager mutex
   std::recursive_mutex manager_mutex_;
-  // thread pool name
   std::string name_;
-  // count of running tasks by ID
   std::unordered_map<TaskId, uint32_t> running_task_count_by_id_;
-  // variable to signal task running completion
   std::condition_variable task_run_complete_;
 
   std::shared_ptr<core::logging::Logger> logger_;
 
-  /**
-   * Call for the manager to start worker threads
-   */
-  void manageWorkers();
 
-  /**
-   * Runs worker tasks
-   */
+  void manageWorkers();
   void run_tasks(const std::shared_ptr<WorkerThread>& thread);
-
   void manage_delayed_queue();
 };
 
diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp
index fdc885747..905c309ed 100644
--- a/libminifi/src/CronDrivenSchedulingAgent.cpp
+++ b/libminifi/src/CronDrivenSchedulingAgent.cpp
@@ -32,7 +32,6 @@ utils::TaskRescheduleInfo 
CronDrivenSchedulingAgent::run(core::Processor* proces
   using namespace std::literals::chrono_literals;
   using std::chrono::ceil;
   using std::chrono::seconds;
-  using std::chrono::milliseconds;
   using std::chrono::time_point_cast;
   using std::chrono::system_clock;
 
@@ -58,7 +57,7 @@ utils::TaskRescheduleInfo 
CronDrivenSchedulingAgent::run(core::Processor* proces
       last_exec_[uuid] = current_time.get_local_time();
 
     if (processor->isYield())
-      return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
+      return 
utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
 
     if (auto next_trigger = 
schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time()))
       return 
utils::TaskRescheduleInfo::RetryIn(*next_trigger-current_time.get_local_time());
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index b50d2d6f3..38073fa12 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -43,7 +43,7 @@ utils::TaskRescheduleInfo 
EventDrivenSchedulingAgent::run(core::Processor* proce
     while (processor->isRunning() && (std::chrono::steady_clock::now() - 
start_time < time_slice_)) {
       this->onTrigger(processor, processContext, sessionFactory);
       if (processor->isYield()) {
-        return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
+        return 
utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
       }
     }
     return utils::TaskRescheduleInfo::RetryImmediately();  // Let's continue 
work as soon as a thread is available
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 5f26a9562..451ac2d18 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -54,7 +54,7 @@ 
FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
     : 
core::controller::ForwardingControllerServiceProvider(core::className<FlowController>()),
       running_(false),
       initialized_(false),
-      thread_pool_(5, false, nullptr, "Flowcontroller threadpool"),
+      thread_pool_(5, nullptr, "Flowcontroller threadpool"),
       configuration_(std::move(configure)),
       provenance_repo_(std::move(provenance_repo)),
       flow_file_repo_(std::move(flow_file_repo)),
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
index fe6cf46ad..7f4810162 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -34,17 +34,13 @@
 #include "core/ProcessorNode.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessContextBuilder.h"
-#include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
 #include "utils/ValueParser.h"
 
 using namespace std::literals::chrono_literals;
 
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 void ThreadedSchedulingAgent::schedule(core::Processor* processor) {
   std::lock_guard<std::mutex> lock(mutex_);
@@ -103,12 +99,8 @@ void ThreadedSchedulingAgent::schedule(core::Processor* 
processor) {
       return agent->run(processor, processContext, sessionFactory);
     };
 
-    // create a functor that will be submitted to the thread pool.
-    utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, 
processor->getUUIDStr(), std::make_unique<utils::ComplexMonitor>());
-    // move the functor into the thread pool. While a future is returned
-    // we aren't terribly concerned with the result.
     std::future<utils::TaskRescheduleInfo> future;
-    thread_pool_.execute(std::move(functor), future);
+    thread_pool_.execute(utils::Worker{f_ex, processor->getUUIDStr()}, future);
   }
   logger_->log_debug("Scheduled thread %d concurrent workers for for process 
%s", processor->getMaxConcurrentTasks(), processor->getName());
   processors_running_.insert(processor->getUUID());
@@ -141,7 +133,4 @@ void ThreadedSchedulingAgent::unschedule(core::Processor* 
processor) {
   processors_running_.erase(processor->getUUID());
 }
 
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 113cee8f8..cbe5c2306 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -31,7 +31,7 @@ utils::TaskRescheduleInfo 
TimerDrivenSchedulingAgent::run(core::Processor* proce
     auto trigger_start_time = std::chrono::steady_clock::now();
     this->onTrigger(processor, processContext, sessionFactory);
     if (processor->isYield())
-      return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
+      return 
utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
 
     return utils::TaskRescheduleInfo::RetryAfter(trigger_start_time + 
processor->getSchedulingPeriod());
   }
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index b3b6fdef0..9891fb9de 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -58,7 +58,7 @@ C2Agent::C2Agent(std::shared_ptr<Configure> configuration,
       configuration_(std::move(configuration)),
       node_reporter_(std::move(node_reporter)),
       filesystem_(std::move(filesystem)),
-      thread_pool_(2, false, nullptr, "C2 threadpool"),
+      thread_pool_(2, nullptr, "C2 threadpool"),
       request_restart_(std::move(request_restart)),
       last_run_(std::chrono::steady_clock::now()) {
   if (!configuration_->getAgentClass()) {
@@ -102,9 +102,8 @@ void C2Agent::start() {
   for (const auto& function : functions_) {
     utils::Identifier uuid = utils::IdGenerator::getIdGenerator()->generate();
     task_ids_.push_back(uuid);
-    utils::Worker<utils::TaskRescheduleInfo> functor(function, 
uuid.to_string(), std::make_unique<utils::ComplexMonitor>());
     std::future<utils::TaskRescheduleInfo> future;
-    thread_pool_.execute(std::move(functor), future);
+    thread_pool_.execute(utils::Worker{function, uuid.to_string()}, future);
   }
   controller_running_ = true;
   thread_pool_.start();
diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp
index 06c2eb4e1..b6343aaa9 100644
--- a/libminifi/src/utils/ThreadPool.cpp
+++ b/libminifi/src/utils/ThreadPool.cpp
@@ -16,28 +16,24 @@
  */
 
 #include "utils/ThreadPool.h"
-#include "core/state/UpdateController.h"
 
 using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::utils {
 
-template<typename T>
-ThreadPool<T>::ThreadPool(int max_worker_threads, bool daemon_threads, 
core::controller::ControllerServiceProvider* controller_service_provider, 
std::string name)
-    : daemon_threads_(daemon_threads),
-      thread_reduction_count_(0),
+ThreadPool::ThreadPool(int max_worker_threads, 
core::controller::ControllerServiceProvider* controller_service_provider, 
std::string name)
+    : thread_reduction_count_(0),
       max_worker_threads_(max_worker_threads),
       adjust_threads_(false),
       running_(false),
       controller_service_provider_(controller_service_provider),
       name_(std::move(name)),
-      logger_(core::logging::LoggerFactory<ThreadPool<T>>::getLogger()) {
+      logger_(core::logging::LoggerFactory<ThreadPool>::getLogger()) {
   current_workers_ = 0;
   thread_manager_ = nullptr;
 }
 
-template<typename T>
-void ThreadPool<T>::run_tasks(const std::shared_ptr<WorkerThread>& thread) {
+void ThreadPool::run_tasks(const std::shared_ptr<WorkerThread>& thread) {
   thread->is_running_ = true;
   while (running_.load()) {
     if (UNLIKELY(thread_reduction_count_ > 0)) {
@@ -50,7 +46,7 @@ void ThreadPool<T>::run_tasks(const 
std::shared_ptr<WorkerThread>& thread) {
       }
     }
 
-    Worker<T> task;
+    Worker task;
     if (worker_queue_.dequeueWait(task)) {
       {
         std::unique_lock<std::mutex> lock(worker_queue_mutex_);
@@ -101,15 +97,14 @@ void ThreadPool<T>::run_tasks(const 
std::shared_ptr<WorkerThread>& thread) {
   current_workers_--;
 }
 
-template<typename T>
-void ThreadPool<T>::manage_delayed_queue() {
+void ThreadPool::manage_delayed_queue() {
   while (running_) {
     std::unique_lock<std::mutex> lock(worker_queue_mutex_);
 
     // Put the tasks ready to run in the worker queue
     while (!delayed_worker_queue_.empty() && 
delayed_worker_queue_.top().getNextExecutionTime() <= 
std::chrono::steady_clock::now()) {
       // I'm very sorry for this - committee must has been seriously drunk 
when the interface of prio queue was submitted.
-      Worker<T> task = 
std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
+      Worker task = 
std::move(const_cast<Worker&>(delayed_worker_queue_.top()));
       delayed_worker_queue_.pop();
       worker_queue_.enqueue(std::move(task));
     }
@@ -122,30 +117,25 @@ void ThreadPool<T>::manage_delayed_queue() {
   }
 }
 
-template<typename T>
-void ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
+void ThreadPool::execute(Worker &&task, std::future<utils::TaskRescheduleInfo> 
&future) {
   {
     std::unique_lock<std::mutex> lock(worker_queue_mutex_);
     task_status_[task.getIdentifier()] = true;
   }
-  future = std::move(task.getPromise()->get_future());
+  future = task.getPromise()->get_future();
   worker_queue_.enqueue(std::move(task));
 }
 
-template<typename T>
-void ThreadPool<T>::manageWorkers() {
-  for (int i = 0; i < max_worker_threads_; i++) {
-    std::stringstream thread_name;
-    thread_name << name_ << " #" << i;
-    auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
-    worker_thread->thread_ = createThread([this, worker_thread] { 
run_tasks(worker_thread); });
-    thread_queue_.push_back(worker_thread);
-    current_workers_++;
-  }
-
-  if (daemon_threads_) {
-    for (auto &thread : thread_queue_) {
-      thread->thread_.detach();
+void ThreadPool::manageWorkers() {
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    for (int i = 0; i < max_worker_threads_; i++) {
+      std::stringstream thread_name;
+      thread_name << name_ << " #" << i;
+      auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
+      worker_thread->thread_ = createThread([this, worker_thread] { 
run_tasks(worker_thread); });
+      thread_queue_.push_back(worker_thread);
+      current_workers_++;
     }
   }
 
@@ -171,9 +161,6 @@ void ThreadPool<T>::manageWorkers() {
           std::unique_lock<std::mutex> worker_queue_lock(worker_queue_mutex_);
           auto worker_thread = std::make_shared<WorkerThread>();
           worker_thread->thread_ = createThread([this, worker_thread] { 
run_tasks(worker_thread); });
-          if (daemon_threads_) {
-            worker_thread->thread_.detach();
-          }
           thread_queue_.push_back(worker_thread);
           current_workers_++;
         }
@@ -195,8 +182,7 @@ void ThreadPool<T>::manageWorkers() {
   }
 }
 
-template<typename T>
-std::shared_ptr<controllers::ThreadManagementService> 
ThreadPool<T>::createThreadManager() const {
+std::shared_ptr<controllers::ThreadManagementService> 
ThreadPool::createThreadManager() const {
   if (!controller_service_provider_) {
     return nullptr;
   }
@@ -213,8 +199,7 @@ std::shared_ptr<controllers::ThreadManagementService> 
ThreadPool<T>::createThrea
   return thread_manager_service;
 }
 
-template<typename T>
-void ThreadPool<T>::start() {
+void ThreadPool::start() {
   std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
   if (!running_) {
     thread_manager_ = createThreadManager();
@@ -224,22 +209,21 @@ void ThreadPool<T>::start() {
     manager_thread_ = std::thread(&ThreadPool::manageWorkers, this);
 
     std::lock_guard<std::mutex> quee_lock(worker_queue_mutex_);
-    delayed_scheduler_thread_ = 
std::thread(&ThreadPool<T>::manage_delayed_queue, this);
+    delayed_scheduler_thread_ = std::thread(&ThreadPool::manage_delayed_queue, 
this);
   }
 }
 
-template<typename T>
-void ThreadPool<T>::stopTasks(const TaskId &identifier) {
+void ThreadPool::stopTasks(const TaskId &identifier) {
   std::unique_lock<std::mutex> lock(worker_queue_mutex_);
   task_status_[identifier] = false;
 
   // remove tasks belonging to identifier from worker_queue_
-  worker_queue_.remove([&] (const Worker<T>& worker) { return 
worker.getIdentifier() == identifier; });
+  worker_queue_.remove([&] (const Worker& worker) { return 
worker.getIdentifier() == identifier; });
 
   // also remove from delayed_worker_queue_
   decltype(delayed_worker_queue_) new_delayed_worker_queue;
   while (!delayed_worker_queue_.empty()) {
-    Worker<T> task = 
std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
+    Worker task = std::move(const_cast<Worker&>(delayed_worker_queue_.top()));
     delayed_worker_queue_.pop();
     if (task.getIdentifier() != identifier) {
       new_delayed_worker_queue.push(std::move(task));
@@ -254,22 +238,19 @@ void ThreadPool<T>::stopTasks(const TaskId &identifier) {
   });
 }
 
-template<typename T>
-void ThreadPool<T>::resume() {
+void ThreadPool::resume() {
   if (!worker_queue_.isRunning()) {
     worker_queue_.start();
   }
 }
 
-template<typename T>
-void ThreadPool<T>::pause() {
+void ThreadPool::pause() {
   if (worker_queue_.isRunning()) {
     worker_queue_.stop();
   }
 }
 
-template<typename T>
-void ThreadPool<T>::shutdown() {
+void ThreadPool::shutdown() {
   if (running_.load()) {
     std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
     running_.store(false);
@@ -307,9 +288,4 @@ void ThreadPool<T>::shutdown() {
   }
 }
 
-template class utils::ThreadPool<utils::TaskRescheduleInfo>;
-template class utils::ThreadPool<int>;
-template class utils::ThreadPool<bool>;
-template class utils::ThreadPool<state::Update>;
-
 }  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/test/unit/BackTraceTests.cpp b/libminifi/test/unit/BackTraceTests.cpp
index 46363638d..87840b13e 100644
--- a/libminifi/test/unit/BackTraceTests.cpp
+++ b/libminifi/test/unit/BackTraceTests.cpp
@@ -24,76 +24,61 @@
 #include "utils/BackTrace.h"
 #include "utils/Monitors.h"
 #include "utils/ThreadPool.h"
+#include "range/v3/algorithm/any_of.hpp"
 
 using namespace std::literals::chrono_literals;
 
-class WorkerNumberExecutions : public utils::AfterExecute<int> {
- public:
-  explicit WorkerNumberExecutions(int tasks)
-      : tasks(tasks) {
-  }
-
-  WorkerNumberExecutions(WorkerNumberExecutions && other) noexcept
-      : runs(other.runs),
-      tasks(other.tasks) {
-  }
-
-  bool isFinished(const int &result) override {
-    return result <= 0 || ++runs >= tasks;
-  }
-  bool isCancelled(const int& /*result*/) override {
-    return false;
-  }
-
-  std::chrono::steady_clock::duration wait_time() override {
-    return 50ms;
-  }
-
- protected:
-  int runs{0};
-  int tasks;
-};
-
 TEST_CASE("BT1", "[TPT1]") {
   const BackTrace trace = TraceResolver::getResolver().getBackTrace("BT1");
 #ifdef HAS_EXECINFO
-  REQUIRE(!trace.getTraces().empty());
+  CHECK(!trace.getTraces().empty());
 #endif
 }
 
-TEST_CASE("BT2", "[TPT2]") {
-  std::atomic<int> counter = 0;
-  utils::ThreadPool<int> pool(4);
-  pool.start();
-  std::this_thread::sleep_for(std::chrono::milliseconds(150));
-  for (int i = 0; i < 3; i++) {
-    std::unique_ptr<utils::AfterExecute<int>> after_execute = 
std::unique_ptr<utils::AfterExecute<int>>(new WorkerNumberExecutions(5));
-    utils::Worker<int> functor([&counter]() { return ++counter; }, "id", 
std::move(after_execute));
+void inner_function(std::atomic_flag& ready_to_check, std::atomic_flag& 
checking_done) {
+  ready_to_check.test_and_set();
+  ready_to_check.notify_all();
+  checking_done.wait(false);
+}
 
-    std::future<int> fut;
-    pool.execute(std::move(functor), fut);  // NOLINT(bugprone-use-after-move)
-  }
+void outer_function(std::atomic_flag& ready_to_check, std::atomic_flag& 
checking_done) {
+  inner_function(ready_to_check, checking_done);
+}
 
-  std::unique_ptr<utils::AfterExecute<int>> after_execute = 
std::unique_ptr<utils::AfterExecute<int>>(new WorkerNumberExecutions(5));
-  utils::Worker<int> functor([&counter]() { return ++counter; }, "id", 
std::move(after_execute));
 
-  std::future<int> fut;
-  pool.execute(std::move(functor), fut);  // NOLINT(bugprone-use-after-move)
+TEST_CASE("BT2", "[TPT2]") {
+  std::atomic_flag ready_for_checking;
+  std::atomic_flag done_with_checking;
 
-  std::vector<BackTrace> traces = pool.getTraces();
-  for (const auto &trace : traces) {
-    std::cerr << "Thread name: " << trace.getName() << std::endl;
-    const auto &trace_strings = trace.getTraces();
-#ifdef HAS_EXECINFO
-    REQUIRE(trace_strings.size() > 2);
-    for (const auto& trace_string : trace_strings) {
-      std::cerr << " - " << trace_string << std::endl;
-    }
-    if (trace_strings.at(0).find("sleep_for") != std::string::npos) {
-      REQUIRE(trace_strings.at(1).find("counterFunction") != 
std::string::npos);
+  constexpr std::string_view thread_pool_name = "Winnie the pool";
+  constexpr size_t number_of_worker_threads = 3;
+  utils::ThreadPool pool(number_of_worker_threads, nullptr, 
thread_pool_name.data());
+  utils::Worker worker([&]() -> utils::TaskRescheduleInfo {
+    outer_function(ready_for_checking, done_with_checking);
+    return utils::TaskRescheduleInfo::Done();
+  }, "id");
+  std::future<utils::TaskRescheduleInfo> future;
+  pool.execute(std::move(worker), future);
+
+  pool.start();
+  {
+    ready_for_checking.wait(false);
+    std::vector<BackTrace> traces = pool.getTraces();
+    CHECK(traces.size() <= number_of_worker_threads);
+    REQUIRE(!traces.empty());
+    for (const auto &trace : traces) {
+      CHECK(trace.getName().starts_with(thread_pool_name));
     }
+#ifdef HAS_EXECINFO
+    auto first_worker_trace = traces.front();
+    CHECK(ranges::any_of(first_worker_trace.getTraces(), [](const std::string& 
trace_line) { return trace_line.find("run_tasks") != trace_line.npos; }));
+#ifdef DEBUG
+    CHECK(ranges::any_of(first_worker_trace.getTraces(), [](const std::string& 
trace_line) { return trace_line.find("outer_function") != trace_line.npos; }));
+    CHECK(ranges::any_of(first_worker_trace.getTraces(), [](const std::string& 
trace_line) { return trace_line.find("inner_function") != trace_line.npos; }));
 #endif
+#endif
+    done_with_checking.test_and_set();
+    done_with_checking.notify_all();
   }
-  fut.wait();
+  future.wait();
 }
-
diff --git a/libminifi/test/unit/SchedulingAgentTests.cpp b/libminifi/test/unit/SchedulingAgentTests.cpp
index c3cf6638a..b3ffed29d 100644
--- a/libminifi/test/unit/SchedulingAgentTests.cpp
+++ b/libminifi/test/unit/SchedulingAgentTests.cpp
@@ -63,7 +63,7 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") {
   auto controller_services_ = 
std::make_shared<minifi::core::controller::ControllerServiceMap>();
   auto configuration = std::make_shared<minifi::Configure>();
   auto controller_services_provider_ = 
std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_,
 configuration);
-  utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool;
+  utils::ThreadPool thread_pool;
   auto count_proc = std::make_shared<CountOnTriggersProcessor>("count_proc");
   count_proc->incrementActiveTasks();
   count_proc->setScheduledState(core::RUNNING);
@@ -79,21 +79,21 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") {
     auto timer_driven_agent = 
std::make_shared<TimerDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
 test_repo, test_repo, content_repo, configuration, thread_pool);
     timer_driven_agent->start();
     auto first_task_reschedule_info = 
timer_driven_agent->run(count_proc.get(), context, factory);
-    CHECK(!first_task_reschedule_info.finished_);
-    CHECK(first_task_reschedule_info.wait_time_ <= 125ms);
+    CHECK(!first_task_reschedule_info.isFinished());
+    CHECK(first_task_reschedule_info.getNextExecutionTime() <= 
std::chrono::steady_clock::now() + 125ms);
     CHECK(count_proc->getNumberOfTriggers() == 1);
 
     count_proc->setOnTriggerDuration(50ms);
     auto second_task_reschedule_info = 
timer_driven_agent->run(count_proc.get(), context, factory);
 
-    CHECK(!second_task_reschedule_info.finished_);
-    CHECK(second_task_reschedule_info.wait_time_ <= 75ms);
+    CHECK(!second_task_reschedule_info.isFinished());
+    CHECK(first_task_reschedule_info.getNextExecutionTime() <= 
std::chrono::steady_clock::now() + 75ms);
     CHECK(count_proc->getNumberOfTriggers() == 2);
 
     count_proc->setOnTriggerDuration(150ms);
     auto third_task_reschedule_info = 
timer_driven_agent->run(count_proc.get(), context, factory);
-    CHECK(!third_task_reschedule_info.finished_);
-    CHECK(third_task_reschedule_info.wait_time_ == 0ms);
+    CHECK(!third_task_reschedule_info.isFinished());
+    CHECK(first_task_reschedule_info.getNextExecutionTime() < 
std::chrono::steady_clock::now());
     CHECK(count_proc->getNumberOfTriggers() == 3);
   }
 
@@ -101,33 +101,42 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") {
     auto event_driven_agent = 
std::make_shared<EventDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
 test_repo, test_repo, content_repo, configuration, thread_pool);
     event_driven_agent->start();
     auto first_task_reschedule_info = 
event_driven_agent->run(count_proc.get(), context, factory);
-    CHECK(!first_task_reschedule_info.finished_);
-    CHECK(first_task_reschedule_info.wait_time_ == 0ms);
+    CHECK(!first_task_reschedule_info.isFinished());
+    CHECK(first_task_reschedule_info.getNextExecutionTime() < 
std::chrono::steady_clock::now());
     auto count_num_after_one_schedule = count_proc->getNumberOfTriggers();
     CHECK(count_num_after_one_schedule > 100);
 
     auto second_task_reschedule_info = 
event_driven_agent->run(count_proc.get(), context, factory);
-    CHECK(!second_task_reschedule_info.finished_);
-    CHECK(second_task_reschedule_info.wait_time_ == 0ms);
+    CHECK(!second_task_reschedule_info.isFinished());
+    CHECK(second_task_reschedule_info.getNextExecutionTime() < 
std::chrono::steady_clock::now());
     auto count_num_after_two_schedule = count_proc->getNumberOfTriggers();
     CHECK(count_num_after_two_schedule > count_num_after_one_schedule+100);
   }
 
   SECTION("Cron Driven every year") {
+#ifdef WIN32
+    date::set_install(TZ_DATA_DIR);
+#endif
     count_proc->setCronPeriod("0 0 0 1 1 ?");
     auto cron_driven_agent = 
std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
 test_repo, test_repo, content_repo, configuration, thread_pool);
     cron_driven_agent->start();
     auto first_task_reschedule_info = cron_driven_agent->run(count_proc.get(), 
context, factory);
-    CHECK(!first_task_reschedule_info.finished_);
-    if (first_task_reschedule_info.wait_time_ > 1min) {  // To avoid possibly 
failing around dec 31 23:59:59
-      auto next_run_time_point = 
std::chrono::round<std::chrono::years>(std::chrono::system_clock::now() + 
first_task_reschedule_info.wait_time_);
-      CHECK(next_run_time_point == 
std::chrono::ceil<std::chrono::years>(std::chrono::system_clock::now()));
+    CHECK(!first_task_reschedule_info.isFinished());
+    if (first_task_reschedule_info.getNextExecutionTime() > 
std::chrono::steady_clock::now() + 1min) {  // To avoid possibly failing around 
dec 31 23:59:59
+      auto wait_time_till_next_execution_time = 
std::chrono::round<std::chrono::seconds>(first_task_reschedule_info.getNextExecutionTime()
 - std::chrono::steady_clock::now());
+
+      auto current_time = 
date::make_zoned<std::chrono::seconds>(date::current_zone(), 
std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now()));
+      auto current_year_month_day = 
date::year_month_day(date::floor<date::days>(current_time.get_local_time()));
+      auto new_years_day = 
date::make_zoned<std::chrono::seconds>(date::current_zone(), 
date::local_days{date::year{current_year_month_day.year()+date::years(1)}/date::January/1});
+
+      auto time_until_new_years_day = new_years_day.get_local_time() - 
current_time.get_local_time();
+
+      CHECK(std::chrono::round<std::chrono::minutes>(time_until_new_years_day 
- wait_time_till_next_execution_time) == 0min);
       CHECK(count_proc->getNumberOfTriggers() == 0);
 
       auto second_task_reschedule_info = 
cron_driven_agent->run(count_proc.get(), context, factory);
-      CHECK(!second_task_reschedule_info.finished_);
-      next_run_time_point = 
std::chrono::round<std::chrono::years>(std::chrono::system_clock::now() + 
first_task_reschedule_info.wait_time_);
-      CHECK(next_run_time_point == 
std::chrono::ceil<std::chrono::years>(std::chrono::system_clock::now()));
+      CHECK(!second_task_reschedule_info.isFinished());
+      
CHECK(std::chrono::round<std::chrono::minutes>(first_task_reschedule_info.getNextExecutionTime()
 - second_task_reschedule_info.getNextExecutionTime()) == 0min);
       CHECK(count_proc->getNumberOfTriggers() == 0);
     }
   }
@@ -137,14 +146,14 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") {
     auto cron_driven_agent = 
std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
 test_repo, test_repo, content_repo, configuration, thread_pool);
     cron_driven_agent->start();
     auto first_task_reschedule_info = cron_driven_agent->run(count_proc.get(), 
context, factory);
-    CHECK(!first_task_reschedule_info.finished_);
-    CHECK(first_task_reschedule_info.wait_time_ <= 1s);
+    CHECK(!first_task_reschedule_info.isFinished());
+    CHECK(first_task_reschedule_info.getNextExecutionTime() <= 
std::chrono::steady_clock::now() + 1s);
     CHECK(count_proc->getNumberOfTriggers() == 0);
 
-    std::this_thread::sleep_for(first_task_reschedule_info.wait_time_ + 1ms);
+    
std::this_thread::sleep_until(first_task_reschedule_info.getNextExecutionTime());
     auto second_task_reschedule_info = 
cron_driven_agent->run(count_proc.get(), context, factory);
-    CHECK(!second_task_reschedule_info.finished_);
-    CHECK(second_task_reschedule_info.wait_time_ <= 1s);
+    CHECK(!second_task_reschedule_info.isFinished());
+    CHECK(second_task_reschedule_info.getNextExecutionTime() <= 
std::chrono::steady_clock::now() + 1s);
     CHECK(count_proc->getNumberOfTriggers() == 1);
   }
 
@@ -153,7 +162,7 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") {
     auto cron_driven_agent = 
std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
 test_repo, test_repo, content_repo, configuration, thread_pool);
     cron_driven_agent->start();
     auto first_task_reschedule_info = cron_driven_agent->run(count_proc.get(), 
context, factory);
-    CHECK(first_task_reschedule_info.finished_);
+    CHECK(first_task_reschedule_info.isFinished());
   }
 }
 }  // namespace org::apache::nifi::minifi::testing
diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp
index 5c23e236c..a8d91f44e 100644
--- a/libminifi/test/unit/SocketTests.cpp
+++ b/libminifi/test/unit/SocketTests.cpp
@@ -27,9 +27,13 @@
 #include "../Catch.h"
 #include "io/StreamFactory.h"
 #include "io/Sockets.h"
-#include "utils/ThreadPool.h"
 #include "properties/Configuration.h"
 
+#include <asio/thread_pool.hpp>
+#include <asio/awaitable.hpp>
+#include <asio/co_spawn.hpp>
+#include <asio/use_future.hpp>
+
 namespace minifi = org::apache::nifi::minifi;
 namespace io = minifi::io;
 using io::Socket;
@@ -163,7 +167,7 @@ TEST_CASE("TestSocketWriteTestAfterClose", "[TestSocket7]") 
{
 #ifdef OPENSSL_SUPPORT
 std::atomic<uint8_t> counter;
 std::mt19937_64 seed { std::random_device { }() };
-bool createSocket() {
+asio::awaitable<bool> createSocket() {
   counter++;
   std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
 
@@ -175,57 +179,43 @@ bool createSocket() {
     socketA->initialize();
   }
 
-  return true;
+  co_return true;
 }
-/**
- * MINIFI-320 was created to address reallocations within TLSContext
- * This test will create 20 threads that attempt to create contexts
- * to ensure we no longer see the segfaults.
- */
+
 TEST_CASE("TestTLSContextCreation", "[TestSocket8]") {
-  utils::ThreadPool<bool> pool(20, true);
+  constexpr size_t number_of_threads = 20;
+  asio::thread_pool pool(number_of_threads);
 
   std::vector<std::future<bool>> futures;
-  for (int i = 0; i < 20; i++) {
-    std::function<bool()> f_ex = createSocket;
-    utils::Worker<bool> functor(f_ex, "id");
-    std::future<bool> fut;
-    pool.execute(std::move(functor), fut);  // NOLINT(bugprone-use-after-move)
-    futures.push_back(std::move(fut));
+  futures.reserve(number_of_threads);
+  for (size_t i = 0; i < number_of_threads; i++) {
+    futures.push_back(asio::co_spawn(pool, createSocket(), asio::use_future));
   }
-  pool.start();
+  pool.join();
   for (auto &&future : futures) {
-    future.wait();
+    CHECK(future.valid());
   }
 
-  REQUIRE(20 == counter.load());
+  REQUIRE(number_of_threads == counter.load());
 }
 
-/**
- * MINIFI-329 was created in regards to an option existing but not
- * being properly evaluated.
- */
 TEST_CASE("TestTLSContextCreation2", "[TestSocket9]") {
   std::shared_ptr<minifi::Configure> configure = 
std::make_shared<minifi::Configure>();
   configure->set(minifi::Configuration::nifi_remote_input_secure, "false");
   auto factory = io::StreamFactory::getInstance(configure);
   std::string host = Socket::getMyHostName();
   Socket *socket = factory->createSocket(host, 10001).release();
-  io::TLSSocket *tls = dynamic_cast<io::TLSSocket*>(socket);
+  auto *tls = dynamic_cast<io::TLSSocket*>(socket);
   REQUIRE(tls == nullptr);
 }
 
-/**
- * MINIFI-329 was created in regards to an option existing but not
- * being properly evaluated.
- */
 TEST_CASE("TestTLSContextCreationNullptr", "[TestSocket10]") {
   std::shared_ptr<minifi::Configure> configure = 
std::make_shared<minifi::Configure>();
   configure->set(minifi::Configuration::nifi_remote_input_secure, "false");
   auto factory = io::StreamFactory::getInstance(configure);
   std::string host = Socket::getMyHostName();
   io::Socket *socket = factory->createSecureSocket(host, 10001, 
nullptr).release();
-  io::TLSSocket *tls = dynamic_cast<minifi::io::TLSSocket*>(socket);
+  auto *tls = dynamic_cast<minifi::io::TLSSocket*>(socket);
   REQUIRE(tls == nullptr);
 }
 #endif  // OPENSSL_SUPPORT
diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp
index 948fc73e5..cbf0547b5 100644
--- a/libminifi/test/unit/ThreadPoolTests.cpp
+++ b/libminifi/test/unit/ThreadPoolTests.cpp
@@ -25,67 +25,43 @@
 
 using namespace std::literals::chrono_literals;
 
-class WorkerNumberExecutions : public utils::AfterExecute<int> {
- public:
-  explicit WorkerNumberExecutions(int tasks)
-      : tasks(tasks) {
-  }
-
-  explicit WorkerNumberExecutions(WorkerNumberExecutions && other) noexcept
-      : runs(other.runs),
-        tasks(other.tasks) {
-  }
-
-  bool isFinished(const int &result) override {
-    return result <= 0 || ++runs >= tasks;
-  }
-  bool isCancelled(const int& /*result*/) override {
-    return false;
-  }
-
-  std::chrono::steady_clock::duration wait_time() override {
-    return 50ms;
-  }
-
- protected:
-  int runs{0};
-  int tasks;
-};
-
 TEST_CASE("ThreadPoolTest1", "[TPT1]") {
-  utils::ThreadPool<bool> pool(5);
-  utils::Worker<bool> functor([](){ return true; }, "id");
+  utils::ThreadPool pool(5);
+  utils::Worker functor([](){ return utils::TaskRescheduleInfo::Done(); }, 
"id");
   pool.start();
-  std::future<bool> fut;
+  std::future<utils::TaskRescheduleInfo> fut;
   pool.execute(std::move(functor), fut);  // NOLINT(bugprone-use-after-move)
-  fut.wait();
-  REQUIRE(true == fut.get());
+  CHECK(fut.get().isFinished());
 }
 
 TEST_CASE("ThreadPoolTest2", "[TPT2]") {
-  std::atomic<int> counter = 0;
-  utils::ThreadPool<int> pool(5);
-  std::unique_ptr<utils::AfterExecute<int>> after_execute = 
std::unique_ptr<utils::AfterExecute<int>>(new WorkerNumberExecutions(20));
-  utils::Worker<int> functor([&counter]() { return ++counter; }, "id", 
std::move(after_execute));
+  constexpr size_t max_counter = 20;
+  std::atomic<size_t> counter = 0;
+  utils::ThreadPool pool(5);
+  utils::Worker functor([&counter](){
+    if (++counter == max_counter)
+      return utils::TaskRescheduleInfo::Done();
+
+    return utils::TaskRescheduleInfo::RetryImmediately();}, "id");
   pool.start();
-  std::future<int> fut;
-  pool.execute(std::move(functor), fut);
-  fut.wait();
-  REQUIRE(20 == fut.get());
+  std::future<utils::TaskRescheduleInfo> fut;
+  pool.execute(std::move(functor), fut);  // NOLINT(bugprone-use-after-move)
+  CHECK(fut.get().isFinished());
+  REQUIRE(20 == counter);
 }
 
 TEST_CASE("Worker wait time should be relative to the last run") {
   std::vector<std::chrono::steady_clock::time_point> 
worker_execution_time_points;
-  utils::ThreadPool<utils::TaskRescheduleInfo> pool(1);
+  utils::ThreadPool pool(1);
   auto wait_time_between_tasks = 10ms;
-  utils::Worker<utils::TaskRescheduleInfo> 
worker([&]()->utils::TaskRescheduleInfo {
+  utils::Worker worker([&]()->utils::TaskRescheduleInfo {
     worker_execution_time_points.push_back(std::chrono::steady_clock::now());
     if (worker_execution_time_points.size() == 2) {
       return utils::TaskRescheduleInfo::Done();
     } else {
       return utils::TaskRescheduleInfo::RetryIn(wait_time_between_tasks);
     }
-  }, "id", std::make_unique<utils::ComplexMonitor>());
+  }, "id");
   std::this_thread::sleep_for(wait_time_between_tasks + 1ms);  // Pre-waiting 
should not matter
 
   std::future<utils::TaskRescheduleInfo> task_future;
@@ -94,7 +70,7 @@ TEST_CASE("Worker wait time should be relative to the last 
run") {
 
   auto final_task_reschedule_info = task_future.get();
 
-  CHECK(final_task_reschedule_info.finished_);
+  CHECK(final_task_reschedule_info.isFinished());
   REQUIRE(worker_execution_time_points.size() == 2);
   CHECK(worker_execution_time_points[1] - worker_execution_time_points[0] >= 
wait_time_between_tasks);
 }
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
index 427eed117..c17130371 100644
--- a/nanofi/include/cxx/Instance.h
+++ b/nanofi/include/cxx/Instance.h
@@ -148,9 +148,8 @@ class Instance {
     // run all functions independently
 
     for (auto function : functions) {
-      utils::Worker<utils::TaskRescheduleInfo> functor(function, "listeners");
       std::future<utils::TaskRescheduleInfo> future;
-      listener_thread_pool_.execute(std::move(functor), future);
+      listener_thread_pool_.execute(utils::Worker{function, "listeners"}, 
future);
     }
   }
 
@@ -169,7 +168,7 @@ class Instance {
   std::string url_;
   std::shared_ptr<Configure> configure_;
 
-  utils::ThreadPool<utils::TaskRescheduleInfo> listener_thread_pool_;
+  utils::ThreadPool listener_thread_pool_;
 };
 
 }  // namespace org::apache::nifi::minifi
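
A second pattern worth calling out from the scheduling-agent hunks above: when a processor yields, the agents now reschedule for the absolute time point at which the yield expires, RetryAfter(processor->getYieldExpirationTime()), rather than a relative wait derived from getYieldTime(). A minimal sketch of just that decision, with reschedule_on_yield as a placeholder name and the trigger logic elided:

    #include <chrono>
    #include "utils/Monitors.h"  // utils::TaskRescheduleInfo

    namespace utils = org::apache::nifi::minifi::utils;

    // yield_expiration stands in for the value a Processor reports via the
    // getYieldExpirationTime() accessor added in this commit.
    utils::TaskRescheduleInfo reschedule_on_yield(std::chrono::steady_clock::time_point yield_expiration) {
      // an absolute steady_clock time point, not a duration measured from "now"
      return utils::TaskRescheduleInfo::RetryAfter(yield_expiration);
    }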
