NIFI-381: Do not reschedule a processor to run after a yield if it is no longer scheduled to run


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a956623f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a956623f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a956623f

Branch: refs/heads/develop
Commit: a956623ff92fe35aecabd95d6f6f2b85de6b3edc
Parents: e370d7d
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Feb 25 11:38:28 2015 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Wed Feb 25 11:38:28 2015 -0500

----------------------------------------------------------------------
 .../controller/scheduling/StandardProcessScheduler.java  |  5 ++---
 .../scheduling/TimerDrivenSchedulingAgent.java           | 11 ++++++-----
 2 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a956623f/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 1627994..4407451 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -43,7 +43,6 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.annotation.OnConfigured;
-import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
@@ -374,9 +373,9 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
                 return;
             }
 
+            state.setScheduled(false);
             getSchedulingAgent(procNode).unschedule(procNode, state);
             procNode.setScheduledState(ScheduledState.STOPPED);
-            state.setScheduled(false);
         }
 
         final Runnable stopProcRunnable = new Runnable() {
@@ -474,8 +473,8 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         if (!state.isScheduled()) {
             return;
         }
+        
         state.setScheduled(false);
-
         getSchedulingAgent(connectable).unschedule(connectable, state);
 
         if (!state.isScheduled() && state.getActiveThreadCount() == 0 && 
state.mustCallOnStoppedMethods()) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a956623f/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
index a620202..17fb9f8 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
@@ -130,11 +130,12 @@ public class TimerDrivenSchedulingAgent implements 
SchedulingAgent {
                         // so that we can do this again the next time that the 
component is yielded.
                         if (scheduledFuture.cancel(false)) {
                             final long yieldNanos = 
TimeUnit.MILLISECONDS.toNanos(yieldMillis);
-                            final ScheduledFuture<?> newFuture = 
flowEngine.scheduleWithFixedDelay(this, yieldNanos, 
-                                    
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
                             
                             synchronized (scheduleState) {
                                 if ( scheduleState.isScheduled() ) {
+                                    final ScheduledFuture<?> newFuture = 
flowEngine.scheduleWithFixedDelay(this, yieldNanos, 
+                                            
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+                                    
                                     
scheduleState.replaceFuture(scheduledFuture, newFuture);
                                     futureRef.set(newFuture);
                                 }
@@ -152,11 +153,11 @@ public class TimerDrivenSchedulingAgent implements 
SchedulingAgent {
                         // an accurate accounting of which futures are 
outstanding; we must then also update the futureRef
                         // so that we can do this again the next time that the 
component is yielded.
                         if (scheduledFuture.cancel(false)) {
-                            final ScheduledFuture<?> newFuture = 
flowEngine.scheduleWithFixedDelay(this, NO_WORK_YIELD_NANOS, 
-                                    
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
-                            
                             synchronized (scheduleState) {
                                 if ( scheduleState.isScheduled() ) {
+                                    final ScheduledFuture<?> newFuture = 
flowEngine.scheduleWithFixedDelay(this, NO_WORK_YIELD_NANOS, 
+                                            
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+                                    
                                     
scheduleState.replaceFuture(scheduledFuture, newFuture);
                                     futureRef.set(newFuture);
                                 }

Reply via email to