http://git-wip-us.apache.org/repos/asf/nifi/blob/0c5b1c27/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index f57343e..3e07995 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller;
 
 import static java.util.Objects.requireNonNull;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -27,13 +28,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
@@ -46,6 +50,9 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.Connectable;
@@ -57,23 +64,34 @@ import 
org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.logging.LogRepositoryFactory;
+import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.SchedulingContext;
+import org.apache.nifi.processor.SimpleProcessLogger;
+import org.apache.nifi.processor.StandardSchedulingContext;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.ReflectionUtils;
 import org.quartz.CronExpression;
+import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * ProcessorNode provides thread-safe access to a FlowFileProcessor as it 
exists within a controlled flow. This node keeps track of the processor, its 
scheduling information and its relationships to
- * other processors and whatever scheduled futures exist for it. Must be 
thread safe.
+ * ProcessorNode provides thread-safe access to a FlowFileProcessor as it 
exists
+ * within a controlled flow. This node keeps track of the processor, its
+ * scheduling information and its relationships to other processors and 
whatever
+ * scheduled futures exist for it. Must be thread safe.
  *
  */
 public class StandardProcessorNode extends ProcessorNode implements 
Connectable {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(StandardProcessorNode.class);
+
     public static final String BULLETIN_OBSERVER_ID = "bulletin-observer";
 
     public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
@@ -86,16 +104,16 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     private final Map<Relationship, Set<Connection>> connections;
     private final AtomicReference<Set<Relationship>> 
undefinedRelationshipsToTerminate;
     private final AtomicReference<List<Connection>> incomingConnectionsRef;
-    private final ReentrantReadWriteLock rwLock;
-    private final Lock readLock;
-    private final Lock writeLock;
     private final AtomicBoolean isolated;
     private final AtomicBoolean lossTolerant;
     private final AtomicReference<ScheduledState> scheduledState;
     private final AtomicReference<String> comments;
     private final AtomicReference<Position> position;
     private final AtomicReference<String> annotationData;
-    private final AtomicReference<String> schedulingPeriod; // stored as 
string so it's presented to user as they entered it
+    private final AtomicReference<String> schedulingPeriod; // stored as string
+                                                            // so it's 
presented
+                                                            // to user as they
+                                                            // entered it
     private final AtomicReference<String> yieldPeriod;
     private final AtomicReference<String> penalizationPeriod;
     private final AtomicReference<Map<String, String>> style;
@@ -114,10 +132,12 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     private long runNanos = 0L;
 
     private SchedulingStrategy schedulingStrategy; // guarded by read/write 
lock
+                                                   // (stale: the read/write lock has been removed, so this field is no longer lock-guarded)
 
     @SuppressWarnings("deprecation")
-    public StandardProcessorNode(final Processor processor, final String uuid, 
final ValidationContextFactory validationContextFactory,
-        final ProcessScheduler scheduler, final ControllerServiceProvider 
controllerServiceProvider) {
+    public StandardProcessorNode(final Processor processor, final String uuid,
+            final ValidationContextFactory validationContextFactory, final 
ProcessScheduler scheduler,
+            final ControllerServiceProvider controllerServiceProvider) {
         super(processor, uuid, validationContextFactory, 
controllerServiceProvider);
 
         this.processor = processor;
@@ -126,9 +146,6 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         connections = new HashMap<>();
         incomingConnectionsRef = new AtomicReference<List<Connection>>(new 
ArrayList<Connection>());
         scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
-        rwLock = new ReentrantReadWriteLock(false);
-        readLock = rwLock.readLock();
-        writeLock = rwLock.writeLock();
         lossTolerant = new AtomicBoolean(false);
         final Set<Relationship> emptySetOfRelationships = new HashSet<>();
         undefinedRelationshipsToTerminate = new 
AtomicReference<>(emptySetOfRelationships);
@@ -147,15 +164,21 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         penalizationPeriod = new 
AtomicReference<>(DEFAULT_PENALIZATION_PERIOD);
 
         final Class<?> procClass = processor.getClass();
-        triggerWhenEmpty = 
procClass.isAnnotationPresent(TriggerWhenEmpty.class) || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenEmpty.class);
-        sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class) 
|| 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SideEffectFree.class);
-        batchSupported = procClass.isAnnotationPresent(SupportsBatching.class) 
|| 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SupportsBatching.class);
-        triggeredSerially = 
procClass.isAnnotationPresent(TriggerSerially.class) || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerSerially.class);
+        triggerWhenEmpty = 
procClass.isAnnotationPresent(TriggerWhenEmpty.class)
+                || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenEmpty.class);
+        sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class)
+                || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SideEffectFree.class);
+        batchSupported = procClass.isAnnotationPresent(SupportsBatching.class)
+                || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SupportsBatching.class);
+        triggeredSerially = 
procClass.isAnnotationPresent(TriggerSerially.class)
+                || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerSerially.class);
         triggerWhenAnyDestinationAvailable = 
procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class)
-            || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class);
+                || procClass.isAnnotationPresent(
+                        
org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class);
         this.validationContextFactory = validationContextFactory;
         eventDrivenSupported = 
(procClass.isAnnotationPresent(EventDriven.class)
-            || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.EventDriven.class))
 && !triggeredSerially && !triggerWhenEmpty;
+                || 
procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.EventDriven.class))
+                && !triggeredSerially && !triggerWhenEmpty;
 
         final boolean inputRequirementPresent = 
procClass.isAnnotationPresent(InputRequirement.class);
         if (inputRequirementPresent) {
@@ -176,26 +199,23 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     /**
-     * Provides and opportunity to retain information about this particular 
processor instance
+     * Provides an opportunity to retain information about this particular
+     * processor instance
      *
-     * @param comments new comments
+     * @param comments
+     *            new comments
      */
     @Override
     public void setComments(final String comments) {
-        writeLock.lock();
-        try {
-            if (isRunning()) {
-                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
-            }
-            this.comments.set(comments);
-        } finally {
-            writeLock.unlock();
+        if (isRunning()) {
+            throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
         }
+        this.comments.set(comments);
     }
 
     @Override
     public ScheduledState getScheduledState() {
-        return scheduledState.get();
+        return this.scheduledState.get();
     }
 
     @Override
@@ -226,7 +246,8 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     /**
-     * @return if true flow file content generated by this processor is 
considered loss tolerant
+     * @return if true flow file content generated by this processor is
+     *         considered loss tolerant
      */
     @Override
     public boolean isLossTolerant() {
@@ -239,7 +260,8 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     /**
-     * @return true if the processor has the {@link TriggerWhenEmpty} 
annotation, false otherwise.
+     * @return true if the processor has the {@link TriggerWhenEmpty}
+     *         annotation, false otherwise.
      */
     @Override
     public boolean isTriggerWhenEmpty() {
@@ -247,7 +269,8 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     /**
-     * @return true if the processor has the {@link SideEffectFree} 
annotation, false otherwise.
+     * @return true if the processor has the {@link SideEffectFree} annotation,
+     *         false otherwise.
      */
     @Override
     public boolean isSideEffectFree() {
@@ -260,7 +283,9 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     /**
-     * @return true if the processor has the {@link 
TriggerWhenAnyDestinationAvailable} annotation, false otherwise.
+     * @return true if the processor has the
+     *         {@link TriggerWhenAnyDestinationAvailable} annotation, false
+     *         otherwise.
      */
     @Override
     public boolean isTriggerWhenAnyDestinationAvailable() {
@@ -268,38 +293,31 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     /**
-     * Indicates whether flow file content made by this processor must be 
persisted
+     * Indicates whether flow file content made by this processor must be
+     * persisted
      *
-     * @param lossTolerant tolerant
+     * @param lossTolerant
+     *            tolerant
      */
     @Override
     public void setLossTolerant(final boolean lossTolerant) {
-        writeLock.lock();
-        try {
-            if (isRunning()) {
-                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
-            }
-            this.lossTolerant.set(lossTolerant);
-        } finally {
-            writeLock.unlock();
+        if (isRunning()) {
+            throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
         }
+        this.lossTolerant.set(lossTolerant);
     }
 
     /**
      * Indicates whether the processor runs on only the primary node.
      *
-     * @param isolated isolated
+     * @param isolated
+     *            isolated
      */
     public void setIsolated(final boolean isolated) {
-        writeLock.lock();
-        try {
-            if (isRunning()) {
-                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
-            }
-            this.isolated.set(isolated);
-        } finally {
-            writeLock.unlock();
+        if (isRunning()) {
+            throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
         }
+        this.isolated.set(isolated);
     }
 
     @Override
@@ -308,33 +326,28 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
             return true;
         }
         final Set<Relationship> terminatable = 
undefinedRelationshipsToTerminate.get();
-        if (terminatable == null) {
-            return false;
-        }
-        return terminatable.contains(relationship);
+        return terminatable == null ? false : 
terminatable.contains(relationship);
     }
 
     @Override
     public void setAutoTerminatedRelationships(final Set<Relationship> 
terminate) {
-        writeLock.lock();
-        try {
-            if (isRunning()) {
-                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
-            }
+        if (isRunning()) {
+            throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
+        }
 
-            for (final Relationship rel : terminate) {
-                if (!getConnections(rel).isEmpty()) {
-                    throw new IllegalStateException("Cannot mark relationship 
'" + rel.getName() + "' as auto-terminated because Connection already exists 
with this relationship");
-                }
+        for (final Relationship rel : terminate) {
+            if (!getConnections(rel).isEmpty()) {
+                throw new IllegalStateException("Cannot mark relationship '" + 
rel.getName()
+                        + "' as auto-terminated because Connection already 
exists with this relationship");
             }
-            undefinedRelationshipsToTerminate.set(new HashSet<>(terminate));
-        } finally {
-            writeLock.unlock();
         }
+        undefinedRelationshipsToTerminate.set(new HashSet<>(terminate));
     }
 
     /**
-     * @return an unmodifiable Set that contains all of the 
ProcessorRelationship objects that are configured to be auto-terminated
+     * @return an unmodifiable Set that contains all of the
+     *         ProcessorRelationship objects that are configured to be
+     *         auto-terminated
      */
     @Override
     public Set<Relationship> getAutoTerminatedRelationships() {
@@ -346,7 +359,8 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     /**
-     * @return the value of the processor's {@link CapabilityDescription} 
annotation, if one exists, else <code>null</code>.
+     * @return the value of the processor's {@link CapabilityDescription}
+     *         annotation, if one exists, else <code>null</code>.
      */
     @SuppressWarnings("deprecation")
     public String getProcessorDescription() {
@@ -355,7 +369,8 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         if (capDesc != null) {
             description = capDesc.value();
         } else {
-            final org.apache.nifi.processor.annotation.CapabilityDescription 
deprecatedCapDesc = 
processor.getClass().getAnnotation(org.apache.nifi.processor.annotation.CapabilityDescription.class);
+            final org.apache.nifi.processor.annotation.CapabilityDescription 
deprecatedCapDesc = processor.getClass()
+                    
.getAnnotation(org.apache.nifi.processor.annotation.CapabilityDescription.class);
             if (deprecatedCapDesc != null) {
                 description = deprecatedCapDesc.value();
             }
@@ -366,20 +381,19 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
     @Override
     public void setName(final String name) {
-        writeLock.lock();
-        try {
-            if (isRunning()) {
-                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
-            }
-            super.setName(name);
-        } finally {
-            writeLock.unlock();
+        if (isRunning()) {
+            throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
         }
+        super.setName(name);
     }
 
     /**
-     * @param timeUnit determines the unit of time to represent the scheduling 
period. If null will be reported in units of {@link 
#DEFAULT_SCHEDULING_TIME_UNIT}
-     * @return the schedule period that should elapse before subsequent cycles 
of this processor's tasks
+     * @param timeUnit
+     *            determines the unit of time to represent the scheduling
+     *            period. If null will be reported in units of
+     *            {@link #DEFAULT_SCHEDULING_TIME_UNIT}
+     * @return the schedule period that should elapse before subsequent cycles
+     *         of this processor's tasks
      */
     @Override
     public long getSchedulingPeriod(final TimeUnit timeUnit) {
@@ -388,37 +402,32 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
     @Override
     public boolean isEventDrivenSupported() {
-        readLock.lock();
-        try {
-            return this.eventDrivenSupported;
-        } finally {
-            readLock.unlock();
-        }
+        return this.eventDrivenSupported;
     }
 
     /**
      * Updates the Scheduling Strategy used for this Processor
      *
-     * @param schedulingStrategy strategy
+     * @param schedulingStrategy
+     *            strategy
      *
-     * @throws IllegalArgumentException if the SchedulingStrategy is not not 
applicable for this Processor
+     * @throws IllegalArgumentException
+     *             if the SchedulingStrategy is not applicable for this
+     *             Processor
      */
     @Override
     public void setSchedulingStrategy(final SchedulingStrategy 
schedulingStrategy) {
-        writeLock.lock();
-        try {
-            if (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN && 
!eventDrivenSupported) {
-                // not valid. Just ignore it. We don't throw an Exception 
because if a developer changes a Processor so that
-                // it no longer supports EventDriven mode, we don't want the 
app to fail to startup if it was already in Event-Driven
-                // Mode. Instead, we will simply leave it in Timer-Driven mode
-                return;
-            }
-
-            this.schedulingStrategy = schedulingStrategy;
-            setIsolated(schedulingStrategy == 
SchedulingStrategy.PRIMARY_NODE_ONLY);
-        } finally {
-            writeLock.unlock();
+        if (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN && 
!eventDrivenSupported) {
+            // not valid. Just ignore it. We don't throw an Exception because 
if
+            // a developer changes a Processor so that
+            // it no longer supports EventDriven mode, we don't want the app to
+            // fail to startup if it was already in Event-Driven
+            // Mode. Instead, we will simply leave it in Timer-Driven mode
+            return;
         }
+
+        this.schedulingStrategy = schedulingStrategy;
+        setIsolated(schedulingStrategy == 
SchedulingStrategy.PRIMARY_NODE_ONLY);
     }
 
     /**
@@ -426,12 +435,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
      */
     @Override
     public SchedulingStrategy getSchedulingStrategy() {
-        readLock.lock();
-        try {
-            return this.schedulingStrategy;
-        } finally {
-            readLock.unlock();
-        }
+        return this.schedulingStrategy;
     }
 
     @Override
@@ -441,63 +445,51 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
     @Override
     public void setScheduldingPeriod(final String schedulingPeriod) {
-        writeLock.lock();
-        try {
-            if (isRunning()) {
-                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
-            }
+        if (isRunning()) {
+            throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
+        }
 
-            switch (schedulingStrategy) {
-                case CRON_DRIVEN: {
-                    try {
-                        new CronExpression(schedulingPeriod);
-                    } catch (final Exception e) {
-                        throw new IllegalArgumentException("Scheduling Period 
is not a valid cron expression: " + schedulingPeriod);
-                    }
-                }
-                    break;
-                case PRIMARY_NODE_ONLY:
-                case TIMER_DRIVEN: {
-                    final long schedulingNanos = 
FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), 
TimeUnit.NANOSECONDS);
-                    if (schedulingNanos < 0) {
-                        throw new IllegalArgumentException("Scheduling Period 
must be positive");
-                    }
-                    
this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos));
-                }
-                    break;
-                case EVENT_DRIVEN:
-                default:
-                    return;
+        switch (schedulingStrategy) {
+        case CRON_DRIVEN: {
+            try {
+                new CronExpression(schedulingPeriod);
+            } catch (final Exception e) {
+                throw new IllegalArgumentException(
+                        "Scheduling Period is not a valid cron expression: " + 
schedulingPeriod);
             }
-
-            this.schedulingPeriod.set(schedulingPeriod);
-        } finally {
-            writeLock.unlock();
         }
+            break;
+        case PRIMARY_NODE_ONLY:
+        case TIMER_DRIVEN: {
+            final long schedulingNanos = 
FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod),
+                    TimeUnit.NANOSECONDS);
+            if (schedulingNanos < 0) {
+                throw new IllegalArgumentException("Scheduling Period must be 
positive");
+            }
+            this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, 
schedulingNanos));
+        }
+            break;
+        case EVENT_DRIVEN:
+        default:
+            return;
+        }
+
+        this.schedulingPeriod.set(schedulingPeriod);
     }
 
     @Override
     public long getRunDuration(final TimeUnit timeUnit) {
-        readLock.lock();
-        try {
-            return timeUnit.convert(this.runNanos, TimeUnit.NANOSECONDS);
-        } finally {
-            readLock.unlock();
-        }
+        return timeUnit.convert(this.runNanos, TimeUnit.NANOSECONDS);
     }
 
     @Override
     public void setRunDuration(final long duration, final TimeUnit timeUnit) {
-        writeLock.lock();
-        try {
-            if (duration < 0) {
-                throw new IllegalArgumentException("Run Duration must be 
non-negative value; cannot set to " + timeUnit.toSeconds(duration) + " 
seconds");
-            }
-
-            this.runNanos = timeUnit.toNanos(duration);
-        } finally {
-            writeLock.unlock();
+        if (duration < 0) {
+            throw new IllegalArgumentException("Run Duration must be 
non-negative value; cannot set to "
+                    + timeUnit.toSeconds(duration) + " seconds");
         }
+
+        this.runNanos = timeUnit.toNanos(duration);
     }
 
     @Override
@@ -512,32 +504,32 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
     @Override
     public void setYieldPeriod(final String yieldPeriod) {
-        writeLock.lock();
-        try {
-            if (isRunning()) {
-                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
-            }
-            final long yieldMillis = 
FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS);
-            if (yieldMillis < 0) {
-                throw new IllegalArgumentException("Yield duration must be 
positive");
-            }
-            this.yieldPeriod.set(yieldPeriod);
-        } finally {
-            writeLock.unlock();
+        if (isRunning()) {
+            throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
         }
+        final long yieldMillis = 
FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS);
+        if (yieldMillis < 0) {
+            throw new IllegalArgumentException("Yield duration must be 
positive");
+        }
+        this.yieldPeriod.set(yieldPeriod);
     }
 
     /**
-     * Causes the processor not to be scheduled for some period of time. This 
duration can be obtained and set via the {@link #getYieldPeriod(TimeUnit)} and 
{@link #setYieldPeriod(long, TimeUnit)}
-     * methods.
+     * Causes the processor not to be scheduled for some period of time. This
+     * duration can be obtained and set via the
+     * {@link #getYieldPeriod(TimeUnit)} and
+     * {@link #setYieldPeriod(String)} methods.
      */
     @Override
     public void yield() {
         final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS);
         yield(yieldMillis, TimeUnit.MILLISECONDS);
 
-        final String yieldDuration = (yieldMillis > 1000) ? (yieldMillis / 
1000) + " seconds" : yieldMillis + " milliseconds";
-        LoggerFactory.getLogger(processor.getClass()).debug("{} has chosen to 
yield its resources; will not be scheduled to run again for {}", processor, 
yieldDuration);
+        final String yieldDuration = (yieldMillis > 1000) ? (yieldMillis / 
1000) + " seconds"
+                : yieldMillis + " milliseconds";
+        LoggerFactory.getLogger(processor.getClass()).debug(
+                "{} has chosen to yield its resources; will not be scheduled 
to run again for {}", processor,
+                yieldDuration);
     }
 
     @Override
@@ -549,7 +541,8 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     /**
-     * @return the number of milliseconds since Epoch at which time this 
processor is to once again be scheduled.
+     * @return the number of milliseconds since Epoch at which time this
+     *         processor is to once again be scheduled.
      */
     @Override
     public long getYieldExpiration() {
@@ -568,42 +561,36 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
     @Override
     public void setPenalizationPeriod(final String penalizationPeriod) {
-        writeLock.lock();
-        try {
-            if (isRunning()) {
-                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
-            }
-            final long penalizationMillis = 
FormatUtils.getTimeDuration(requireNonNull(penalizationPeriod), 
TimeUnit.MILLISECONDS);
-            if (penalizationMillis < 0) {
-                throw new IllegalArgumentException("Penalization duration must 
be positive");
-            }
-            this.penalizationPeriod.set(penalizationPeriod);
-        } finally {
-            writeLock.unlock();
+        if (isRunning()) {
+            throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
+        }
+        final long penalizationMillis = 
FormatUtils.getTimeDuration(requireNonNull(penalizationPeriod),
+                TimeUnit.MILLISECONDS);
+        if (penalizationMillis < 0) {
+            throw new IllegalArgumentException("Penalization duration must be 
positive");
         }
+        this.penalizationPeriod.set(penalizationPeriod);
     }
 
     /**
-     * Determines the number of concurrent tasks that may be running for this 
processor.
+     * Determines the number of concurrent tasks that may be running for this
+     * processor.
      *
-     * @param taskCount a number of concurrent tasks this processor may have 
running
-     * @throws IllegalArgumentException if the given value is less than 1
+     * @param taskCount
+     *            a number of concurrent tasks this processor may have running
+     * @throws IllegalArgumentException
+     *             if the given value is less than 1
      */
     @Override
     public void setMaxConcurrentTasks(final int taskCount) {
-        writeLock.lock();
-        try {
-            if (isRunning()) {
-                throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
-            }
-            if (taskCount < 1 && getSchedulingStrategy() != 
SchedulingStrategy.EVENT_DRIVEN) {
-                throw new IllegalArgumentException();
-            }
-            if (!triggeredSerially) {
-                concurrentTaskCount.set(taskCount);
-            }
-        } finally {
-            writeLock.unlock();
+        if (isRunning()) {
+            throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
+        }
+        if (taskCount < 1 && getSchedulingStrategy() != 
SchedulingStrategy.EVENT_DRIVEN) {
+            throw new IllegalArgumentException();
+        }
+        if (!triggeredSerially) {
+            concurrentTaskCount.set(taskCount);
         }
     }
 
@@ -613,7 +600,8 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     /**
-     * @return the number of tasks that may execute concurrently for this 
processor
+     * @return the number of tasks that may execute concurrently for this
+     *         processor
      */
     @Override
     public int getMaxConcurrentTasks() {
@@ -633,13 +621,8 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     @Override
     public Set<Connection> getConnections() {
         final Set<Connection> allConnections = new HashSet<>();
-        readLock.lock();
-        try {
-            for (final Set<Connection> connectionSet : connections.values()) {
-                allConnections.addAll(connectionSet);
-            }
-        } finally {
-            readLock.unlock();
+        for (final Set<Connection> connectionSet : connections.values()) {
+            allConnections.addAll(connectionSet);
         }
 
         return allConnections;
@@ -652,14 +635,9 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
     @Override
     public Set<Connection> getConnections(final Relationship relationship) {
-        final Set<Connection> applicableConnections;
-        readLock.lock();
-        try {
-            applicableConnections = connections.get(relationship);
-        } finally {
-            readLock.unlock();
-        }
-        return (applicableConnections == null) ? Collections.<Connection> 
emptySet() : Collections.unmodifiableSet(applicableConnections);
+        Set<Connection> applicableConnections = connections.get(relationship);
+        return (applicableConnections == null) ? Collections.<Connection> 
emptySet()
+                : Collections.unmodifiableSet(applicableConnections);
     }
 
     @Override
@@ -667,52 +645,52 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         Objects.requireNonNull(connection, "connection cannot be null");
 
         if (!connection.getSource().equals(this) && 
!connection.getDestination().equals(this)) {
-            throw new IllegalStateException("Cannot a connection to a 
ProcessorNode for which the ProcessorNode is neither the Source nor the 
Destination");
+            throw new IllegalStateException(
+                    "Cannot a connection to a ProcessorNode for which the 
ProcessorNode is neither the Source nor the Destination");
         }
 
-        writeLock.lock();
-        try {
-            List<Connection> updatedIncoming = null;
-            if (connection.getDestination().equals(this)) {
-                // don't add the connection twice. This may occur if we have a 
self-loop because we will be told
-                // to add the connection once because we are the source and 
again because we are the destination.
-                final List<Connection> incomingConnections = 
incomingConnectionsRef.get();
-                updatedIncoming = new ArrayList<>(incomingConnections);
-                if (!updatedIncoming.contains(connection)) {
-                    updatedIncoming.add(connection);
-                }
+        List<Connection> updatedIncoming = null;
+        if (connection.getDestination().equals(this)) {
+            // don't add the connection twice. This may occur if we have a
+            // self-loop because we will be told
+            // to add the connection once because we are the source and again
+            // because we are the destination.
+            final List<Connection> incomingConnections = 
incomingConnectionsRef.get();
+            updatedIncoming = new ArrayList<>(incomingConnections);
+            if (!updatedIncoming.contains(connection)) {
+                updatedIncoming.add(connection);
             }
+        }
 
-            if (connection.getSource().equals(this)) {
-                // don't add the connection twice. This may occur if we have a 
self-loop because we will be told
-                // to add the connection once because we are the source and 
again because we are the destination.
-                if (!destinations.containsKey(connection)) {
-                    for (final Relationship relationship : 
connection.getRelationships()) {
-                        final Relationship rel = 
getRelationship(relationship.getName());
-                        Set<Connection> set = connections.get(rel);
-                        if (set == null) {
-                            set = new HashSet<>();
-                            connections.put(rel, set);
-                        }
+        if (connection.getSource().equals(this)) {
+            // don't add the connection twice. This may occur if we have a
+            // self-loop because we will be told
+            // to add the connection once because we are the source and again
+            // because we are the destination.
+            if (!destinations.containsKey(connection)) {
+                for (final Relationship relationship : 
connection.getRelationships()) {
+                    final Relationship rel = 
getRelationship(relationship.getName());
+                    Set<Connection> set = connections.get(rel);
+                    if (set == null) {
+                        set = new HashSet<>();
+                        connections.put(rel, set);
+                    }
 
-                        set.add(connection);
+                    set.add(connection);
 
-                        destinations.put(connection, 
connection.getDestination());
-                    }
+                    destinations.put(connection, connection.getDestination());
+                }
 
-                    final Set<Relationship> autoTerminated = 
this.undefinedRelationshipsToTerminate.get();
-                    if (autoTerminated != null) {
-                        
autoTerminated.removeAll(connection.getRelationships());
-                        
this.undefinedRelationshipsToTerminate.set(autoTerminated);
-                    }
+                final Set<Relationship> autoTerminated = 
this.undefinedRelationshipsToTerminate.get();
+                if (autoTerminated != null) {
+                    autoTerminated.removeAll(connection.getRelationships());
+                    this.undefinedRelationshipsToTerminate.set(autoTerminated);
                 }
             }
+        }
 
-            if (updatedIncoming != null) {
-                
incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
-            }
-        } finally {
-            writeLock.unlock();
+        if (updatedIncoming != null) {
+            
incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
         }
     }
 
@@ -724,74 +702,68 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     @Override
     public void updateConnection(final Connection connection) throws 
IllegalStateException {
         if (requireNonNull(connection).getSource().equals(this)) {
-            writeLock.lock();
-            try {
-                //
-                // update any relationships
-                //
-                // first check if any relations were removed.
-                final List<Relationship> existingRelationships = new 
ArrayList<>();
-                for (final Map.Entry<Relationship, Set<Connection>> entry : 
connections.entrySet()) {
-                    if (entry.getValue().contains(connection)) {
-                        existingRelationships.add(entry.getKey());
-                    }
+            // update any relationships
+            //
+            // first check if any relations were removed.
+            final List<Relationship> existingRelationships = new ArrayList<>();
+            for (final Map.Entry<Relationship, Set<Connection>> entry : 
connections.entrySet()) {
+                if (entry.getValue().contains(connection)) {
+                    existingRelationships.add(entry.getKey());
                 }
+            }
 
-                for (final Relationship rel : connection.getRelationships()) {
-                    if (!existingRelationships.contains(rel)) {
-                        // relationship was removed. Check if this is legal.
-                        final Set<Connection> connectionsForRelationship = 
getConnections(rel);
-                        if (connectionsForRelationship != null && 
connectionsForRelationship.size() == 1 && this.isRunning() && 
!isAutoTerminated(rel) && getRelationships().contains(rel)) {
-                            // if we are running and we do not terminate 
undefined relationships and this is the only
-                            // connection that defines the given relationship, 
and that relationship is required,
-                            // then it is not legal to remove this 
relationship from this connection.
-                            throw new IllegalStateException("Cannot remove 
relationship " + rel.getName() + " from Connection because doing so would 
invalidate Processor "
-                                + this + ", which is currently running");
-                        }
+            for (final Relationship rel : connection.getRelationships()) {
+                if (!existingRelationships.contains(rel)) {
+                    // relationship was removed. Check if this is legal.
+                    final Set<Connection> connectionsForRelationship = 
getConnections(rel);
+                    if (connectionsForRelationship != null && 
connectionsForRelationship.size() == 1 && this.isRunning()
+                            && !isAutoTerminated(rel) && 
getRelationships().contains(rel)) {
+                        // if we are running and we do not terminate undefined
+                        // relationships and this is the only
+                        // connection that defines the given relationship, and
+                        // that relationship is required,
+                        // then it is not legal to remove this relationship 
from
+                        // this connection.
+                        throw new IllegalStateException("Cannot remove 
relationship " + rel.getName()
+                                + " from Connection because doing so would 
invalidate Processor " + this
+                                + ", which is currently running");
                     }
                 }
+            }
 
-                // remove the connection from any list that currently contains
-                for (final Set<Connection> list : connections.values()) {
-                    list.remove(connection);
-                }
+            // remove the connection from any list that currently contains
+            for (final Set<Connection> list : connections.values()) {
+                list.remove(connection);
+            }
 
-                // add the connection in for all relationships listed.
-                for (final Relationship rel : connection.getRelationships()) {
-                    Set<Connection> set = connections.get(rel);
-                    if (set == null) {
-                        set = new HashSet<>();
-                        connections.put(rel, set);
-                    }
-                    set.add(connection);
+            // add the connection in for all relationships listed.
+            for (final Relationship rel : connection.getRelationships()) {
+                Set<Connection> set = connections.get(rel);
+                if (set == null) {
+                    set = new HashSet<>();
+                    connections.put(rel, set);
                 }
+                set.add(connection);
+            }
 
-                // update to the new destination
-                destinations.put(connection, connection.getDestination());
+            // update to the new destination
+            destinations.put(connection, connection.getDestination());
 
-                final Set<Relationship> autoTerminated = 
this.undefinedRelationshipsToTerminate.get();
-                if (autoTerminated != null) {
-                    autoTerminated.removeAll(connection.getRelationships());
-                    this.undefinedRelationshipsToTerminate.set(autoTerminated);
-                }
-            } finally {
-                writeLock.unlock();
+            final Set<Relationship> autoTerminated = 
this.undefinedRelationshipsToTerminate.get();
+            if (autoTerminated != null) {
+                autoTerminated.removeAll(connection.getRelationships());
+                this.undefinedRelationshipsToTerminate.set(autoTerminated);
             }
         }
 
         if (connection.getDestination().equals(this)) {
-            writeLock.lock();
-            try {
-                // update our incoming connections -- we can just remove & 
re-add the connection to
-                // update the list.
-                final List<Connection> incomingConnections = 
incomingConnectionsRef.get();
-                final List<Connection> updatedIncoming = new 
ArrayList<>(incomingConnections);
-                updatedIncoming.remove(connection);
-                updatedIncoming.add(connection);
-                
incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
-            } finally {
-                writeLock.unlock();
-            }
+            // update our incoming connections -- we can just remove & re-add
+            // the connection to update the list.
+            final List<Connection> incomingConnections = 
incomingConnectionsRef.get();
+            final List<Connection> updatedIncoming = new 
ArrayList<>(incomingConnections);
+            updatedIncoming.remove(connection);
+            updatedIncoming.add(connection);
+            
incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
         }
     }
 
@@ -803,45 +775,39 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
             for (final Relationship relationship : 
connection.getRelationships()) {
                 final Set<Connection> connectionsForRelationship = 
getConnections(relationship);
                 if ((connectionsForRelationship == null || 
connectionsForRelationship.size() <= 1) && isRunning()) {
-                    throw new IllegalStateException("This connection cannot be 
removed because its source is running and removing it will invalidate this 
processor");
+                    throw new IllegalStateException(
+                            "This connection cannot be removed because its 
source is running and removing it will invalidate this processor");
                 }
             }
 
-            writeLock.lock();
-            try {
-                for (final Set<Connection> connectionList : 
this.connections.values()) {
-                    connectionList.remove(connection);
-                }
-
-                connectionRemoved = (destinations.remove(connection) != null);
-            } finally {
-                writeLock.unlock();
+            for (final Set<Connection> connectionList : 
this.connections.values()) {
+                connectionList.remove(connection);
             }
+
+            connectionRemoved = (destinations.remove(connection) != null);
         }
 
         if (connection.getDestination().equals(this)) {
-            writeLock.lock();
-            try {
-                final List<Connection> incomingConnections = 
incomingConnectionsRef.get();
-                if (incomingConnections.contains(connection)) {
-                    final List<Connection> updatedIncoming = new 
ArrayList<>(incomingConnections);
-                    updatedIncoming.remove(connection);
-                    
incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
-                    return;
-                }
-            } finally {
-                writeLock.unlock();
+            final List<Connection> incomingConnections = 
incomingConnectionsRef.get();
+            if (incomingConnections.contains(connection)) {
+                final List<Connection> updatedIncoming = new 
ArrayList<>(incomingConnections);
+                updatedIncoming.remove(connection);
+                
incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
+                return;
             }
         }
 
         if (!connectionRemoved) {
-            throw new IllegalArgumentException("Cannot remove a connection 
from a ProcessorNode for which the ProcessorNode is not the Source");
+            throw new IllegalArgumentException(
+                    "Cannot remove a connection from a ProcessorNode for which 
the ProcessorNode is not the Source");
         }
     }
 
     /**
-     * @param relationshipName name
-     * @return the relationship for this nodes processor for the given name or 
creates a new relationship for the given name
+     * @param relationshipName
+     *            name
+     * @return the relationship for this nodes processor for the given name or
+     *         creates a new relationship for the given name
      */
     @Override
     public Relationship getRelationship(final String relationshipName) {
@@ -868,59 +834,45 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     /**
-     * @return the Set of destination processors for all relationships 
excluding any destinations that are this processor itself (self-loops)
+     * @return the Set of destination processors for all relationships 
excluding
+     *         any destinations that are this processor itself (self-loops)
      */
     public Set<Connectable> getDestinations() {
         final Set<Connectable> nonSelfDestinations = new HashSet<>();
-        readLock.lock();
-        try {
-            for (final Connectable connectable : destinations.values()) {
-                if (connectable != this) {
-                    nonSelfDestinations.add(connectable);
-                }
+        for (final Connectable connectable : destinations.values()) {
+            if (connectable != this) {
+                nonSelfDestinations.add(connectable);
             }
-        } finally {
-            readLock.unlock();
         }
         return nonSelfDestinations;
     }
 
     public Set<Connectable> getDestinations(final Relationship relationship) {
-        readLock.lock();
-        try {
-            final Set<Connectable> destinationSet = new HashSet<>();
-            final Set<Connection> relationshipConnections = 
connections.get(relationship);
-            if (relationshipConnections != null) {
-                for (final Connection connection : relationshipConnections) {
-                    destinationSet.add(destinations.get(connection));
-                }
+        final Set<Connectable> destinationSet = new HashSet<>();
+        final Set<Connection> relationshipConnections = 
connections.get(relationship);
+        if (relationshipConnections != null) {
+            for (final Connection connection : relationshipConnections) {
+                destinationSet.add(destinations.get(connection));
             }
-            return destinationSet;
-        } finally {
-            readLock.unlock();
         }
+        return destinationSet;
     }
 
     public Set<Relationship> getUndefinedRelationships() {
         final Set<Relationship> undefined = new HashSet<>();
-        readLock.lock();
-        try {
-            final Set<Relationship> relationships;
-            try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
-                relationships = processor.getRelationships();
-            }
+        final Set<Relationship> relationships;
+        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+            relationships = processor.getRelationships();
+        }
 
-            if (relationships == null) {
-                return undefined;
-            }
-            for (final Relationship relation : relationships) {
-                final Set<Connection> connectionSet = 
this.connections.get(relation);
-                if (connectionSet == null || connectionSet.isEmpty()) {
-                    undefined.add(relation);
-                }
+        if (relationships == null) {
+            return undefined;
+        }
+        for (final Relationship relation : relationships) {
+            final Set<Connection> connectionSet = 
this.connections.get(relation);
+            if (connectionSet == null || connectionSet.isEmpty()) {
+                undefined.add(relation);
             }
-        } finally {
-            readLock.unlock();
         }
         return undefined;
     }
@@ -928,36 +880,22 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     /**
      * Determines if the given node is a destination for this node
      *
-     * @param node node
+     * @param node
+     *            node
      * @return true if is a direct destination node; false otherwise
      */
     boolean isRelated(final ProcessorNode node) {
-        readLock.lock();
-        try {
-            return this.destinations.containsValue(node);
-        } finally {
-            readLock.unlock();
-        }
+        return this.destinations.containsValue(node);
     }
 
     @Override
     public boolean isRunning() {
-        readLock.lock();
-        try {
-            return getScheduledState().equals(ScheduledState.RUNNING) || 
processScheduler.getActiveThreadCount(this) > 0;
-        } finally {
-            readLock.unlock();
-        }
+        return getScheduledState().equals(ScheduledState.RUNNING) || 
processScheduler.getActiveThreadCount(this) > 0;
     }
 
     @Override
     public int getActiveThreadCount() {
-        readLock.lock();
-        try {
-            return processScheduler.getActiveThreadCount(this);
-        } finally {
-            readLock.unlock();
-        }
+        return processScheduler.getActiveThreadCount(this);
     }
 
     List<Connection> getIncomingNonLoopConnections() {
@@ -974,14 +912,12 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
     @Override
     public boolean isValid() {
-        readLock.lock();
         try {
-            final ValidationContext validationContext = 
validationContextFactory.newValidationContext(getProperties(), 
getAnnotationData());
+            final ValidationContext validationContext = 
validationContextFactory.newValidationContext(getProperties(),
+                    getAnnotationData());
 
             final Collection<ValidationResult> validationResults;
-            try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
-                validationResults = getProcessor().validate(validationContext);
-            }
+            validationResults = getProcessor().validate(validationContext);
 
             for (final ValidationResult result : validationResults) {
                 if (!result.isValid()) {
@@ -996,36 +932,34 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
             }
 
             switch (getInputRequirement()) {
-                case INPUT_ALLOWED:
-                    break;
-                case INPUT_FORBIDDEN: {
-                    if (!getIncomingNonLoopConnections().isEmpty()) {
-                        return false;
-                    }
-                    break;
+            case INPUT_ALLOWED:
+                break;
+            case INPUT_FORBIDDEN: {
+                if (!getIncomingNonLoopConnections().isEmpty()) {
+                    return false;
                 }
-                case INPUT_REQUIRED: {
-                    if (getIncomingNonLoopConnections().isEmpty()) {
-                        return false;
-                    }
-                    break;
+                break;
+            }
+            case INPUT_REQUIRED: {
+                if (getIncomingNonLoopConnections().isEmpty()) {
+                    return false;
                 }
+                break;
+            }
             }
         } catch (final Throwable t) {
+            LOG.warn("Failed during validation", t);
             return false;
-        } finally {
-            readLock.unlock();
         }
-
         return true;
     }
 
     @Override
     public Collection<ValidationResult> getValidationErrors() {
         final List<ValidationResult> results = new ArrayList<>();
-        readLock.lock();
         try {
-            final ValidationContext validationContext = 
validationContextFactory.newValidationContext(getProperties(), 
getAnnotationData());
+            final ValidationContext validationContext = 
validationContextFactory.newValidationContext(getProperties(),
+                    getAnnotationData());
 
             final Collection<ValidationResult> validationResults;
             try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
@@ -1041,43 +975,37 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
             for (final Relationship relationship : 
getUndefinedRelationships()) {
                 if (!isAutoTerminated(relationship)) {
                     final ValidationResult error = new 
ValidationResult.Builder()
-                        .explanation("Relationship '" + relationship.getName() 
+ "' is not connected to any component and is not auto-terminated")
-                        .subject("Relationship " + relationship.getName())
-                        .valid(false)
-                        .build();
+                            .explanation("Relationship '" + 
relationship.getName()
+                                    + "' is not connected to any component and 
is not auto-terminated")
+                            .subject("Relationship " + 
relationship.getName()).valid(false).build();
                     results.add(error);
                 }
             }
 
             switch (getInputRequirement()) {
-                case INPUT_ALLOWED:
-                    break;
-                case INPUT_FORBIDDEN: {
-                    final int incomingConnCount = 
getIncomingNonLoopConnections().size();
-                    if (incomingConnCount != 0) {
-                        results.add(new ValidationResult.Builder()
-                            .explanation("Processor does not allow upstream 
connections but currently has " + incomingConnCount)
-                            .subject("Upstream Connections")
-                            .valid(false)
-                            .build());
-                    }
-                    break;
+            case INPUT_ALLOWED:
+                break;
+            case INPUT_FORBIDDEN: {
+                final int incomingConnCount = 
getIncomingNonLoopConnections().size();
+                if (incomingConnCount != 0) {
+                    results.add(new ValidationResult.Builder().explanation(
+                            "Processor does not allow upstream connections but 
currently has " + incomingConnCount)
+                            .subject("Upstream 
Connections").valid(false).build());
                 }
-                case INPUT_REQUIRED: {
-                    if (getIncomingNonLoopConnections().isEmpty()) {
-                        results.add(new ValidationResult.Builder()
+                break;
+            }
+            case INPUT_REQUIRED: {
+                if (getIncomingNonLoopConnections().isEmpty()) {
+                    results.add(new ValidationResult.Builder()
                             .explanation("Processor requires an upstream 
connection but currently has none")
-                            .subject("Upstream Connections")
-                            .valid(false)
-                            .build());
-                    }
-                    break;
+                            .subject("Upstream 
Connections").valid(false).build());
                 }
+                break;
+            }
             }
         } catch (final Throwable t) {
-            results.add(new ValidationResult.Builder().explanation("Failed to 
run validation due to " + t.toString()).valid(false).build());
-        } finally {
-            readLock.unlock();
+            results.add(new ValidationResult.Builder().explanation("Failed to 
run validation due to " + t.toString())
+                    .valid(false).build());
         }
         return results;
     }
@@ -1090,7 +1018,8 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     /**
      * Establishes node equality (based on the processor's identifier)
      *
-     * @param other node
+     * @param other
+     *            node
      * @return true if equal
      */
     @Override
@@ -1128,18 +1057,15 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
 
     @Override
     public void setProcessGroup(final ProcessGroup group) {
-        writeLock.lock();
-        try {
-            this.processGroup.set(group);
-        } finally {
-            writeLock.unlock();
-        }
+        this.processGroup.set(group);
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
         try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
             processor.onTrigger(context, sessionFactory);
+        } catch (Exception ex) {
+            ex.printStackTrace();
         }
     }
 
@@ -1149,25 +1075,12 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
     }
 
     @Override
-    public void setScheduledState(final ScheduledState scheduledState) {
-        this.scheduledState.set(scheduledState);
-        if (!scheduledState.equals(ScheduledState.RUNNING)) { // if user stops 
processor, clear yield expiration
-            yieldExpiration.set(0L);
-        }
-    }
-
-    @Override
     public void setAnnotationData(final String data) {
-        writeLock.lock();
-        try {
-            if (isRunning()) {
-                throw new IllegalStateException("Cannot set AnnotationData 
while processor is running");
-            }
-
-            this.annotationData.set(data);
-        } finally {
-            writeLock.unlock();
+        if (isRunning()) {
+            throw new IllegalStateException("Cannot set AnnotationData while 
processor is running");
         }
+
+        this.annotationData.set(data);
     }
 
     @Override
@@ -1187,75 +1100,55 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
 
     @Override
     public void verifyCanDelete(final boolean ignoreConnections) {
-        readLock.lock();
-        try {
-            if (isRunning()) {
-                throw new IllegalStateException(this + " is running");
-            }
+        if (isRunning()) {
+            throw new IllegalStateException(this + " is running");
+        }
 
-            if (!ignoreConnections) {
-                for (final Set<Connection> connectionSet : 
connections.values()) {
-                    for (final Connection connection : connectionSet) {
-                        connection.verifyCanDelete();
-                    }
+        if (!ignoreConnections) {
+            for (final Set<Connection> connectionSet : connections.values()) {
+                for (final Connection connection : connectionSet) {
+                    connection.verifyCanDelete();
                 }
+            }
 
-                for (final Connection connection : 
incomingConnectionsRef.get()) {
-                    if (connection.getSource().equals(this)) {
-                        connection.verifyCanDelete();
-                    } else {
-                        throw new IllegalStateException(this + " is the 
destination of another component");
-                    }
+            for (final Connection connection : incomingConnectionsRef.get()) {
+                if (connection.getSource().equals(this)) {
+                    connection.verifyCanDelete();
+                } else {
+                    throw new IllegalStateException(this + " is the 
destination of another component");
                 }
             }
-        } finally {
-            readLock.unlock();
         }
     }
 
     @Override
     public void verifyCanStart() {
-        readLock.lock();
-        try {
-            switch (getScheduledState()) {
-                case DISABLED:
-                    throw new IllegalStateException(this + " cannot be started 
because it is disabled");
-                case RUNNING:
-                    throw new IllegalStateException(this + " cannot be started 
because it is already running");
-                case STOPPED:
-                    break;
-            }
-            verifyNoActiveThreads();
-
-            if (!isValid()) {
-                throw new IllegalStateException(this + " is not in a valid 
state");
-            }
-        } finally {
-            readLock.unlock();
-        }
+        this.verifyCanStart(null);
     }
 
     @Override
     public void verifyCanStart(final Set<ControllerServiceNode> 
ignoredReferences) {
-        switch (getScheduledState()) {
-            case DISABLED:
-                throw new IllegalStateException(this + " cannot be started 
because it is disabled");
-            case RUNNING:
-                throw new IllegalStateException(this + " cannot be started 
because it is already running");
-            case STOPPED:
-                break;
+        if (this.getScheduledState() == ScheduledState.RUNNING) {
+            throw new IllegalStateException(this + " cannot be started because 
it is already running");
         }
+
         verifyNoActiveThreads();
 
-        final Set<String> ids = new HashSet<>();
-        for (final ControllerServiceNode node : ignoredReferences) {
-            ids.add(node.getIdentifier());
-        }
+        if (ignoredReferences != null) {
+            final Set<String> ids = new HashSet<>();
+            for (final ControllerServiceNode node : ignoredReferences) {
+                ids.add(node.getIdentifier());
+            }
 
-        final Collection<ValidationResult> validationResults = 
getValidationErrors(ids);
-        for (final ValidationResult result : validationResults) {
-            if (!result.isValid()) {
-                throw new IllegalStateException(this + " cannot be started 
because it is not valid: " + result);
+            final Collection<ValidationResult> validationResults = 
getValidationErrors(ids);
+            for (final ValidationResult result : validationResults) {
+                if (!result.isValid()) {
+                    throw new IllegalStateException(this + " cannot be started 
because it is not valid: " + result);
+                }
+            }
+        } else {
+            if (!isValid()) {
+                throw new IllegalStateException(this + " is not in a valid 
state");
             }
         }
     }
@@ -1269,41 +1162,26 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
 
     @Override
     public void verifyCanUpdate() {
-        readLock.lock();
-        try {
-            if (isRunning()) {
-                throw new IllegalStateException(this + " is not stopped");
-            }
-        } finally {
-            readLock.unlock();
+        if (isRunning()) {
+            throw new IllegalStateException(this + " is not stopped");
         }
     }
 
     @Override
     public void verifyCanEnable() {
-        readLock.lock();
-        try {
-            if (getScheduledState() != ScheduledState.DISABLED) {
-                throw new IllegalStateException(this + " is not disabled");
-            }
-
-            verifyNoActiveThreads();
-        } finally {
-            readLock.unlock();
+        if (getScheduledState() != ScheduledState.DISABLED) {
+            throw new IllegalStateException(this + " is not disabled");
         }
+
+        verifyNoActiveThreads();
     }
 
     @Override
     public void verifyCanDisable() {
-        readLock.lock();
-        try {
-            if (getScheduledState() != ScheduledState.STOPPED) {
-                throw new IllegalStateException(this + " is not stopped");
-            }
-            verifyNoActiveThreads();
-        } finally {
-            readLock.unlock();
+        if (getScheduledState() != ScheduledState.STOPPED) {
+            throw new IllegalStateException(this + " is not stopped");
         }
+        verifyNoActiveThreads();
     }
 
     @Override
@@ -1324,4 +1202,195 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
             throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
         }
     }
+
+    /**
+     * Will idempotently start the processor using the following sequence: <i>
+     * <ul>
+     * <li>Validate Processor's state (e.g., PropertyDescriptors,
+     * ControllerServices etc.)</li>
+     * <li>Transition (atomically) Processor's scheduled state from STOPPED to
+     * STARTING. If the above state transition succeeds, then execute the start
+     * task (asynchronously) which will be re-tried until @OnScheduled is
+     * executed successfully and 'schedulingAgentCallback' is invoked, or until
+     * STOP operation is initiated on this processor. If state transition fails
+     * it means processor is already being started and WARN message will be
+     * logged explaining it.</li>
+     * </ul>
+     * </i>
+     * <p>
+     * Any exception thrown while invoking operations annotated with 
@OnScheduled
+     * will be caught and logged after which @OnUnscheduled operation will be
+     * invoked (quietly) and the start sequence will be repeated (re-try) after
+     * delay provided by 'administrativeYieldMillis'.
+     * </p>
+     * <p>
+     * Upon successful completion of start sequence (@OnScheduled -&gt;
+     * 'schedulingAgentCallback') the attempt will be made to transition
+     * processor's scheduling state to RUNNING at which point processor is
+     * considered to be fully started and functioning. If upon successful
+     * invocation of @OnScheduled operation the processor can not be
+     * transitioned to RUNNING state (e.g., STOP operation was invoked on the
+     * processor while its @OnScheduled operation was executing), the
+     * processor's @OnUnscheduled operation will be invoked and its scheduling
+     * state will be set to STOPPED at which point the processor is considered
+     * to be fully stopped.
+     * </p>
+     */
+    @Override
+    public <T extends ProcessContext & ControllerServiceLookup> void 
start(final ScheduledExecutorService taskScheduler,
+            final long administrativeYieldMillis, final T processContext, 
final Runnable schedulingAgentCallback) {
+        if (!this.isValid()) {
+            throw new IllegalStateException( "Processor " + this.getName() + " 
is not in a valid state due to " + this.getValidationErrors());
+        }
+
+        if (this.scheduledState.compareAndSet(ScheduledState.STOPPED, 
ScheduledState.STARTING)) { // will ensure that the Processor represented by 
this node can only be started once
+            final Runnable startProcRunnable = new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        SchedulingContext schedulingContext = new 
StandardSchedulingContext(processContext, getControllerServiceProvider(),
+                                StandardProcessorNode.this, 
processContext.getStateManager());
+                        invokeOnScheduleAsync(taskScheduler, 
schedulingContext);
+                        if 
(scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) 
{
+                            schedulingAgentCallback.run(); // callback 
provided by StandardProcessScheduler to essentially initiate component's 
onTrigger() cycle
+                        } else { // can only happen if stopProcessor was 
called before service was transitioned to RUNNING state
+                            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
+                            scheduledState.set(ScheduledState.STOPPED);
+                        }
+                    } catch (final Exception e) {
+                        final Throwable cause = e instanceof 
InvocationTargetException ? e.getCause() : e;
+                        final ProcessorLog procLog = new 
SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
+
+                        procLog.error( "{} failed to invoke @OnScheduled 
method due to {}; processor will not be scheduled to run for {}",
+                                new Object[] { 
StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis + " 
milliseconds" }, cause);
+                        LOG.error("Failed to invoke @OnScheduled method due to 
{}", cause.toString(), cause);
+
+                        
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
+                        if (scheduledState.get() != ScheduledState.STOPPING) { 
// make sure we only continue retry loop if STOP action wasn't initiated
+                            taskScheduler.schedule(this, 
administrativeYieldMillis, TimeUnit.MILLISECONDS);
+                        } else {
+                            scheduledState.set(ScheduledState.STOPPED);
+                        }
+                    }
+                }
+            };
+            taskScheduler.execute(startProcRunnable);
+        } else {
+            LOG.warn("Can not start Processor since it's already in the 
process of being started");
+        }
+    }
+
+    /**
+     * Will idempotently stop the processor using the following sequence: <i>
+     * <ul>
+     * <li>Transition (atomically) Processor's scheduled state from RUNNING to
+     * STOPPING. If the above state transition succeeds, then execute the stop
+     * task (asynchronously) where 'activeThreadMonitorCallback' provided by 
the
+     * {@link ProcessScheduler} will be called to check if this processor still
+     * has active threads. If it does, the task will be re-scheduled with delay
+     * of 100 milliseconds until there are no more active threads, at which
+     * point processor's @OnUnscheduled and @OnStopped operation will be 
invoked
+     * and its scheduled state is set to STOPPED which completes processor stop
+     * sequence.</li>
+     * </ul>
+     * </i>
+     * <p>
+     * If for some reason processor's scheduled state can not be transitioned 
to
+     * STOPPING (e.g., the processor didn't finish @OnScheduled operation when
+     * stop was called), the attempt will be made to transition processor's
+     * scheduled state from STARTING to STOPPING which will allow
+     * {@link #start(ScheduledExecutorService, long, ProcessContext, Runnable)}
+     * method to initiate processor's shutdown upon exiting @OnScheduled
+     * operation, otherwise the processor's scheduled state will remain
+     * unchanged ensuring that multiple calls to this method are idempotent.
+     * </p>
+     */
+    @Override
+    public <T extends ProcessContext & ControllerServiceLookup> void 
stop(final ScheduledExecutorService scheduler,
+            final T processContext, final Callable<Boolean> 
activeThreadMonitorCallback) {
+        if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, 
ScheduledState.STOPPING)) { // will ensure that the Processor represented by 
this node can only be stopped once
+            final Runnable stopProcRunnable = new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        if (activeThreadMonitorCallback.call()) {
+                            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
processor, processContext);
+                            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, 
processContext);
+                            scheduledState.set(ScheduledState.STOPPED);
+                        } else {
+                            scheduler.schedule(this, 100, 
TimeUnit.MILLISECONDS);
+                        }
+                    } catch (Exception e) {
+                        LOG.warn("Failed while shutting down processor " + 
processor, e);
+                    }
+                }
+            };
+            scheduler.execute(stopProcRunnable);
+        } else {
+            /*
+             * We do compareAndSet() instead of set() to ensure that Processor
+             * stoppage is handled consistently including a condition where
+             * Processor never got a chance to transition to RUNNING state
+             * before stop() was called. If that happens the stop processor
+             * routine will be initiated in start() method, otherwise the IF
+             * part will handle the stop processor routine.
+             */
+            this.scheduledState.compareAndSet(ScheduledState.STARTING, 
ScheduledState.STOPPING);
+        }
+    }
+
+    /**
+     * Will invoke processor's methods annotated with @OnScheduled 
asynchronously
+     * to ensure that it could be interrupted if stop action was initiated on
+     * the processor that may be sitting in the infinitely blocking @OnScheduled
+     * operation. While this approach paves the way for further enhancements
+     * related to managing processor's life-cycle operations, at the moment the
+     * interrupt will not happen automatically. This is primarily to preserve
+     * the existing behavior of NiFi, where the stop operation can only be
+     * invoked once the processor is started. Unfortunately that could mean 
that
+     * the processor may be blocking indefinitely in the @OnScheduled call. To
+     * deal with that a new NiFi property has been introduced,
+     * <i>nifi.processor.start.timeout</i>, which allows one to set the time (in
+     * milliseconds) of how long to wait before canceling the @OnScheduled task
+     * allowing processor's stop sequence to proceed. The default value for 
this
+     * property is {@link Long#MAX_VALUE}.
+     * <p>
+     * NOTE: Canceling the task does not guarantee that the task will actually
+     * complete (successfully or otherwise), since cancellation of the task
+     * will issue a simple Thread.interrupt(). However, if the code inside
+     * the @OnScheduled operation is written poorly and ignores thread
+     * interrupts, you may end up with a runaway thread which may eventually
+     * require NiFi reboot. In any event, the above explanation will be logged
+     * (WARN) informing a user so further actions could be taken.
+     * </p>
+     */
+    private void invokeOnScheduleAsync(ScheduledExecutorService taskScheduler, 
final SchedulingContext schedulingContext) throws ExecutionException {
+        Future<Void> executionResult = taskScheduler.submit(new 
Callable<Void>() {
+            @SuppressWarnings("deprecation")
+            @Override
+            public Void call() throws Exception {
+                
ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, 
org.apache.nifi.processor.annotation.OnScheduled.class, processor, 
schedulingContext);
+                return null;
+            }
+        });
+
+        long onScheduleTimeout = Long.parseLong(NiFiProperties.getInstance()
+                .getProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, 
String.valueOf(Long.MAX_VALUE)));
+        try {
+            executionResult.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOG.warn("Thread was interrupted while waiting for processor '" + 
this.processor.getClass().getSimpleName()
+                    + "' @OnSchedule operation to finish.");
+            Thread.currentThread().interrupt();
+        } catch (TimeoutException e) {
+            executionResult.cancel(true);
+            LOG.warn("Timed out while waiting for the task executing 
@OnSchedule operation for '"
+                    + this.processor.getClass().getSimpleName()
+                    + "' processor to finish. An attempt is made to cancel the 
task via Thread.interrupt(). However it does not "
+                    + "guarantee that the task will be canceled since the code 
inside @OnSchedule method may "
+                    + "have been written to ignore interrupts which may result 
in runaway thread which could lead to more issues "
+                    + "eventually requiring NiFi to be restarted. This is 
usually a bug in the target Processor '"
+                    + this.processor + "' that needs to be documented, 
reported and eventually fixed.");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0c5b1c27/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
new file mode 100644
index 0000000..b931c64
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.scheduling;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.ReportingTaskNode;
+
+/**
+ * Base implementation of the {@link SchedulingAgent} which encapsulates the
+ * updates to the {@link ScheduleState} based on invoked operation and then
+ * delegates to the corresponding 'do' methods. For example, by invoking
+ * {@link #schedule(Connectable, ScheduleState)} the
+ * {@link ScheduleState#setScheduled(boolean)} with value 'true' will be
+ * invoked.
+ *
+ * @see EventDrivenSchedulingAgent
+ * @see TimerDrivenSchedulingAgent
+ * @see QuartzSchedulingAgent
+ */
+abstract class AbstractSchedulingAgent implements SchedulingAgent {
+
+    @Override
+    public void schedule(Connectable connectable, ScheduleState scheduleState) 
{
+        scheduleState.setScheduled(true);
+        this.doSchedule(connectable, scheduleState);
+    }
+
+    @Override
+    public void unschedule(Connectable connectable, ScheduleState 
scheduleState) {
+        scheduleState.setScheduled(false);
+        this.doUnschedule(connectable, scheduleState);
+    }
+
+    @Override
+    public void schedule(ReportingTaskNode taskNode, ScheduleState 
scheduleState) {
+        scheduleState.setScheduled(true);
+        this.doSchedule(taskNode, scheduleState);
+    }
+
+    @Override
+    public void unschedule(ReportingTaskNode taskNode, ScheduleState 
scheduleState) {
+        scheduleState.setScheduled(false);
+        this.doUnschedule(taskNode, scheduleState);
+    }
+
+    /**
+     * Schedules the provided {@link Connectable}. Its {@link ScheduleState}
+     * will be set to <i>true</i>
+     *
+     * @param connectable
+     *            the instance of {@link Connectable}
+     * @param scheduleState
+     *            the instance of {@link ScheduleState}
+     */
+    protected abstract void doSchedule(Connectable connectable, ScheduleState 
scheduleState);
+
+    /**
+     * Unschedules the provided {@link Connectable}. Its {@link ScheduleState}
+     * will be set to <i>false</i>
+     *
+     * @param connectable
+     *            the instance of {@link Connectable}
+     * @param scheduleState
+     *            the instance of {@link ScheduleState}
+     */
+    protected abstract void doUnschedule(Connectable connectable, 
ScheduleState scheduleState);
+
+    /**
+     * Schedules the provided {@link ReportingTaskNode}. Its
+     * {@link ScheduleState} will be set to <i>true</i>
+     *
+     * @param connectable
+     *            the instance of {@link ReportingTaskNode}
+     * @param scheduleState
+     *            the instance of {@link ScheduleState}
+     */
+    protected abstract void doSchedule(ReportingTaskNode connectable, 
ScheduleState scheduleState);
+
+    /**
+     * Unschedules the provided {@link ReportingTaskNode}. Its
+     * {@link ScheduleState} will be set to <i>false</i>
+     *
+     * @param connectable
+     *            the instance of {@link ReportingTaskNode}
+     * @param scheduleState
+     *            the instance of {@link ScheduleState}
+     */
+    protected abstract void doUnschedule(ReportingTaskNode connectable, 
ScheduleState scheduleState);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0c5b1c27/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 76d7c08..37cab01 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -51,7 +51,7 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class EventDrivenSchedulingAgent implements SchedulingAgent {
+public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
 
     private static final Logger logger = 
LoggerFactory.getLogger(EventDrivenSchedulingAgent.class);
 
@@ -94,24 +94,24 @@ public class EventDrivenSchedulingAgent implements 
SchedulingAgent {
     }
 
     @Override
-    public void schedule(final ReportingTaskNode taskNode, ScheduleState 
scheduleState) {
+    public void doSchedule(final ReportingTaskNode taskNode, ScheduleState 
scheduleState) {
         throw new UnsupportedOperationException("ReportingTasks cannot be 
scheduled in Event-Driven Mode");
     }
 
     @Override
-    public void unschedule(ReportingTaskNode taskNode, ScheduleState 
scheduleState) {
+    public void doUnschedule(ReportingTaskNode taskNode, ScheduleState 
scheduleState) {
         throw new UnsupportedOperationException("ReportingTasks cannot be 
scheduled in Event-Driven Mode");
     }
 
     @Override
-    public void schedule(final Connectable connectable, final ScheduleState 
scheduleState) {
+    public void doSchedule(final Connectable connectable, final ScheduleState 
scheduleState) {
         workerQueue.resumeWork(connectable);
         logger.info("Scheduled {} to run in Event-Driven mode", connectable);
         scheduleStates.put(connectable, scheduleState);
     }
 
     @Override
-    public void unschedule(final Connectable connectable, final ScheduleState 
scheduleState) {
+    public void doUnschedule(final Connectable connectable, final 
ScheduleState scheduleState) {
         workerQueue.suspendWork(connectable);
         logger.info("Stopped scheduling {} to run", connectable);
     }

Reply via email to