[ https://issues.apache.org/jira/browse/MINIFI-338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16096298#comment-16096298 ]
ASF GitHub Bot commented on MINIFI-338:
---------------------------------------

Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/117#discussion_r128773288

    --- Diff: libminifi/include/utils/ThreadPool.h ---
    @@ -246,15 +349,67 @@ void ThreadPool<T>::startWorkers() {
     template<typename T>
     void ThreadPool<T>::run_tasks() {
       auto waitperiod = std::chrono::milliseconds(1) * 100;
    +  uint64_t wait_decay_ = 0;
       while (running_.load()) {
    +    // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
    +    // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
    +    // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
    +    // be more likely to run. This is intentional.
    +    if (wait_decay_ > 1000) {
    +      std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
    +    }
         Worker<T> task;
         if (!worker_queue_.try_dequeue(task)) {
    +      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
           tasks_available_.wait_for(lock, waitperiod);
           continue;
         }
    -    task.run();
    +    else {
    +
    +      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
    +      if (!task_status_[task.getIdentifier()]) {
    +        continue;
    +      }
    +    }
    +
    +    bool wait_to_run = false;
    +    if (task.getTimeSlice() > 1) {
    +      auto now = std::chrono::system_clock::now().time_since_epoch();
    +      auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
    +      if (task.getTimeSlice() > ms.count()) {
    +        wait_to_run = true;
    +      }
    +    }
    +    // if we have to wait we re-queue the worker.
    +    if (wait_to_run) {
    +      {
    +        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
    +        if (!task_status_[task.getIdentifier()]) {
    +          continue;
    +        }
    +      }
    +      worker_queue_.enqueue(std::move(task));
    --- End diff --

    OK. It is possible to sort the queue, or otherwise arrange it, so that the head of the queue is the first to expire. In that case, we can avoid the enqueue/dequeue for all the items in the queue.


> Threads can be unbounded per flow configuration
> -----------------------------------------------
>
>                 Key: MINIFI-338
>                 URL: https://issues.apache.org/jira/browse/MINIFI-338
>             Project: Apache NiFi MiNiFi
>          Issue Type: Bug
>          Components: C++
>            Reporter: marco polo
>            Assignee: marco polo
>
> The number of tasks configured by a given processor should be bounded by a
> thread pool configuration. Currently the schedulers have no concept of a
> thread pool except for the component life cycle thread pool. We should
> transition the tasks to a thread pool that is shared by the scheduler and is
> globally configurable, to better minimize the impact of processors.
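
For illustration only, the review suggestion above (keep the queue ordered so that its head is the first task to expire) might look roughly like the sketch below. It is not code from the pull request: TimedTask, LaterFirst, ExpiryQueue, try_dequeue_ready and ms_until_head_due are hypothetical names, the task is reduced to a callable plus a run time in milliseconds, and the class is not thread-safe, so a real pool would presumably guard it with a mutex such as worker_queue_mutex_.

```cpp
#include <cstdint>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical stand-in for Worker<T>: a callable plus the earliest time
// (milliseconds since epoch) at which it is allowed to run.
struct TimedTask {
  uint64_t run_at_ms;
  std::function<int()> fn;
};

// Comparator that turns std::priority_queue into a min-heap on run_at_ms,
// so the task that expires first is always at the head.
struct LaterFirst {
  bool operator()(const TimedTask &a, const TimedTask &b) const {
    return a.run_at_ms > b.run_at_ms;
  }
};

// Assumption: single-threaded sketch; locking is left to the caller.
class ExpiryQueue {
 public:
  void enqueue(TimedTask task) { heap_.push(std::move(task)); }

  // Hands out the head task only if it is due at now_ms. A task that is
  // not yet due stays in place, so nothing is dequeued and re-enqueued.
  bool try_dequeue_ready(uint64_t now_ms, TimedTask &out) {
    if (heap_.empty() || heap_.top().run_at_ms > now_ms) {
      return false;
    }
    out = heap_.top();  // top() returns a const reference, so this copies
    heap_.pop();
    return true;
  }

  // How long a worker could sleep before the head becomes due.
  // Returns idle_ms when the queue is empty and 0 when the head is due.
  uint64_t ms_until_head_due(uint64_t now_ms, uint64_t idle_ms) const {
    if (heap_.empty()) {
      return idle_ms;
    }
    const uint64_t due = heap_.top().run_at_ms;
    return due > now_ms ? due - now_ms : 0;
  }

 private:
  std::priority_queue<TimedTask, std::vector<TimedTask>, LaterFirst> heap_;
};
```

With this shape, a worker thread would only ever inspect the head: if try_dequeue_ready fails, it can wait for the interval reported by ms_until_head_due instead of cycling through every queued task. The trade-off is that enqueue and dequeue become O(log n), and a task enqueued with an earlier run time would need to wake sleeping workers, for example through the existing condition variable.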