NIFI-1464, Refactored Processor's life-cycle operation sequence
* Simplified and cleaned StandardProcessScheduler.start/stopProcessor methods
* Added stop/start operations to ProcessorNode.
* Removed unnecessary synchronization blocks related to ScheduledState in favor 
of enforcing order and idempotency via CAS operations. Those synchronization 
blocks were causing intermittent deadlocks whenever @OnScheduled blocks 
indefinitely.
* Added support for stopping the service when @OnScheduled operation hangs.
* Fixed the order of life-cycle operation invocation ensuring that each 
operation can *only* be invoked at the appropriate time
* Removed unnecessary locks from StandardProcessorNode since Atomic variables 
are used.
* Removed calls to @OnStopped from ContinuallyRunProcessorTask while ensuring 
the processor's full shutdown in the implementation of the 
StandardProcessorNode.stop() method.
* Removed dead code
* Added a comprehensive test suite that covers 95% of the Processor's life-cycle 
operations within the scope of FlowController, StandardProcessScheduler and 
StandardProcessorNode
* Improved and added javadocs on covered operations with detailed explanations.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0c5b1c27
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0c5b1c27
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0c5b1c27

Branch: refs/heads/master
Commit: 0c5b1c27f26a008baedac4eb0c69f9b01080eaec
Parents: f6705f2
Author: Oleg Zhurakousky <[email protected]>
Authored: Thu Feb 4 16:23:45 2016 -0500
Committer: Mark Payne <[email protected]>
Committed: Fri Mar 11 12:54:50 2016 -0500

----------------------------------------------------------------------
 .../apache/nifi/controller/ScheduledState.java  |    6 +-
 .../org/apache/nifi/util/NiFiProperties.java    |    2 +
 .../src/main/asciidoc/administration-guide.adoc |    1 +
 .../controller/AbstractConfiguredComponent.java |    7 +-
 .../apache/nifi/controller/ProcessorNode.java   |   50 +-
 .../nifi/controller/StandardProcessorNode.java  | 1135 ++++++++++--------
 .../scheduling/AbstractSchedulingAgent.java     |  103 ++
 .../scheduling/EventDrivenSchedulingAgent.java  |   10 +-
 .../scheduling/QuartzSchedulingAgent.java       |   10 +-
 .../controller/scheduling/ScheduleState.java    |    8 +-
 .../scheduling/StandardProcessScheduler.java    |  218 +---
 .../scheduling/TimerDrivenSchedulingAgent.java  |   10 +-
 .../StandardControllerServiceProvider.java      |    1 +
 .../tasks/ContinuallyRunProcessorTask.java      |   12 -
 .../scheduling/TestProcessorLifecycle.java      |  760 ++++++++++++
 .../TestStandardControllerServiceProvider.java  |   19 +-
 .../src/test/resources/state-management.xml     |   38 +
 17 files changed, 1654 insertions(+), 736 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0c5b1c27/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java 
b/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java
index d482eae..bf608db 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java
@@ -33,5 +33,9 @@ public enum ScheduledState {
     /**
      * Entity is currently scheduled to run
      */
-    RUNNING;
+    RUNNING,
+
+    STARTING,
+
+    STOPPING;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0c5b1c27/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index c432edf..2be8177 100644
--- 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -71,6 +71,7 @@ public class NiFiProperties extends Properties {
     public static final String ADMINISTRATIVE_YIELD_DURATION = 
"nifi.administrative.yield.duration";
     public static final String PERSISTENT_STATE_DIRECTORY = 
"nifi.persistent.state.directory";
     public static final String BORED_YIELD_DURATION = 
"nifi.bored.yield.duration";
+    public static final String PROCESSOR_START_TIMEOUT = 
"nifi.processor.start.timeout";
 
     // content repository properties
     public static final String REPOSITORY_CONTENT_PREFIX = 
"nifi.content.repository.directory.";
@@ -539,6 +540,7 @@ public class NiFiProperties extends Properties {
         return shouldSupport;
     }
 
+    @SuppressWarnings("unchecked")
     public Set<String> getAnonymousAuthorities() {
         final Set<String> authorities;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0c5b1c27/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc 
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 682a92c..58fe205 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -1153,6 +1153,7 @@ nifi.nar.library.directory.lib2=/nars/lib2 +
 Providing three total locations, including  _nifi.nar.library.directory_.
 |nifi.nar.working.directory|The location of the nar working directory. The 
default value is ./work/nar and probably should be left as is.
 |nifi.documentation.working.directory|The documentation working directory. The 
default value is ./work/docs/components and probably should be left as is.
+|nifi.processor.start.timeout|Time (milliseconds) to wait for a Processor to 
start before other life-cycle operations (e.g., stop) can be invoked. Default 
is infinite. 
 |====
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0c5b1c27/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
index 924e61e..65e59ee 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
@@ -44,7 +44,6 @@ public abstract class AbstractConfiguredComponent implements 
ConfigurableCompone
     private final ConfigurableComponent component;
     private final ValidationContextFactory validationContextFactory;
     private final ControllerServiceProvider serviceProvider;
-
     private final AtomicReference<String> name;
     private final AtomicReference<String> annotationData = new 
AtomicReference<>();
 
@@ -298,4 +297,10 @@ public abstract class AbstractConfiguredComponent 
implements ConfigurableCompone
 
     public abstract void verifyModifiable() throws IllegalStateException;
 
+    /**
+     *
+     */
+    ControllerServiceProvider getControllerServiceProvider() {
+        return this.serviceProvider;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0c5b1c27/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index d340c77..d7dbe24 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -18,6 +18,8 @@ package org.apache.nifi.controller;
 
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -25,6 +27,7 @@ import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.scheduling.SchedulingStrategy;
@@ -54,8 +57,6 @@ public abstract class ProcessorNode extends 
AbstractConfiguredComponent implemen
     @Override
     public abstract boolean isValid();
 
-    public abstract void setScheduledState(ScheduledState scheduledState);
-
     public abstract void setBulletinLevel(LogLevel bulletinLevel);
 
     public abstract LogLevel getBulletinLevel();
@@ -99,4 +100,49 @@ public abstract class ProcessorNode extends 
AbstractConfiguredComponent implemen
      */
     public abstract void verifyCanStart(Set<ControllerServiceNode> 
ignoredReferences);
 
+    /**
+     * Will start the {@link Processor} represented by this
+     * {@link ProcessorNode}. Starting a processor typically means invoking its
+     * operation that is annotated with @OnScheduled and then executing a
+     * callback provided by the {@link ProcessScheduler}, which typically
+     * initiates the
+     * {@link Processor#onTrigger(ProcessContext, 
org.apache.nifi.processor.ProcessSessionFactory)}
+     * cycle.
+     *
+     * @param scheduler
+     *            implementation of {@link ScheduledExecutorService} used to
+     *            initiate processor <i>start</i> task
+     * @param administrativeYieldMillis
+     *            the amount of milliseconds to wait for administrative yield
+     * @param processContext
+     *            the instance of {@link ProcessContext} and
+     *            {@link ControllerServiceLookup}
+     * @param schedulingAgentCallback
+     *            the callback provided by the {@link ProcessScheduler} to
+     *            execute upon successful start of the Processor
+     */
+    public abstract <T extends ProcessContext & ControllerServiceLookup> void 
start(ScheduledExecutorService scheduler,
+            long administrativeYieldMillis, T processContext, Runnable 
schedulingAgentCallback);
+
+    /**
+     * Will stop the {@link Processor} represented by this {@link 
ProcessorNode}.
+     * Stopping a processor typically means invoking its operation that is
+     * annotated with @OnUnscheduled and then @OnStopped.
+     *
+     * @param scheduler
+     *            implementation of {@link ScheduledExecutorService} used to
+     *            initiate processor <i>stop</i> task
+     * @param processContext
+     *            the instance of {@link ProcessContext} and
+     *            {@link ControllerServiceLookup}
+     * @param activeThreadMonitorCallback
+     *            the callback provided by the {@link ProcessScheduler} to
+     *            report the count of processor's active threads. Typically it
+     *            is used to ensure that operations annotated with 
@OnUnscheduled
+     *            and then @OnStopped are not invoked until such count reaches
+     *            0, essentially allowing tasks to finish before bringing
+     *            processor to a halt.
+     */
+    public abstract <T extends ProcessContext & ControllerServiceLookup> void 
stop(ScheduledExecutorService scheduler,
+            T processContext, Callable<Boolean> activeThreadMonitorCallback);
 }

Reply via email to