NIFI-362: Avoid continually scheduling components to run if there is no work for them to do or if they are yielded
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4cc106a5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4cc106a5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4cc106a5 Branch: refs/heads/develop Commit: 4cc106a54d9b6528e38cb99ecb15524a07a1f0c9 Parents: dde5fd5 Author: Mark Payne <marka...@hotmail.com> Authored: Sun Feb 22 10:53:24 2015 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Sun Feb 22 10:53:24 2015 -0500 ---------------------------------------------------------------------- .../apache/nifi/controller/StandardFunnel.java | 2 +- .../scheduling/QuartzSchedulingAgent.java | 21 +++- .../controller/scheduling/ScheduleState.java | 18 ++-- .../scheduling/TimerDrivenSchedulingAgent.java | 105 ++++++++++++++++--- .../tasks/ContinuallyRunConnectableTask.java | 32 ++++-- .../tasks/ContinuallyRunProcessorTask.java | 32 +++--- 6 files changed, 163 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java index e34e043..3bdfd20 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java @@ -94,7 +94,7 @@ public class StandardFunnel implements Funnel { position = new AtomicReference<>(new Position(0D, 0D)); scheduledState = new AtomicReference<>(ScheduledState.STOPPED); penalizationPeriod = new AtomicReference<>("30 sec"); - yieldPeriod = new AtomicReference<>("1 sec"); + yieldPeriod = new AtomicReference<>("250 millis"); yieldExpiration = new AtomicLong(0L); schedulingPeriod = new AtomicReference<>("0 millis"); schedulingNanos = new AtomicLong(30000); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java index ea67492..3355e73 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java @@ -21,6 +21,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,8 +35,9 @@ import org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask; import org.apache.nifi.controller.tasks.ReportingTaskWrapper; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.processor.StandardProcessContext; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.FormatUtils; - import org.quartz.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,13 +132,16 @@ public class QuartzSchedulingAgent implements SchedulingAgent { final List<AtomicBoolean> triggers = new ArrayList<>(); for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) { - final Runnable continuallyRunTask; + final Callable<Boolean> continuallyRunTask; if (connectable.getConnectableType() == ConnectableType.PROCESSOR) { final ProcessorNode procNode = (ProcessorNode) connectable; - ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, encryptor); + + final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor); + ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, standardProcContext); continuallyRunTask = runnableTask; } else { - continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, encryptor); + final ConnectableProcessContext connProcContext = new ConnectableProcessContext(connectable, encryptor); + continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, connProcContext); } final AtomicBoolean canceled = new AtomicBoolean(false); @@ -147,7 +152,13 @@ public class QuartzSchedulingAgent implements SchedulingAgent { return; } - continuallyRunTask.run(); + try { + continuallyRunTask.call(); + } catch (final RuntimeException re) { + throw re; + } catch (final Exception e) { + throw new ProcessException(e); + } if (canceled.get()) { return; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java index eb5a437..ff17912 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java @@ -16,9 +16,10 @@ */ package org.apache.nifi.controller.scheduling; -import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; -import java.util.List; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -27,7 +28,7 @@ public class ScheduleState { private final AtomicInteger activeThreadCount = new AtomicInteger(0); private final AtomicBoolean scheduled = new AtomicBoolean(false); - private final List<ScheduledFuture<?>> futures = new ArrayList<>(); + private final Set<ScheduledFuture<?>> futures = new HashSet<ScheduledFuture<?>>(); private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false); private volatile long lastStopTime = -1; @@ -79,12 +80,17 @@ public class ScheduleState { * * @param newFutures */ - public void setFutures(final List<ScheduledFuture<?>> newFutures) { + public synchronized void setFutures(final Collection<ScheduledFuture<?>> newFutures) { futures.clear(); futures.addAll(newFutures); } - public List<ScheduledFuture<?>> getFutures() { - return Collections.unmodifiableList(futures); + public synchronized void replaceFuture(final ScheduledFuture<?> oldFuture, final ScheduledFuture<?> newFuture) { + futures.remove(oldFuture); + futures.add(newFuture); + } + + public synchronized Set<ScheduledFuture<?>> getFutures() { + return Collections.unmodifiableSet(futures); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java index db06151..efa8acd 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java @@ -18,8 +18,10 @@ package org.apache.nifi.controller.scheduling; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; @@ -31,15 +33,17 @@ import org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask; import org.apache.nifi.controller.tasks.ReportingTaskWrapper; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.StandardProcessContext; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.FormatUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TimerDrivenSchedulingAgent implements SchedulingAgent { - private static final Logger logger = LoggerFactory.getLogger(TimerDrivenSchedulingAgent.class); - + private static final long NO_WORK_YIELD_NANOS = TimeUnit.MILLISECONDS.toNanos(10L); + private final FlowController flowController; private final FlowEngine flowEngine; private final ProcessContextFactory contextFactory; @@ -72,20 +76,95 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent { logger.info("{} started.", taskNode.getReportingTask()); } + @Override public void schedule(final Connectable connectable, final ScheduleState scheduleState) { - final Runnable runnable; - if (connectable.getConnectableType() == ConnectableType.PROCESSOR) { - final ProcessorNode procNode = (ProcessorNode) connectable; - ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, encryptor); - runnable = runnableTask; - } else { - runnable = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, encryptor); - } - + final List<ScheduledFuture<?>> futures = new ArrayList<>(); for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) { - final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(runnable, 0L, connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + final Callable<Boolean> continuallyRunTask; + final ProcessContext processContext; + + // Determine the task to run and create it. + if (connectable.getConnectableType() == ConnectableType.PROCESSOR) { + final ProcessorNode procNode = (ProcessorNode) connectable; + final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor); + final ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, + contextFactory, scheduleState, standardProcContext); + + continuallyRunTask = runnableTask; + processContext = standardProcContext; + } else { + processContext = new ConnectableProcessContext(connectable, encryptor); + continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, processContext); + } + + final AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>(); + + final Runnable yieldDetectionRunnable = new Runnable() { + @Override + public void run() { + // Call the continually run task. It will return a boolean indicating whether or not we should yield + // based on a lack of work for to do for the component. + final boolean shouldYield; + try { + shouldYield = continuallyRunTask.call(); + } catch (final RuntimeException re) { + throw re; + } catch (final Exception e) { + throw new ProcessException(e); + } + + // If the component is yielded, cancel its future and re-submit it to run again + // after the yield has expired. + final long newYieldExpiration = connectable.getYieldExpiration(); + if ( newYieldExpiration > System.currentTimeMillis() ) { + final long yieldMillis = System.currentTimeMillis() - newYieldExpiration; + final ScheduledFuture<?> scheduledFuture = futureRef.get(); + if ( scheduledFuture == null ) { + return; + } + + // If we are able to cancel the future, create a new one and update the ScheduleState so that it has + // an accurate accounting of which futures are outstanding; we must then also update the futureRef + // so that we can do this again the next time that the component is yielded. + if (scheduledFuture.cancel(false)) { + final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis); + final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos, + connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + scheduleState.replaceFuture(scheduledFuture, newFuture); + futureRef.set(newFuture); + } + } else if ( shouldYield ) { + // Component itself didn't yield but there was no work to do, so the framework will choose + // to yield the component automatically for a short period of time. + final ScheduledFuture<?> scheduledFuture = futureRef.get(); + if ( scheduledFuture == null ) { + return; + } + + // If we are able to cancel the future, create a new one and update the ScheduleState so that it has + // an accurate accounting of which futures are outstanding; we must then also update the futureRef + // so that we can do this again the next time that the component is yielded. + if (scheduledFuture.cancel(false)) { + final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, NO_WORK_YIELD_NANOS, + connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + scheduleState.replaceFuture(scheduledFuture, newFuture); + futureRef.set(newFuture); + } + } + } + }; + + // Schedule the task to run + final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(yieldDetectionRunnable, 0L, + connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + + // now that we have the future, set the atomic reference so that if the component is yielded we + // are able to then cancel this future. + futureRef.set(future); + + // Keep track of the futures so that we can update the ScheduleState. futures.add(future); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java index aca870b..408032c 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java @@ -16,16 +16,16 @@ */ package org.apache.nifi.controller.tasks; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.controller.repository.StandardProcessSessionFactory; -import org.apache.nifi.controller.scheduling.ConnectableProcessContext; import org.apache.nifi.controller.scheduling.ProcessContextFactory; import org.apache.nifi.controller.scheduling.ScheduleState; -import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.Connectables; @@ -33,28 +33,33 @@ import org.apache.nifi.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ContinuallyRunConnectableTask implements Runnable { +/** + * Continually runs a Connectable as long as the processor has work to do. {@link #call()} will return + * <code>true</code> if the Connectable should be yielded, <code>false</code> otherwise. + */ +public class ContinuallyRunConnectableTask implements Callable<Boolean> { private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunConnectableTask.class); private final Connectable connectable; private final ScheduleState scheduleState; private final ProcessSessionFactory sessionFactory; - private final ConnectableProcessContext processContext; + private final ProcessContext processContext; - public ContinuallyRunConnectableTask(final ProcessContextFactory contextFactory, final Connectable connectable, final ScheduleState scheduleState, final StringEncryptor encryptor) { + public ContinuallyRunConnectableTask(final ProcessContextFactory contextFactory, final Connectable connectable, final ScheduleState scheduleState, final ProcessContext processContext) { this.connectable = connectable; this.scheduleState = scheduleState; this.sessionFactory = new StandardProcessSessionFactory(contextFactory.newProcessContext(connectable, new AtomicLong(0L))); - this.processContext = new ConnectableProcessContext(connectable, encryptor); + this.processContext = processContext; } - @SuppressWarnings("deprecation") @Override - public void run() { + @SuppressWarnings("deprecation") + public Boolean call() { if (!scheduleState.isScheduled()) { - return; + return false; } + // Connectable should run if the following conditions are met: // 1. It's an Input Port or or is a Remote Input Port or has incoming FlowFiles queued // 2. Any relationship is available (since there's only 1 @@ -62,8 +67,9 @@ public class ContinuallyRunConnectableTask implements Runnable { // it means the same thing) // 3. It is not yielded. final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty(); + boolean flowFilesQueued = true; final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis()) - && (triggerWhenEmpty || Connectables.flowFilesQueued(connectable)) && (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable)); + && (triggerWhenEmpty || (flowFilesQueued = Connectables.flowFilesQueued(connectable))) && (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable)); if (shouldRun) { scheduleState.incrementActiveThreadCount(); @@ -92,6 +98,12 @@ public class ContinuallyRunConnectableTask implements Runnable { scheduleState.decrementActiveThreadCount(); } + } else if (!flowFilesQueued) { + // FlowFiles must be queued in order to run but there are none queued; + // yield for just a bit. + return true; } + + return true; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java index 33bd327..f4be855 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.tasks; import java.io.IOException; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -31,7 +32,6 @@ import org.apache.nifi.controller.repository.StandardProcessSessionFactory; import org.apache.nifi.controller.scheduling.ProcessContextFactory; import org.apache.nifi.controller.scheduling.ScheduleState; import org.apache.nifi.controller.scheduling.SchedulingAgent; -import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.ProcessSessionFactory; @@ -43,7 +43,12 @@ import org.apache.nifi.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ContinuallyRunProcessorTask implements Runnable { + +/** + * Continually runs a processor as long as the processor has work to do. {@link #call()} will return + * <code>true</code> if the processor should be yielded, <code>false</code> otherwise. + */ +public class ContinuallyRunProcessorTask implements Callable<Boolean> { private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunProcessorTask.class); @@ -56,7 +61,8 @@ public class ContinuallyRunProcessorTask implements Runnable { private final int numRelationships; public ContinuallyRunProcessorTask(final SchedulingAgent schedulingAgent, final ProcessorNode procNode, - final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState, final StringEncryptor encryptor) { + final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState, + final StandardProcessContext processContext) { this.schedulingAgent = schedulingAgent; this.procNode = procNode; @@ -65,28 +71,28 @@ public class ContinuallyRunProcessorTask implements Runnable { this.flowController = flowController; context = contextFactory.newProcessContext(procNode, new AtomicLong(0L)); - this.processContext = new StandardProcessContext(procNode, flowController, encryptor); + this.processContext = processContext; } - @SuppressWarnings("deprecation") @Override - public void run() { + @SuppressWarnings("deprecation") + public Boolean call() { // make sure processor is not yielded boolean shouldRun = (procNode.getYieldExpiration() < System.currentTimeMillis()); if (!shouldRun) { - return; + return false; } // make sure that either we're not clustered or this processor runs on all nodes or that this is the primary node shouldRun = !procNode.isIsolated() || !flowController.isClustered() || flowController.isPrimary(); if (!shouldRun) { - return; + return false; } // make sure that either proc has incoming FlowFiles or has no incoming connections or is annotated with @TriggerWhenEmpty shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode); if (!shouldRun) { - return; + return true; } if (numRelationships > 0) { @@ -109,7 +115,7 @@ public class ContinuallyRunProcessorTask implements Runnable { } if (!shouldRun) { - return; + return false; } scheduleState.incrementActiveThreadCount(); @@ -124,11 +130,11 @@ public class ContinuallyRunProcessorTask implements Runnable { invocationCount++; if (!batch) { - return; + return false; } if (System.nanoTime() > finishNanos) { - return; + return false; } shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode); @@ -180,6 +186,8 @@ public class ContinuallyRunProcessorTask implements Runnable { logger.error("", e); } } + + return false; } }