NIFI-1464 ensured that OnUnscheduled is treated the same as OnScheduled

NIFI-1464 polished javadocs, error messages and docs


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/59fac58c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/59fac58c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/59fac58c

Branch: refs/heads/master
Commit: 59fac58c96f4127b1bf6fb7a501e934692534a03
Parents: 1c22f3f
Author: Oleg Zhurakousky <[email protected]>
Authored: Wed Mar 9 12:30:11 2016 -0500
Committer: Mark Payne <[email protected]>
Committed: Fri Mar 11 12:54:50 2016 -0500

----------------------------------------------------------------------
 .../org/apache/nifi/util/NiFiProperties.java    |  2 +-
 .../src/main/asciidoc/administration-guide.adoc |  2 +-
 .../nifi/controller/StandardProcessorNode.java  | 79 +++++++++++---------
 .../scheduling/TestProcessorLifecycle.java      |  4 +-
 4 files changed, 49 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/59fac58c/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 2be8177..3d05a47 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -71,7 +71,7 @@ public class NiFiProperties extends Properties {
     public static final String ADMINISTRATIVE_YIELD_DURATION = 
"nifi.administrative.yield.duration";
     public static final String PERSISTENT_STATE_DIRECTORY = 
"nifi.persistent.state.directory";
     public static final String BORED_YIELD_DURATION = 
"nifi.bored.yield.duration";
-    public static final String PROCESSOR_START_TIMEOUT = 
"nifi.processor.start.timeout";
+    public static final String PROCESSOR_SCHEDULING_TIMEOUT = 
"nifi.processor.scheduling.timeout";
 
     // content repository properties
     public static final String REPOSITORY_CONTENT_PREFIX = 
"nifi.content.repository.directory.";

http://git-wip-us.apache.org/repos/asf/nifi/blob/59fac58c/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 58fe205..9ca582d 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -1153,7 +1153,7 @@ nifi.nar.library.directory.lib2=/nars/lib2 +
 Providing three total locations, including  _nifi.nar.library.directory_.
 |nifi.nar.working.directory|The location of the nar working directory. The default value is ./work/nar and probably should be left as is.
 |nifi.documentation.working.directory|The documentation working directory. The default value is ./work/docs/components and probably should be left as is.
-|nifi.processor.start.timeout|Time (milliseconds) to wait for a Processors to start before other life-cycle operation (e.g., stop) could be invoked. Default is infinite. 
+|nifi.processor.scheduling.timeout|Time (milliseconds) to wait for a Processor's life-cycle operation (@OnScheduled and @OnUnscheduled) to finish before other life-cycle operation (e.g., stop) could be invoked. Default is infinite. 
 |====
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/59fac58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 2a0819c..85c5fe8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1241,9 +1241,16 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
                 @Override
                 public void run() {
                     try {
-                        SchedulingContext schedulingContext = new 
StandardSchedulingContext(processContext, getControllerServiceProvider(),
+                        final SchedulingContext schedulingContext = new 
StandardSchedulingContext(processContext, getControllerServiceProvider(),
                                 StandardProcessorNode.this, 
processContext.getStateManager());
-                        invokeOnScheduleAsync(taskScheduler, 
schedulingContext);
+                        invokeTaskAsCancelableFuture(taskScheduler, new 
Callable<Void>() {
+                            @SuppressWarnings("deprecation")
+                            @Override
+                            public Void call() throws Exception {
+                                
ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, 
org.apache.nifi.processor.annotation.OnScheduled.class, processor, 
schedulingContext);
+                                return null;
+                            }
+                        });
                         if 
(scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) 
{
                             schedulingAgentCallback.run(); // callback 
provided by StandardProcessScheduler to essentially initiate component's 
onTrigger() cycle
                         } else { // can only happen if stopProcessor was 
called before service was transitioned to RUNNING state
@@ -1302,12 +1309,20 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
     public <T extends ProcessContext & ControllerServiceLookup> void 
stop(final ScheduledExecutorService scheduler,
             final T processContext, final Callable<Boolean> 
activeThreadMonitorCallback) {
         if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, 
ScheduledState.STOPPING)) { // will ensure that the Processor represented by 
this node can only be stopped once
-            final Runnable stopProcRunnable = new Runnable() {
+            invokeTaskAsCancelableFuture(scheduler, new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
+                    return null;
+                }
+            });
+            // will continue to monitor active threads, invoking OnStopped once
+            // there are none
+            scheduler.execute(new Runnable() {
                 @Override
                 public void run() {
                     try {
                         if (activeThreadMonitorCallback.call()) {
-                            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
                             
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, 
processContext);
                             scheduledState.set(ScheduledState.STOPPED);
                         } else {
@@ -1317,8 +1332,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
                         LOG.warn("Failed while shutting down processor " + 
processor, e);
                     }
                 }
-            };
-            scheduler.execute(stopProcRunnable);
+            });
         } else {
             /*
              * We do compareAndSet() instead of set() to ensure that Processor
@@ -1333,41 +1347,35 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
     }
 
     /**
-     * Will invoke processor's methods annotated with @OnSchedule 
asynchronously
-     * to ensure that it could be interrupted if stop action was initiated on
-     * the processor that may be sitting in the infinitely blocking @OnSchedule
+     * Will invoke lifecycle operation (OnScheduled or OnUnscheduled)
+     * asynchronously to ensure that it could be interrupted if stop action was
+     * initiated on the processor that may be infinitely blocking in such
      * operation. While this approach paves the way for further enhancements
      * related to managing processor'slife-cycle operation at the moment the
      * interrupt will not happen automatically. This is primarily to preserve
-     * the existing behavior or the NiFi where stop operation can only be
+     * the existing behavior of the NiFi where stop operation can only be
      * invoked once the processor is started. Unfortunately that could mean 
that
-     * the processor may be blocking indefinitely in the @Oncheduled call. To
-     * deal with that a new NiFi property has been introduced
-     * <i>nifi.processor.start.timeout</i> which allows one to set the time (in
-     * milliseconds) of how long to wait before canceling the @OnScheduled task
-     * allowing processor's stop sequence to proceed. The default value for 
this
-     * property is {@link Long#MAX_VALUE}.
+     * the processor may be blocking indefinitely in lifecycle operation
+     * (OnScheduled or OnUnscheduled). To deal with that a new NiFi property 
has
+     * been introduced <i>nifi.processor.scheduling.timeout</i> which allows 
one
+     * to set the time (in milliseconds) of how long to wait before canceling
+     * such lifecycle operation (OnScheduled or OnUnscheduled) allowing
+     * processor's stop sequence to proceed. The default value for this 
property
+     * is {@link Long#MAX_VALUE}.
      * <p>
      * NOTE: Canceling the task does not guarantee that the task will actually
      * completes (successfully or otherwise), since cancellation of the task
-     * will issue a simple Thread.interrupt(). However code inside
-     * of @OnScheduled operation is written purely and will ignore thread
-     * interrupts you may end up with runaway thread which may eventually
-     * require NiFi reboot. In any event, the above explanation will be logged
-     * (WARN) informing a user so further actions could be taken.
+     * will issue a simple Thread.interrupt(). However code inside of lifecycle
+     * operation (OnScheduled or OnUnscheduled) is written purely and will
+     * ignore thread interrupts you may end up with runaway thread which may
+     * eventually require NiFi reboot. In any event, the above explanation will
+     * be logged (WARN) informing a user so further actions could be taken.
      * </p>
      */
-    private void invokeOnScheduleAsync(ScheduledExecutorService taskScheduler, 
final SchedulingContext schedulingContext) throws ExecutionException {
-        Future<Void> executionResult = taskScheduler.submit(new 
Callable<Void>() {
-            @SuppressWarnings("deprecation")
-            @Override
-            public Void call() throws Exception {
-                
ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, 
org.apache.nifi.processor.annotation.OnScheduled.class, processor, 
schedulingContext);
-                return null;
-            }
-        });
+    private void invokeTaskAsCancelableFuture(ScheduledExecutorService 
taskScheduler, Callable<Void> task) {
+        Future<Void> executionResult = taskScheduler.submit(task);
 
-        String timeoutString = 
NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_START_TIMEOUT);
+        String timeoutString = 
NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
         long onScheduleTimeout = timeoutString == null ? Long.MAX_VALUE
                 : FormatUtils.getTimeDuration(timeoutString.trim(), 
TimeUnit.MILLISECONDS);
 
@@ -1375,17 +1383,20 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
             executionResult.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             LOG.warn("Thread was interrupted while waiting for processor '" + 
this.processor.getClass().getSimpleName()
-                    + "' @OnSchedule operation to finish.");
+                    + "' lifecycle operation (OnScheduled or OnUnscheduled) to 
finish.");
             Thread.currentThread().interrupt();
         } catch (TimeoutException e) {
             executionResult.cancel(true);
-            LOG.warn("Timed out while waiting for the task executing 
@OnSchedule operation for '"
+            LOG.warn("Timed out while waiting for lifecycle operation 
(OnScheduled or OnUnscheduled) of '"
                     + this.processor.getClass().getSimpleName()
                     + "' processor to finish. An attempt is made to cancel the 
task via Thread.interrupt(). However it does not "
-                    + "guarantee that the task will be canceled since the code 
inside @OnSchedule method may "
+                    + "guarantee that the task will be canceled since the code 
inside current lifecycle operation (OnScheduled or OnUnscheduled) may "
                     + "have been written to ignore interrupts which may result 
in runaway thread which could lead to more issues "
                     + "eventually requiring NiFi to be restarted. This is 
usually a bug in the target Processor '"
                     + this.processor + "' that needs to be documented, 
reported and eventually fixed.");
+        } catch (ExecutionException e){
+            throw new RuntimeException(
+                    "Failed while executing one of processor's lifecycle tasks 
(OnScheduled or OnUnscheduled).", e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/59fac58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
index ca2e4fa..560c4cb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
@@ -409,7 +409,7 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void 
validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() 
throws Exception {
-        
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT,
 "5 sec");
+        
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT,
 "5 sec");
         FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
@@ -439,7 +439,7 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void 
validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() 
throws Exception {
-        
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT,
 "5 sec");
+        
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT,
 "5 sec");
         FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);

Reply via email to