Repository: nifi
Updated Branches:
  refs/heads/master 790102d8b -> d4d4ddade
NIFI-5361: When submitting many processors to start, calculate the 'timeout timestamp' immediately before calling @OnScheduled method, after the task has been scheduled to run, instead of before the task has a chance to run. This closes #2831 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d4d4ddad Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d4d4ddad Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d4d4ddad Branch: refs/heads/master Commit: d4d4ddadee8b7fcba17ea25b008fa75185336efc Parents: 790102d Author: Mark Payne <marka...@hotmail.com> Authored: Fri Jun 29 09:25:57 2018 -0400 Committer: Matt Gilman <matt.c.gil...@gmail.com> Committed: Mon Jul 2 16:36:07 2018 -0400 ---------------------------------------------------------------------- .../org/apache/nifi/controller/StandardProcessorNode.java | 8 ++++++-- .../nifi/controller/scheduling/StandardProcessScheduler.java | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d4d4ddad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 6a831d0..c2b98e6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -1482,12 
+1482,16 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable final Processor processor = getProcessor(); final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); - final long completionTimestamp = System.currentTimeMillis() + onScheduleTimeoutMillis; + // Completion Timestamp is set to MAX_VALUE because we don't want to timeout until the task has a chance to run. + final AtomicLong completionTimestampRef = new AtomicLong(Long.MAX_VALUE); // Create a task to invoke the @OnScheduled annotation of the processor final Callable<Void> startupTask = () -> { LOG.debug("Invoking @OnScheduled methods of {}", processor); + // Now that the task has been scheduled, set the timeout + completionTimestampRef.set(System.currentTimeMillis() + onScheduleTimeoutMillis); + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { try { activateThread(); @@ -1572,7 +1576,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable return; } - monitorAsyncTask(taskFuture, monitoringFuture, completionTimestamp); + monitorAsyncTask(taskFuture, monitoringFuture, completionTimestampRef.get()); } }; http://git-wip-us.apache.org/repos/asf/nifi/blob/d4d4ddad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 8f7eb2f..b23e763 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -319,7 +319,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public Future<?> scheduleTask(final Callable<?> task) { lifecycleState.incrementActiveThreadCount(null); - return componentMonitoringThreadPool.submit(task); + return componentLifeCycleThreadPool.submit(task); } @Override