NIFI-1464, Refactored Processor's life-cycle operation sequence * Simplified and cleaned StandardProcessScheduler.start/stopProcessor methods * Added stop/start operations to ProcessorNode. * Removed unnecessary synchronization blocks related to ScheduledState in favor of enforcing order and idempotency via CAS operations. Those synchronization blocks were causing intermittent deadlocks whenever @OnScheduled blocks indefinitely. * Added support for stopping the service when @OnScheduled operation hangs. * Fixed the order of life-cycle operation invocation ensuring that each operation can *only* be invoked at the appropriate time * Removed unnecessary locks from StandardProcessNode since Atomic variables are used. * Removed calls to @OnStopped from ContinuallyRunningProcessTask while ensuring that procesor's full shut down in implementation of StandardProcessorNode.stop() method. * Removed dead code * Added comprehensive tests suite that covers 95% of Processor's life-cycle operations within the scope of FlowController, StandardProcesssScheduler and StandardProcessNode * Improved and added javadocs on covered operations with detailed explanations.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0c5b1c27 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0c5b1c27 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0c5b1c27 Branch: refs/heads/master Commit: 0c5b1c27f26a008baedac4eb0c69f9b01080eaec Parents: f6705f2 Author: Oleg Zhurakousky <[email protected]> Authored: Thu Feb 4 16:23:45 2016 -0500 Committer: Mark Payne <[email protected]> Committed: Fri Mar 11 12:54:50 2016 -0500 ---------------------------------------------------------------------- .../apache/nifi/controller/ScheduledState.java | 6 +- .../org/apache/nifi/util/NiFiProperties.java | 2 + .../src/main/asciidoc/administration-guide.adoc | 1 + .../controller/AbstractConfiguredComponent.java | 7 +- .../apache/nifi/controller/ProcessorNode.java | 50 +- .../nifi/controller/StandardProcessorNode.java | 1135 ++++++++++-------- .../scheduling/AbstractSchedulingAgent.java | 103 ++ .../scheduling/EventDrivenSchedulingAgent.java | 10 +- .../scheduling/QuartzSchedulingAgent.java | 10 +- .../controller/scheduling/ScheduleState.java | 8 +- .../scheduling/StandardProcessScheduler.java | 218 +--- .../scheduling/TimerDrivenSchedulingAgent.java | 10 +- .../StandardControllerServiceProvider.java | 1 + .../tasks/ContinuallyRunProcessorTask.java | 12 - .../scheduling/TestProcessorLifecycle.java | 760 ++++++++++++ .../TestStandardControllerServiceProvider.java | 19 +- .../src/test/resources/state-management.xml | 38 + 17 files changed, 1654 insertions(+), 736 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0c5b1c27/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java b/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java index d482eae..bf608db 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java @@ -33,5 +33,9 @@ public enum ScheduledState { /** * Entity is currently scheduled to run */ - RUNNING; + RUNNING, + + STARTING, + + STOPPING; } http://git-wip-us.apache.org/repos/asf/nifi/blob/0c5b1c27/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index c432edf..2be8177 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -71,6 +71,7 @@ public class NiFiProperties extends Properties { public static final String ADMINISTRATIVE_YIELD_DURATION = "nifi.administrative.yield.duration"; public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory"; public static final String BORED_YIELD_DURATION = "nifi.bored.yield.duration"; + public static final String PROCESSOR_START_TIMEOUT = "nifi.processor.start.timeout"; // content repository properties public static final String REPOSITORY_CONTENT_PREFIX = "nifi.content.repository.directory."; @@ -539,6 +540,7 @@ public class NiFiProperties extends Properties { return shouldSupport; } + @SuppressWarnings("unchecked") public Set<String> getAnonymousAuthorities() { final Set<String> authorities; http://git-wip-us.apache.org/repos/asf/nifi/blob/0c5b1c27/nifi-docs/src/main/asciidoc/administration-guide.adoc ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index 682a92c..58fe205 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -1153,6 +1153,7 @@ nifi.nar.library.directory.lib2=/nars/lib2 + Providing three total locations, including _nifi.nar.library.directory_. |nifi.nar.working.directory|The location of the nar working directory. The default value is ./work/nar and probably should be left as is. |nifi.documentation.working.directory|The documentation working directory. The default value is ./work/docs/components and probably should be left as is. +|nifi.processor.start.timeout|Time (milliseconds) to wait for a Processors to start before other life-cycle operation (e.g., stop) could be invoked. Default is infinite. |==== http://git-wip-us.apache.org/repos/asf/nifi/blob/0c5b1c27/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java index 924e61e..65e59ee 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java @@ -44,7 +44,6 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone private final ConfigurableComponent component; private final ValidationContextFactory validationContextFactory; private final ControllerServiceProvider serviceProvider; - private final AtomicReference<String> name; private final AtomicReference<String> annotationData = new AtomicReference<>(); @@ -298,4 +297,10 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone public abstract void verifyModifiable() throws IllegalStateException; + /** + * + */ + ControllerServiceProvider getControllerServiceProvider() { + return this.serviceProvider; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0c5b1c27/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index d340c77..d7dbe24 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -18,6 +18,8 @@ package org.apache.nifi.controller; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -25,6 +27,7 @@ import org.apache.nifi.connectable.Connectable; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -54,8 +57,6 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen @Override public abstract boolean isValid(); - public abstract void setScheduledState(ScheduledState scheduledState); - public abstract void setBulletinLevel(LogLevel bulletinLevel); public abstract LogLevel getBulletinLevel(); @@ -99,4 +100,49 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen */ public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences); + /** + * Will start the {@link Processor} represented by this + * {@link ProcessorNode}. Starting processor typically means invoking it's + * operation that is annotated with @OnScheduled and then executing a + * callback provided by the {@link ProcessScheduler} to which typically + * initiates + * {@link Processor#onTrigger(ProcessContext, org.apache.nifi.processor.ProcessSessionFactory)} + * cycle. + * + * @param scheduler + * implementation of {@link ScheduledExecutorService} used to + * initiate processor <i>start</i> task + * @param administrativeYieldMillis + * the amount of milliseconds to wait for administrative yield + * @param processContext + * the instance of {@link ProcessContext} and + * {@link ControllerServiceLookup} + * @param schedulingAgentCallback + * the callback provided by the {@link ProcessScheduler} to + * execute upon successful start of the Processor + */ + public abstract <T extends ProcessContext & ControllerServiceLookup> void start(ScheduledExecutorService scheduler, + long administrativeYieldMillis, T processContext, Runnable schedulingAgentCallback); + + /** + * Will stop the {@link Processor} represented by this {@link ProcessorNode}. + * Stopping processor typically means invoking it's operation that is + * annotated with @OnUnschedule and then @OnStopped. + * + * @param scheduler + * implementation of {@link ScheduledExecutorService} used to + * initiate processor <i>stop</i> task + * @param processContext + * the instance of {@link ProcessContext} and + * {@link ControllerServiceLookup} + * @param activeThreadMonitorCallback + * the callback provided by the {@link ProcessScheduler} to + * report the count of processor's active threads. Typically it + * is used to ensure that operations annotated with @OnUnschedule + * and then @OnStopped are not invoked until such count reaches + * 0, essentially allowing tasks to finish before bringing + * processor to a halt. + */ + public abstract <T extends ProcessContext & ControllerServiceLookup> void stop(ScheduledExecutorService scheduler, + T processContext, Callable<Boolean> activeThreadMonitorCallback); }
