NIFI-54: When incrementing active thread count, if the value exceeds max, do not run
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/97f8ab0c Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/97f8ab0c Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/97f8ab0c Branch: refs/heads/nifi-27 Commit: 97f8ab0cc50f77a2ff3a4e9d575ec05c185e5b80 Parents: 8254b75 Author: Mark Payne <marka...@hotmail.com> Authored: Wed Dec 10 11:57:33 2014 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Wed Dec 10 11:57:33 2014 -0500 ---------------------------------------------------------------------- .../scheduling/EventDrivenSchedulingAgent.java | 24 ++++++++++++++++++-- .../controller/scheduling/ScheduleState.java | 8 +++---- 2 files changed, 26 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/97f8ab0c/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index 5b237ff..af801bb 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -263,7 +263,17 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent { } private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) { - scheduleState.incrementActiveThreadCount(); + final int newThreadCount = scheduleState.incrementActiveThreadCount(); + if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) { + // its possible that the worker queue could give us a worker node that is eligible to run based + // on the number of threads but another thread has already incremented the thread count, result in + // reaching the maximum number of threads. we won't know this until we atomically increment the thread count + // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would + // result in using more than the maximum number of defined threads + scheduleState.decrementActiveThreadCount(); + return; + } + try { try (final AutoCloseable ncl = NarCloseable.withNarLoader()) { worker.onTrigger(processContext, sessionFactory); @@ -293,7 +303,17 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent { } private void trigger(final ProcessorNode worker, final ProcessContext context, final ScheduleState scheduleState, final StandardProcessContext processContext, final ProcessSessionFactory sessionFactory) { - scheduleState.incrementActiveThreadCount(); + final int newThreadCount = scheduleState.incrementActiveThreadCount(); + if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) { + // its possible that the worker queue could give us a worker node that is eligible to run based + // on the number of threads but another thread has already incremented the thread count, result in + // reaching the maximum number of threads. we won't know this until we atomically increment the thread count + // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would + // result in using more than the maximum number of defined threads + scheduleState.decrementActiveThreadCount(); + return; + } + try { try (final AutoCloseable ncl = NarCloseable.withNarLoader()) { worker.onTrigger(processContext, sessionFactory); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/97f8ab0c/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java index c10de83..eb5a437 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java @@ -31,12 +31,12 @@ public class ScheduleState { private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false); private volatile long lastStopTime = -1; - public void incrementActiveThreadCount() { - activeThreadCount.incrementAndGet(); + public int incrementActiveThreadCount() { + return activeThreadCount.incrementAndGet(); } - public void decrementActiveThreadCount() { - activeThreadCount.decrementAndGet(); + public int decrementActiveThreadCount() { + return activeThreadCount.decrementAndGet(); } public int getActiveThreadCount() {