Repository: incubator-brooklyn Updated Branches: refs/heads/master beeb4d8ba -> 1ae252737
Wait for STOP effector to complete before unamanging BrooklynNode Otherwise if there still are running tasks for the entity they will be canceled. Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/7cfba37f Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/7cfba37f Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/7cfba37f Branch: refs/heads/master Commit: 7cfba37f7b322ea137b18410785d8e2f282f5df0 Parents: 7234b83 Author: Svetoslav Neykov <[email protected]> Authored: Wed Jan 28 16:31:57 2015 +0200 Committer: Svetoslav Neykov <[email protected]> Committed: Thu Jan 29 15:56:38 2015 +0200 ---------------------------------------------------------------------- .../brooklyn/entity/basic/BrooklynTaskTags.java | 25 ++++++++++++++ .../entity/brooklynnode/BrooklynNodeImpl.java | 35 +++++++++++++++---- .../entity/brooklynnode/BrooklynNodeTest.java | 36 ++++++++++++++++++++ 3 files changed, 90 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7cfba37f/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java b/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java index 8a9a21b..41b2371 100644 --- a/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java +++ b/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java @@ -46,6 +46,7 @@ import brooklyn.util.task.Tasks; import brooklyn.util.text.StringEscapes.BashStringEscapes; import brooklyn.util.text.Strings; +import com.google.common.annotations.Beta; import com.google.common.base.Functions; import com.google.common.base.Objects; import com.google.common.base.Preconditions; @@ -346,6 +347,30 @@ public class BrooklynTaskTags extends TaskTags { return false; } + /** + * finds the task up the {@code child} hierarchy handling the {@code effector} call, + * returns null if one doesn't exist. + */ + @Beta + public static Task<?> getClosestEffectorTask(Task<?> child, Effector<?> effector) { + Task<?> t = child; + while (t != null) { + Set<Object> tags = t.getTags(); + if (tags.contains(EFFECTOR_TAG)) { + for (Object tag: tags) { + if (tag instanceof EffectorCallTag) { + EffectorCallTag et = (EffectorCallTag) tag; + if (effector != null && !et.getEffectorName().equals(effector.getName())) + continue; + return t; + } + } + } + t = t.getSubmittedByTask(); + } + return null; + } + /** finds the first {@link EffectorCallTag} tag on this tag, or optionally on submitters, or null */ public static EffectorCallTag getEffectorCallTag(Task<?> task, boolean recurse) { Task<?> t = task; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7cfba37f/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java index 401bcd4..1f9b869 100644 --- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java +++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import javax.annotation.Nullable; + import org.apache.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,11 +47,13 @@ import brooklyn.entity.brooklynnode.effector.SetHighAvailabilityModeEffectorBody import brooklyn.entity.brooklynnode.effector.SetHighAvailabilityPriorityEffectorBody; import brooklyn.entity.effector.EffectorBody; import brooklyn.entity.effector.Effectors; +import brooklyn.entity.trait.Startable; import brooklyn.event.feed.ConfigToAttributes; import brooklyn.event.feed.http.HttpFeed; import brooklyn.event.feed.http.HttpPollConfig; import brooklyn.event.feed.http.HttpValueFunctions; import brooklyn.event.feed.http.JsonFunctions; +import brooklyn.management.Task; import brooklyn.management.TaskAdaptable; import brooklyn.management.ha.ManagementNodeState; import brooklyn.util.collections.Jsonya; @@ -67,6 +71,7 @@ import brooklyn.util.task.TaskTags; import brooklyn.util.task.Tasks; import brooklyn.util.text.Strings; import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -81,9 +86,25 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod RendererHints.register(WEB_CONSOLE_URI, RendererHints.namedActionWithUrl()); } - private class UnmanageThread extends Thread { + private static class UnmanageTask implements Runnable { + private Task<?> latchTask; + private Entity unmanageEntity; + + public UnmanageTask(@Nullable Task<?> latchTask, Entity unmanageEntity) { + this.latchTask = latchTask; + this.unmanageEntity = unmanageEntity; + } + public void run() { - Entities.unmanage(BrooklynNodeImpl.this); + if (latchTask != null) { + latchTask.blockUntilEnded(); + } else { + log.debug("No latch task provided for UnmanageTask, falling back to fixed wait"); + Time.sleep(Duration.FIVE_SECONDS); + } + synchronized (this) { + Entities.unmanage(unmanageEntity); + } } } @@ -172,10 +193,12 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod ConfigBag stopParameters = BrooklynTaskTags.getCurrentEffectorParameters(); //unmanage only if stopping the machine if (stopParameters == null || stopParameters.get(StopSoftwareParameters.STOP_MACHINE)) { - //Don't unmanage in entity's task context as it will self-cancel the task. - //The external thread doesn't guarantee that the unmanage will be called *after* the stop effector completes. - //How to delay and make sure that we don't cancel the (almost-complete) stop effector? - new UnmanageThread().start(); + // Don't unmanage in entity's task context as it will self-cancel the task. Wait for the stop effector to complete. + // If this is not enough (still getting Caused by: java.util.concurrent.CancellationException: null) then + // we could search for the top most task with entity context == this and wait on it. Even stronger would be + // to wait for BrooklynTaskTags.getTasksInEntityContext(ExecutionManager, this).isEmpty(); + Task<?> stopEffectorTask = BrooklynTaskTags.getClosestEffectorTask(Tasks.current(), Startable.STOP); + getManagementContext().getExecutionManager().submit(new UnmanageTask(stopEffectorTask, this)); } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7cfba37f/software/base/src/test/java/brooklyn/entity/brooklynnode/BrooklynNodeTest.java ---------------------------------------------------------------------- diff --git a/software/base/src/test/java/brooklyn/entity/brooklynnode/BrooklynNodeTest.java b/software/base/src/test/java/brooklyn/entity/brooklynnode/BrooklynNodeTest.java index d1a9390..20d004a 100644 --- a/software/base/src/test/java/brooklyn/entity/brooklynnode/BrooklynNodeTest.java +++ b/software/base/src/test/java/brooklyn/entity/brooklynnode/BrooklynNodeTest.java @@ -18,6 +18,7 @@ */ package brooklyn.entity.brooklynnode; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import java.util.List; @@ -27,16 +28,22 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.google.common.collect.ImmutableMap; + import brooklyn.entity.basic.Attributes; import brooklyn.entity.basic.Entities; import brooklyn.entity.drivers.downloads.DownloadResolver; import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.trait.Startable; import brooklyn.event.feed.ConfigToAttributes; import brooklyn.location.Location; import brooklyn.location.basic.SshMachineLocation; +import brooklyn.test.Asserts; import brooklyn.test.entity.TestApplication; import brooklyn.util.collections.MutableMap; import brooklyn.util.collections.MutableSet; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; public class BrooklynNodeTest { @@ -44,6 +51,19 @@ public class BrooklynNodeTest { private TestApplication app; private SshMachineLocation loc; + + public static class SlowStopBrooklynNode extends BrooklynNodeImpl { + public SlowStopBrooklynNode() {} + + @Override + protected void postStop() { + super.postStop(); + + //Make sure UnmanageTask will wait for the STOP effector to complete. + Time.sleep(Duration.FIVE_SECONDS); + } + + } @BeforeMethod(alwaysRun=true) public void setUp() throws Exception { @@ -87,6 +107,22 @@ public class BrooklynNodeTest { assertTrue(urls.contains(expectedUrl), "urls="+urls); } + @Test(groups = "Integration") + public void testUnmanageOnStop() throws Exception { + final BrooklynNode node = app.addChild(EntitySpec.create(BrooklynNode.class).impl(SlowStopBrooklynNode.class)); + Entities.manage(node); + assertTrue(Entities.isManaged(node), "Entity " + node + " must be managed."); + node.invoke(Startable.STOP, ImmutableMap.<String,Object>of()).asTask().getUnchecked(); + //The UnmanageTask will unblock after the STOP effector completes, so we are competing with it here. + Asserts.succeedsEventually(new Runnable() { + @Override + public void run() { + assertFalse(Entities.isManaged(node)); + } + }); + } + + @Test public void testCanStartSameNode() throws Exception { // not very interesting as do not have REST when run in this project
