Don't try to re-submit DST secondary tasks if already executed. Effectors invoked against an entity will be submitted twice - once in the entity's context and again in the TaskQueueingContext of the caller (see brooklyn.management.internal.LocalManagementContext.runAtEntity(Entity, TaskAdaptable<T>)). Usually this is not a problem because the ExecutionContext will notice that the task is already submitted and ignore it. But if the entity has been unmanaged in the meantime, then the call to get the entity's ExecutionContext will fail.
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/160b3ca0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/160b3ca0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/160b3ca0 Branch: refs/heads/master Commit: 160b3ca0fc6fefd9ef5b6e6929c7d3594436cf09 Parents: 807e6d1 Author: Svetoslav Neykov <[email protected]> Authored: Tue Jul 7 17:24:24 2015 +0300 Committer: Svetoslav Neykov <[email protected]> Committed: Tue Jul 7 17:29:16 2015 +0300 ---------------------------------------------------------------------- .../entity/group/DynamicClusterImpl.java | 6 +- .../util/task/DynamicSequentialTask.java | 10 ++- .../entity/effector/EffectorTaskTest.java | 79 ++++++++++++++++++++ 3 files changed, 88 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/160b3ca0/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java index 447a4bb..0e2f164 100644 --- a/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java +++ b/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java @@ -874,11 +874,7 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus try { if (member instanceof Startable) { Task<?> task = member.invoke(Startable.STOP, Collections.<String,Object>emptyMap()); - try { - task.get(); - } catch (Exception e) { - throw Exceptions.propagate(e); - } + task.getUnchecked(); } } finally { Entities.unmanage(member); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/160b3ca0/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java b/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java index 967c831..7756388 100644 --- a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java +++ b/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java @@ -209,12 +209,18 @@ public class DynamicSequentialTask<T> extends BasicTask<T> implements HasTaskChi throw new IllegalStateException(message); } synchronized (task) { - if (task.isSubmitted() && !task.isDone()) { + if (task.isSubmitted()) { if (log.isTraceEnabled()) { log.trace("DST "+this+" skipping submission of child "+task+" because it is already submitted"); } } else { - ec.submit(task); + try { + ec.submit(task); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + // Give some context when the submit fails (happens when the target is already unmanaged) + throw new IllegalStateException("Failure submitting task "+task+" in "+this+": "+e.getMessage(), e); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/160b3ca0/core/src/test/java/brooklyn/entity/effector/EffectorTaskTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/entity/effector/EffectorTaskTest.java b/core/src/test/java/brooklyn/entity/effector/EffectorTaskTest.java index e9584db..bc7677c 100644 --- a/core/src/test/java/brooklyn/entity/effector/EffectorTaskTest.java +++ b/core/src/test/java/brooklyn/entity/effector/EffectorTaskTest.java @@ -19,6 +19,7 @@ package brooklyn.entity.effector; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; import org.testng.Assert; import org.testng.annotations.Test; @@ -28,6 +29,7 @@ import brooklyn.entity.Effector; import brooklyn.entity.Entity; import brooklyn.entity.basic.AbstractEntity; import brooklyn.entity.basic.BrooklynTaskTags; +import 
brooklyn.entity.basic.Entities; import brooklyn.entity.basic.EntityInternal; import brooklyn.entity.effector.EffectorTasks.EffectorTaskFactory; import brooklyn.entity.proxying.EntitySpec; @@ -37,12 +39,14 @@ import brooklyn.management.Task; import brooklyn.test.entity.TestEntity; import brooklyn.util.collections.MutableMap; import brooklyn.util.config.ConfigBag; +import brooklyn.util.exceptions.Exceptions; import brooklyn.util.task.DynamicSequentialTask; import brooklyn.util.task.DynamicTasks; import brooklyn.util.task.TaskBuilder; import brooklyn.util.task.Tasks; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; public class EffectorTaskTest extends BrooklynAppUnitTestSupport { @@ -53,6 +57,7 @@ public class EffectorTaskTest extends BrooklynAppUnitTestSupport { .description("doubles the given number") .parameter(Integer.class, "numberToDouble") .impl(new EffectorBody<Integer>() { + @Override public Integer call(ConfigBag parameters) { // do a sanity check Assert.assertNotNull(entity()); @@ -352,4 +357,78 @@ public class EffectorTaskTest extends BrooklynAppUnitTestSupport { Assert.assertEquals(doubler.invoke(DoublingEntity.DOUBLE, MutableMap.of("numberToDouble", 3, "numberToStartWith", 3)).get(), (Integer)7); } + + public static final Effector<Void> DUMMY = Effectors.effector(Void.class, "dummy") + .impl(new EffectorBody<Void>() { + @Override + public Void call(ConfigBag parameters) { + return null; + } + }) + .build(); + + public static final Effector<Void> STALL = Effectors.effector(Void.class, "stall") + .parameter(AtomicBoolean.class, "lock") + .impl(new EffectorBody<Void>() { + @Override + public Void call(ConfigBag parameters) { + AtomicBoolean lock = (AtomicBoolean)parameters.getStringKey("lock"); + synchronized(lock) { + if (!lock.get()) { + try { + lock.wait(); + } catch (InterruptedException e) { + Exceptions.propagate(e); + } + } + } + return null; + } + }) + .build(); 
+ + public static final Effector<Void> CONTEXT = Effectors.effector(Void.class, "stall_caller") + .parameter(AtomicBoolean.class, "lock") + .impl(new EffectorBody<Void>() { + @Override + public Void call(ConfigBag parameters) { + Entity child = Iterables.getOnlyElement(entity().getChildren()); + AtomicBoolean lock = new AtomicBoolean(); + Task<Void> dummyTask = null; + + try { + // Queue a (DST secondary) task which waits until notified, so that tasks queued later will get blocked + queue(Effectors.invocation(entity(), STALL, ImmutableMap.of("lock", lock))); + + // Start a new task - submitted directly to child's ExecutionContext, as well as added as a + // DST secondary of the current effector. + dummyTask = child.invoke(DUMMY, ImmutableMap.<String, Object>of()); + dummyTask.getUnchecked(); + + // Execution completed in the child's ExecutionContext, but still queued as a secondary. + // Destroy the child entity so that no subsequent tasks can be executed in its context. + Entities.destroy(child); + } finally { + // Let STALL complete + synchronized(lock) { + lock.set(true); + lock.notifyAll(); + } + // At this point DUMMY will be unblocked and the DST will try to execute it as a secondary. + // Submission will be ignored because DUMMY already executed. + // If it's not ignored then submission will fail because entity is already unmanaged. + } + return null; + } + }) + .build(); + + + @Test + public void testNestedEffectorExecutedAsSecondaryTask() throws Exception { + app.createAndManageChild(EntitySpec.create(TestEntity.class)); + Task<Void> effTask = app.invoke(CONTEXT, ImmutableMap.<String, Object>of()); + effTask.get(); + } + }
