Adds cancelOnException to ScheduledTask
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/e0014b14 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/e0014b14 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/e0014b14 Branch: refs/heads/master Commit: e0014b14062ce40fd0269ea5c362a707240ab324 Parents: 26773c0 Author: Sam Corbett <[email protected]> Authored: Wed Nov 18 12:47:44 2015 +0000 Committer: Sam Corbett <[email protected]> Committed: Thu Nov 19 14:01:52 2015 +0000 ---------------------------------------------------------------------- .../util/core/task/BasicExecutionManager.java | 50 +++++++++++++----- .../brooklyn/util/core/task/ScheduledTask.java | 54 +++++++++++++++----- .../util/core/task/ScheduledExecutionTest.java | 48 +++++++++++++++-- 3 files changed, 123 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e0014b14/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java index 4c9858a..02277a1 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java @@ -398,7 +398,7 @@ public class BasicExecutionManager implements ExecutionManager { final Callable<?> oldJob = taskScheduled.getJob(); final TaskInternal<?> taskScheduledF = taskScheduled; taskScheduled.setJob(new Callable() { public Object call() { - boolean resubmitted = false; + boolean shouldResubmit = true; task.recentRun = taskScheduledF; try { synchronized (task) { @@ -407,25 +407,19 @@ public class BasicExecutionManager implements ExecutionManager { Object result; try { result = oldJob.call(); + task.lastThrownType = null; } catch (Exception e) { - if (!Tasks.isInterrupted()) { - log.warn("Error executing "+oldJob+" (scheduled job of "+task+" - "+task.getDescription()+"); cancelling scheduled execution", e); - } else { - log.debug("Interrupted executing "+oldJob+" (scheduled job of "+task+" - "+task.getDescription()+"); cancelling scheduled execution: "+e); - } + shouldResubmit = shouldResubmitOnException(oldJob, e); throw Exceptions.propagate(e); } - task.runCount++; - if (task.period!=null && !task.isCancelled()) { - task.delay = task.period; - submitSubsequentScheduledTask(flags, task); - resubmitted = true; - } return result; } finally { // do in finally block in case we were interrupted - if (!resubmitted) + if (shouldResubmit) { + resubmit(); + } else { afterEndScheduledTaskAllIterations(flags, task); + } } }}); task.nextRun = taskScheduled; @@ -437,6 +431,36 @@ public class BasicExecutionManager implements ExecutionManager { } } + private void resubmit() { + task.runCount++; + if (task.period!=null && !task.isCancelled()) { + task.delay = task.period; + submitSubsequentScheduledTask(flags, task); + } + } + + private boolean shouldResubmitOnException(Callable<?> oldJob, Exception e) { + String message = "Error executing " + oldJob + " (scheduled job of " + task + " - " + task.getDescription() + ")"; + if (Tasks.isInterrupted()) { + log.debug(message + "; cancelling scheduled execution: " + e); + return false; + } else if (task.cancelOnException) { + log.warn(message + "; cancelling scheduled execution.", e); + return false; + } else { + message += "; resubmitting task and throwing: " + e; + if (!e.getClass().equals(task.lastThrownType)) { + task.lastThrownType = e.getClass(); + message += " (logging subsequent exceptions at trace)"; + log.debug(message); + } else { + message += " (repeat exception)"; + log.trace(message); + } + return true; + } + } + @Override public String toString() { return "ScheduledTaskCallable["+task+","+flags+"]"; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e0014b14/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java index 94327a1..c1ad4f8 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java @@ -46,17 +46,32 @@ import com.google.common.base.Throwables; public class ScheduledTask extends BasicTask { final Callable<Task<?>> taskFactory; - /** initial delay before running, set as flag in constructor; defaults to 0 */ + + /** + * Initial delay before running, set as flag in constructor; defaults to 0 + */ protected Duration delay; - /** time to wait between executions, or null if not to repeat (default), set as flag to constructor; + + /** + * The time to wait between executions, or null if not to repeat (default), set as flag to constructor; * this may be modified for subsequent submissions by a running task generated by the factory - * using getSubmittedByTask().setPeriod(Duration) */ + * using {@link #getSubmittedByTask().setPeriod(Duration)} + */ protected Duration period = null; - /** optional, set as flag in constructor; defaults to null meaning no limit */ + + /** + * Optional, set as flag in constructor; defaults to null meaning no limit. + */ protected Integer maxIterations = null; - + + /** + * Set false if the task should be rescheduled after throwing an exception; defaults to true. + */ + protected boolean cancelOnException = true; + protected int runCount=0; protected Task<?> recentRun, nextRun; + Class<? extends Exception> lastThrownType; public int getRunCount() { return runCount; } public ScheduledFuture<?> getNextScheduled() { return (ScheduledFuture<?>)internalFuture; } @@ -84,12 +99,15 @@ public class ScheduledTask extends BasicTask { delay = Duration.of(elvis(flags.remove("delay"), 0)); period = Duration.of(elvis(flags.remove("period"), null)); maxIterations = elvis(flags.remove("maxIterations"), null); + Object cancelFlag = flags.remove("cancelOnException"); + cancelOnException = cancelFlag == null || Boolean.TRUE.equals(cancelFlag); } public ScheduledTask delay(Duration d) { this.delay = d; return this; } + public ScheduledTask delay(long val) { return delay(Duration.millis(val)); } @@ -98,6 +116,7 @@ public class ScheduledTask extends BasicTask { this.period = d; return this; } + public ScheduledTask period(long val) { return period(Duration.millis(val)); } @@ -107,6 +126,11 @@ public class ScheduledTask extends BasicTask { return this; } + public ScheduledTask cancelOnException(boolean cancel) { + this.cancelOnException = cancel; + return this; + } + public Callable<Task<?>> getTaskFactory() { return taskFactory; } @@ -121,13 +145,17 @@ public class ScheduledTask extends BasicTask { protected String getActiveTaskStatusString(int verbosity) { StringBuilder rv = new StringBuilder("Scheduler"); - if (runCount>0) rv.append(", iteration "+(runCount+1)); - if (recentRun!=null) rv.append(", last run "+ - Duration.sinceUtc(recentRun.getStartTimeUtc())+" ms ago"); + if (runCount > 0) { + rv.append(", iteration ").append(runCount + 1); + } + if (recentRun != null) { + Duration start = Duration.sinceUtc(recentRun.getStartTimeUtc()); + rv.append(", last run ").append(start).append(" ago"); + } if (truth(getNextScheduled())) { Duration untilNext = Duration.millis(getNextScheduled().getDelay(TimeUnit.MILLISECONDS)); if (untilNext.isPositive()) - rv.append(", next in "+untilNext); + rv.append(", next in ").append(untilNext); else rv.append(", next imminent"); } @@ -158,7 +186,7 @@ public class ScheduledTask extends BasicTask { while (!isDone()) super.blockUntilEnded(); } - /** gets the value of the most recently run task */ + /** @return The value of the most recently run task */ public Object get() throws InterruptedException, ExecutionException { blockUntilStarted(); blockUntilFirstScheduleStarted(); @@ -175,8 +203,10 @@ public class ScheduledTask extends BasicTask { return result; } - /** internal method used to allow callers to wait for underlying tasks to finished in the case of cancellation - * @param duration */ + /** + * Internal method used to allow callers to wait for underlying tasks to finished in the case of cancellation. + * @param timeout maximum time to wait + */ @Beta public boolean blockUntilNextRunFinished(Duration timeout) { return Tasks.blockUntilInternalTasksEnded(nextRun, timeout); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/e0014b14/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java index 6884040..1d551e8 100644 --- a/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java +++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java @@ -32,10 +32,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.test.Asserts; import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.core.task.BasicExecutionManager; -import org.apache.brooklyn.util.core.task.BasicTask; -import org.apache.brooklyn.util.core.task.ScheduledTask; -import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; import org.apache.brooklyn.util.javalang.JavaClassNames; @@ -81,6 +77,50 @@ public class ScheduledExecutionTest { assertEquals(i.get(), 5); } + @Test + public void testScheduledTaskCancelledIfExceptionThrown() throws Exception { + BasicExecutionManager m = new BasicExecutionManager("mycontextid"); + final AtomicInteger calls = new AtomicInteger(0); + ScheduledTask t = new ScheduledTask(MutableMap.of("period", Duration.ONE_MILLISECOND, "maxIterations", 5), new Callable<Task<?>>() { + public Task<?> call() throws Exception { + return new BasicTask<>(new Callable<Integer>() { + public Integer call() { + calls.incrementAndGet(); + throw new RuntimeException("boo"); + }}); + }}); + + m.submit(t); + Runnable callsIsOne = new Runnable() { + @Override public void run() { + if (calls.get() != 1) { + throw new RuntimeException("not yet"); + } + } + + }; + Asserts.succeedsEventually(callsIsOne); + Asserts.succeedsContinually(callsIsOne); + } + + @Test + public void testScheduledTaskResubmittedIfExceptionThrownAndCancelOnExceptionFalse() { + BasicExecutionManager m = new BasicExecutionManager("mycontextid"); + final AtomicInteger calls = new AtomicInteger(0); + ScheduledTask t = new ScheduledTask(MutableMap.of("period", Duration.ONE_MILLISECOND, "maxIterations", 5, "cancelOnException", false), new Callable<Task<?>>() { + public Task<?> call() throws Exception { + return new BasicTask<>(new Callable<Integer>() { + public Integer call() { + calls.incrementAndGet(); + throw new RuntimeException("boo"); + }}); + }}); + + m.submit(t); + t.blockUntilEnded(); + assertEquals(calls.get(), 5, "Expected task to be resubmitted despite throwing an exception"); + } + /** like testScheduledTask but the loop is terminated by the task itself adjusting the period */ @Test public void testScheduledTaskSelfEnding() throws Exception {
