NIFI-381: do not re-schedule processor to run after yield if not scheduled to run anymore
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a956623f Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a956623f Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a956623f Branch: refs/heads/develop Commit: a956623ff92fe35aecabd95d6f6f2b85de6b3edc Parents: e370d7d Author: Mark Payne <marka...@hotmail.com> Authored: Wed Feb 25 11:38:28 2015 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Wed Feb 25 11:38:28 2015 -0500 ---------------------------------------------------------------------- .../controller/scheduling/StandardProcessScheduler.java | 5 ++--- .../scheduling/TimerDrivenSchedulingAgent.java | 11 ++++++----- 2 files changed, 8 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a956623f/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 1627994..4407451 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -43,7 +43,6 @@ import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.annotation.OnConfigured; -import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; @@ -374,9 +373,9 @@ public final class StandardProcessScheduler implements ProcessScheduler { return; } + state.setScheduled(false); getSchedulingAgent(procNode).unschedule(procNode, state); procNode.setScheduledState(ScheduledState.STOPPED); - state.setScheduled(false); } final Runnable stopProcRunnable = new Runnable() { @@ -474,8 +473,8 @@ public final class StandardProcessScheduler implements ProcessScheduler { if (!state.isScheduled()) { return; } + state.setScheduled(false); - getSchedulingAgent(connectable).unschedule(connectable, state); if (!state.isScheduled() && state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a956623f/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java index a620202..17fb9f8 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java @@ -130,11 +130,12 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent { // so that we can do this again the next time that the component is yielded. if (scheduledFuture.cancel(false)) { final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis); - final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos, - connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); synchronized (scheduleState) { if ( scheduleState.isScheduled() ) { + final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos, + connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + scheduleState.replaceFuture(scheduledFuture, newFuture); futureRef.set(newFuture); } @@ -152,11 +153,11 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent { // an accurate accounting of which futures are outstanding; we must then also update the futureRef // so that we can do this again the next time that the component is yielded. if (scheduledFuture.cancel(false)) { - final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, NO_WORK_YIELD_NANOS, - connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); - synchronized (scheduleState) { if ( scheduleState.isScheduled() ) { + final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, NO_WORK_YIELD_NANOS, + connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + scheduleState.replaceFuture(scheduledFuture, newFuture); futureRef.set(newFuture); }