Repository: nifi
Updated Branches:
  refs/heads/master 790102d8b -> d4d4ddade


NIFI-5361: When submitting many processors to start, calculate the 'timeout 
timestamp' immediately before calling the @OnScheduled method, once the task 
has actually started executing, instead of before the task has a chance to run.

This closes #2831


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d4d4ddad
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d4d4ddad
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d4d4ddad

Branch: refs/heads/master
Commit: d4d4ddadee8b7fcba17ea25b008fa75185336efc
Parents: 790102d
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Jun 29 09:25:57 2018 -0400
Committer: Matt Gilman <matt.c.gil...@gmail.com>
Committed: Mon Jul 2 16:36:07 2018 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/controller/StandardProcessorNode.java    | 8 ++++++--
 .../nifi/controller/scheduling/StandardProcessScheduler.java | 2 +-
 2 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d4d4ddad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 6a831d0..c2b98e6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1482,12 +1482,16 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
         final Processor processor = getProcessor();
         final ComponentLog procLog = new 
SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
 
-        final long completionTimestamp = System.currentTimeMillis() + 
onScheduleTimeoutMillis;
+        // Completion Timestamp is set to MAX_VALUE because we don't want to 
timeout until the task has a chance to run.
+        final AtomicLong completionTimestampRef = new 
AtomicLong(Long.MAX_VALUE);
 
         // Create a task to invoke the @OnScheduled annotation of the processor
         final Callable<Void> startupTask = () -> {
             LOG.debug("Invoking @OnScheduled methods of {}", processor);
 
+            // Now that the task has been scheduled, set the timeout
+            completionTimestampRef.set(System.currentTimeMillis() + 
onScheduleTimeoutMillis);
+
             try (final NarCloseable nc = 
NarCloseable.withComponentNarLoader(processor.getClass(), 
processor.getIdentifier())) {
                 try {
                     activateThread();
@@ -1572,7 +1576,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
                     return;
                 }
 
-                monitorAsyncTask(taskFuture, monitoringFuture, 
completionTimestamp);
+                monitorAsyncTask(taskFuture, monitoringFuture, 
completionTimestampRef.get());
             }
         };
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d4d4ddad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 8f7eb2f..b23e763 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -319,7 +319,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
             @Override
             public Future<?> scheduleTask(final Callable<?> task) {
                 lifecycleState.incrementActiveThreadCount(null);
-                return componentMonitoringThreadPool.submit(task);
+                return componentLifeCycleThreadPool.submit(task);
             }
 
             @Override

Reply via email to