[ https://issues.apache.org/jira/browse/MINIFI-338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16095301#comment-16095301 ]

ASF GitHub Bot commented on MINIFI-338:
---------------------------------------

Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/117#discussion_r128620813
  
    --- Diff: libminifi/include/utils/ThreadPool.h ---
    @@ -246,15 +349,67 @@ void ThreadPool<T>::startWorkers() {
     template<typename T>
     void ThreadPool<T>::run_tasks() {
       auto waitperiod = std::chrono::milliseconds(1) * 100;
    +  uint64_t wait_decay_ = 0;
       while (running_.load()) {
     
    +    // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
    +    // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
    +    // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
    +    // be more likely to run. This is intentional.
    +    if (wait_decay_ > 1000) {
    +      std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
    +    }
         Worker<T> task;
         if (!worker_queue_.try_dequeue(task)) {
    +
           std::unique_lock<std::mutex> lock(worker_queue_mutex_);
           tasks_available_.wait_for(lock, waitperiod);
           continue;
         }
    -    task.run();
    +    else {
    +
    +      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
    +      if (!task_status_[task.getIdentifier()]) {
    +        continue;
    +      }
    +    }
    +
    +    bool wait_to_run = false;
    +    if (task.getTimeSlice() > 1) {
    +      auto now = std::chrono::system_clock::now().time_since_epoch();
    +      auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
    +      if (task.getTimeSlice() > ms.count()) {
    +        wait_to_run = true;
    +      }
    +    }
    +    // if we have to wait we re-queue the worker.
    +    if (wait_to_run) {
    +      {
    +        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
    +        if (!task_status_[task.getIdentifier()]) {
    +          continue;
    +        }
    +      }
    +      worker_queue_.enqueue(std::move(task));
    --- End diff --
    
    do we need to enqueue to head?
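
    For context, the distinction the question draws: worker_queue_.enqueue() presumably
    appends at the tail, so the re-queued task goes behind everything already queued,
    while a head enqueue would make it the next task any worker sees. A minimal sketch
    of the two options, assuming a mutex-guarded std::deque (hypothetical; the diff
    shows worker_queue_ exposing only enqueue()/try_dequeue(), with no front insertion):

        #include <deque>
        #include <mutex>
        #include <utility>

        // Hypothetical stand-in for worker_queue_, illustrating head vs. tail
        // re-queueing. Not the actual ThreadPool.h queue type.
        template<typename T>
        class RequeueableQueue {
         public:
          // Tail enqueue, as the patch does: the re-queued task runs after all
          // currently queued work.
          void enqueue(T task) {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(task));
          }

          // Head enqueue, as the question suggests: the re-queued task is the
          // next one dequeued.
          void enqueue_front(T task) {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_front(std::move(task));
          }

          bool try_dequeue(T &task) {
            std::lock_guard<std::mutex> lock(mutex_);
            if (queue_.empty())
              return false;
            task = std::move(queue_.front());
            queue_.pop_front();
            return true;
          }

         private:
          std::mutex mutex_;
          std::deque<T> queue_;
        };

    One possible reading of the patch as written: a head enqueue would make every
    worker repeatedly pull the same task whose time slice is still in the future,
    whereas the tail enqueue plus the wait_decay_ sleep lets runnable tasks go first.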


> Threads can be unbounded per flow configuration
> -----------------------------------------------
>
>                 Key: MINIFI-338
>                 URL: https://issues.apache.org/jira/browse/MINIFI-338
>             Project: Apache NiFi MiNiFi
>          Issue Type: Bug
>          Components: C++
>            Reporter: marco polo
>            Assignee: marco polo
>
> The number of tasks configured by a given processor should be bounded by a 
> thread pool configuration. Currently the schedulers have no concept of a 
> thread pool except for the component life cycle thread pool. We should 
> transition the tasks to a thread pool that is shared by the scheduler and is 
> globally configurable, to better minimize the impact of processors. 
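
A minimal sketch of the idea in this issue, assuming a single pool sized by one
global setting (the names SharedTaskPool and max_threads are illustrative, not
the actual MiNiFi API):

    #include <condition_variable>
    #include <cstddef>
    #include <functional>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <vector>

    // Hypothetical fixed-size pool shared by all schedulers: processor tasks
    // are submitted here instead of each scheduler spawning its own threads,
    // so total concurrency is bounded by one global setting.
    class SharedTaskPool {
     public:
      explicit SharedTaskPool(std::size_t max_threads) {
        for (std::size_t i = 0; i < max_threads; ++i)
          workers_.emplace_back([this] { run(); });
      }

      ~SharedTaskPool() {
        {
          std::lock_guard<std::mutex> lock(mutex_);
          running_ = false;
        }
        cv_.notify_all();
        for (auto &worker : workers_)
          worker.join();
      }

      // Schedulers hand tasks to the shared pool rather than creating threads.
      void submit(std::function<void()> task) {
        {
          std::lock_guard<std::mutex> lock(mutex_);
          tasks_.push(std::move(task));
        }
        cv_.notify_one();
      }

     private:
      void run() {
        for (;;) {
          std::function<void()> task;
          {
            std::unique_lock<std::mutex> lock(mutex_);
            cv_.wait(lock, [this] { return !running_ || !tasks_.empty(); });
            // Shut down only once stopped and drained of queued tasks.
            if (!running_ && tasks_.empty())
              return;
            task = std::move(tasks_.front());
            tasks_.pop();
          }
          task();  // run outside the lock so other workers can dequeue
        }
      }

      std::mutex mutex_;
      std::condition_variable cv_;
      std::queue<std::function<void()>> tasks_;
      std::vector<std::thread> workers_;
      bool running_ = true;
    };

With something like this, a flow configuration that schedules many processors can
never create more than max_threads OS threads, which is the bound the issue asks for.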



