Pollers set cancelOnException=false when scheduling tasks
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/5832a15f Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/5832a15f Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/5832a15f Branch: refs/heads/master Commit: 5832a15f2a30ba81f3b5b104b17795a5520b06af Parents: e0014b1 Author: Sam Corbett <[email protected]> Authored: Wed Nov 18 14:36:32 2015 +0000 Committer: Sam Corbett <[email protected]> Committed: Thu Nov 19 14:01:54 2015 +0000 ---------------------------------------------------------------------- .../org/apache/brooklyn/core/feed/Poller.java | 9 +- .../apache/brooklyn/core/feed/PollerTest.java | 191 ++++++++++++------- 2 files changed, 125 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5832a15f/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java index d57f826..a9a34d9 100644 --- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java +++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java @@ -19,6 +19,7 @@ package org.apache.brooklyn.core.feed; import java.util.LinkedHashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; @@ -31,6 +32,7 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.task.DynamicSequentialTask; import org.apache.brooklyn.util.core.task.ScheduledTask; +import org.apache.brooklyn.util.core.task.TaskTags; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.time.Duration; import org.slf4j.Logger; @@ -155,8 +157,11 @@ public class Poller<V> { return task; } }; - ScheduledTask task = new ScheduledTask(MutableMap.of("period", pollJob.pollPeriod, "displayName", "scheduled:"+scheduleName), pollingTaskFactory); - tasks.add((ScheduledTask)Entities.submit(entity, task)); + Map<String, ?> taskFlags = MutableMap.of("displayName", "scheduled:" + scheduleName); + ScheduledTask task = new ScheduledTask(taskFlags, pollingTaskFactory) + .period(pollJob.pollPeriod) + .cancelOnException(false); + tasks.add(Entities.submit(entity, task)); } else { if (log.isDebugEnabled()) log.debug("Activating poll (but leaving off, as period {}) for {} (using {})", new Object[] {pollJob.pollPeriod, entity, this}); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/5832a15f/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java index 0f2c1ce..6495f566 100644 --- a/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/feed/PollerTest.java @@ -18,91 +18,136 @@ */ package org.apache.brooklyn.core.feed; -import static org.testng.Assert.assertTrue; - +import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntitySpec; -import org.apache.brooklyn.core.feed.PollHandler; -import org.apache.brooklyn.core.feed.Poller; +import org.apache.brooklyn.api.entity.ImplementedBy; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.entity.AbstractEntity; +import org.apache.brooklyn.core.entity.EntityAsserts; +import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; -import org.apache.brooklyn.core.test.entity.TestEntity; -import org.apache.brooklyn.test.Asserts; -import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.feed.function.FunctionFeed; +import org.apache.brooklyn.feed.function.FunctionPollConfig; import org.apache.brooklyn.util.core.task.DynamicTasks; -import org.apache.brooklyn.util.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.apache.brooklyn.util.core.task.Tasks; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import com.google.common.base.Functions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + public class PollerTest extends BrooklynAppUnitTestSupport { - private static final Logger LOG = LoggerFactory.getLogger(PollerTest.class); - - private TestEntity entity; - private Poller<Integer> poller; - - @BeforeMethod(alwaysRun=true) - @Override - public void setUp() throws Exception { - super.setUp(); - entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); - poller = new Poller<Integer>(entity, false); + @DataProvider(name = "specProvider") + public Object[][] specProvider() { + EntitySpec<FeedExceptionEntity> pollFailer = EntitySpec.create(FeedExceptionEntity.class) + .configure(FeedExceptionEntity.POLLER, new PollFailer()); + EntitySpec<FeedExceptionEntity> taskFailer = EntitySpec.create(FeedExceptionEntity.class) + .configure(FeedExceptionEntity.POLLER, new TaskFailer()); + return new Object[][]{{pollFailer}, {taskFailer}}; + } + + @Test(dataProvider = "specProvider") + public void testFeedContinuesWhenPollerThrows(EntitySpec<FeedExceptionEntity> spec) { + Map<?, ?> timeoutFlags = ImmutableMap.of("timeout", "100ms"); + FeedExceptionEntity fee = app.createAndManageChild(spec); + app.start(ImmutableList.of(app.newSimulatedLocation())); + EntityAsserts.assertAttributeEqualsEventually(timeoutFlags, fee, FeedExceptionEntity.FLAG, true); + + fee.startThrowingPollExceptions(); + EntityAsserts.assertAttributeEqualsEventually(timeoutFlags, fee, FeedExceptionEntity.FLAG, false); + EntityAsserts.assertAttributeEqualsContinually(timeoutFlags, fee, FeedExceptionEntity.FLAG, false); + + fee.stopThrowingPollExceptions(); + EntityAsserts.assertAttributeEqualsEventually(timeoutFlags, fee, FeedExceptionEntity.FLAG, true); + EntityAsserts.assertAttributeEqualsContinually(timeoutFlags, fee, FeedExceptionEntity.FLAG, true); } - - @AfterMethod(alwaysRun=true) - @Override - public void tearDown() throws Exception { - if (poller != null) poller.stop(); - super.tearDown(); + + @ImplementedBy(FeedExceptionEntityImpl.class) + public static interface FeedExceptionEntity extends Entity { + ConfigKey<ThrowingPoller> POLLER = ConfigKeys.newConfigKey(ThrowingPoller.class, "poller"); + AttributeSensor<Boolean> FLAG = Sensors.newBooleanSensor("flag"); + + void startThrowingPollExceptions(); + void stopThrowingPollExceptions(); + } + + public static class FeedExceptionEntityImpl extends AbstractEntity implements FeedExceptionEntity { + private ThrowingPoller poller; + + @Override + public void init() { + super.init(); + poller = config().get(POLLER); + FunctionFeed.builder() + .entity(this) + .period(1L) + .poll(new FunctionPollConfig<Boolean, Boolean>(FLAG) + .callable(poller) + .onException(Functions.constant(false))) + .build(); + } + + public void startThrowingPollExceptions() { + this.poller.setShouldThrow(true); + } + + public void stopThrowingPollExceptions() { + this.poller.setShouldThrow(false); + } } - - @Test(groups={"Integration", "WIP"}) // because takes > 1 second - public void testPollingSubTaskFailsOnceKeepsGoing() throws Exception { - final AtomicInteger counter = new AtomicInteger(); - poller.scheduleAtFixedRate( - new Callable<Integer>() { - @Override public Integer call() throws Exception { - int result = counter.incrementAndGet(); - if (result % 2 == 0) { - DynamicTasks.queue("in-poll", new Runnable() { - public void run() { - throw new IllegalStateException("Simulating error in sub-task for poll"); - }}); + + private static class TaskFailer extends ThrowingPoller { + public Boolean execute(final boolean shouldThrow) { + Task<Boolean> t = Tasks.<Boolean>builder() + .body(new Callable<Boolean>() { + @Override + public Boolean call() { + if (shouldThrow) { + throw new IllegalArgumentException("exception in feed task"); + } + return true; } - return result; - } - }, - new PollHandler<Integer>() { - @Override public boolean checkSuccess(Integer val) { - return true; - } - @Override public void onSuccess(Integer val) { - - } - @Override public void onFailure(Integer val) { - } - @Override - public void onException(Exception exception) { - LOG.info("Exception in test poller", exception); - } - @Override public String getDescription() { - return "mypollhandler"; - } - }, - new Duration(10, TimeUnit.MILLISECONDS)); - poller.start(); - - Asserts.succeedsContinually(MutableMap.of("timeout", 2*1000, "period", 500), new Runnable() { - int oldCounter = -1; - @Override public void run() { - assertTrue(counter.get() > oldCounter); - oldCounter = counter.get(); + }) + .build(); + return DynamicTasks.queueIfPossible(t).orSubmitAsync().asTask().getUnchecked(); + } + } + + private static class PollFailer extends ThrowingPoller { + public Boolean execute(final boolean shouldThrow) { + if (shouldThrow) { + throw new IllegalArgumentException("exception in poller"); + } + return true; + } + } + + private static abstract class ThrowingPoller implements Callable<Boolean> { + protected final Object throwLock = new Object[0]; + boolean shouldThrow = false; + + abstract Boolean execute(boolean shouldThrow); + + @Override + public Boolean call() throws Exception { + synchronized (throwLock) { + return execute(shouldThrow); } - }); + } + + public void setShouldThrow(boolean shouldThrow) { + synchronized (throwLock) { + this.shouldThrow = shouldThrow; + } + } } + }
