Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master beeb4d8ba -> 1ae252737


Wait for STOP effector to complete before unamanging BrooklynNode

Otherwise if there still are running tasks for the entity they will be canceled.


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/7cfba37f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/7cfba37f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/7cfba37f

Branch: refs/heads/master
Commit: 7cfba37f7b322ea137b18410785d8e2f282f5df0
Parents: 7234b83
Author: Svetoslav Neykov <[email protected]>
Authored: Wed Jan 28 16:31:57 2015 +0200
Committer: Svetoslav Neykov <[email protected]>
Committed: Thu Jan 29 15:56:38 2015 +0200

----------------------------------------------------------------------
 .../brooklyn/entity/basic/BrooklynTaskTags.java | 25 ++++++++++++++
 .../entity/brooklynnode/BrooklynNodeImpl.java   | 35 +++++++++++++++----
 .../entity/brooklynnode/BrooklynNodeTest.java   | 36 ++++++++++++++++++++
 3 files changed, 90 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7cfba37f/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java 
b/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java
index 8a9a21b..41b2371 100644
--- a/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java
+++ b/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java
@@ -46,6 +46,7 @@ import brooklyn.util.task.Tasks;
 import brooklyn.util.text.StringEscapes.BashStringEscapes;
 import brooklyn.util.text.Strings;
 
+import com.google.common.annotations.Beta;
 import com.google.common.base.Functions;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -346,6 +347,30 @@ public class BrooklynTaskTags extends TaskTags {
         return false;
     }
 
+    /**
+     * finds the task up the {@code child} hierarchy handling the {@code 
effector} call,
+     * returns null if one doesn't exist. 
+     */
+    @Beta
+    public static Task<?> getClosestEffectorTask(Task<?> child, Effector<?> 
effector) {
+        Task<?> t = child;
+        while (t != null) {
+            Set<Object> tags = t.getTags();
+            if (tags.contains(EFFECTOR_TAG)) {
+                for (Object tag: tags) {
+                    if (tag instanceof EffectorCallTag) {
+                        EffectorCallTag et = (EffectorCallTag) tag;
+                        if (effector != null && 
!et.getEffectorName().equals(effector.getName()))
+                            continue;
+                        return t;
+                    }
+                }
+            }
+            t = t.getSubmittedByTask();
+        }
+        return null;
+    }
+
     /** finds the first {@link EffectorCallTag} tag on this tag, or optionally 
on submitters, or null */
     public static EffectorCallTag getEffectorCallTag(Task<?> task, boolean 
recurse) {
         Task<?> t = task;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7cfba37f/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
----------------------------------------------------------------------
diff --git 
a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
 
b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
index 401bcd4..1f9b869 100644
--- 
a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
+++ 
b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
+import javax.annotation.Nullable;
+
 import org.apache.http.HttpStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,11 +47,13 @@ import 
brooklyn.entity.brooklynnode.effector.SetHighAvailabilityModeEffectorBody
 import 
brooklyn.entity.brooklynnode.effector.SetHighAvailabilityPriorityEffectorBody;
 import brooklyn.entity.effector.EffectorBody;
 import brooklyn.entity.effector.Effectors;
+import brooklyn.entity.trait.Startable;
 import brooklyn.event.feed.ConfigToAttributes;
 import brooklyn.event.feed.http.HttpFeed;
 import brooklyn.event.feed.http.HttpPollConfig;
 import brooklyn.event.feed.http.HttpValueFunctions;
 import brooklyn.event.feed.http.JsonFunctions;
+import brooklyn.management.Task;
 import brooklyn.management.TaskAdaptable;
 import brooklyn.management.ha.ManagementNodeState;
 import brooklyn.util.collections.Jsonya;
@@ -67,6 +71,7 @@ import brooklyn.util.task.TaskTags;
 import brooklyn.util.task.Tasks;
 import brooklyn.util.text.Strings;
 import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -81,9 +86,25 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl 
implements BrooklynNod
         RendererHints.register(WEB_CONSOLE_URI, 
RendererHints.namedActionWithUrl());
     }
 
-    private class UnmanageThread extends Thread {
+    private static class UnmanageTask implements Runnable {
+        private Task<?> latchTask;
+        private Entity unmanageEntity;
+
+        public UnmanageTask(@Nullable Task<?> latchTask, Entity 
unmanageEntity) {
+            this.latchTask = latchTask;
+            this.unmanageEntity = unmanageEntity;
+        }
+
         public void run() {
-            Entities.unmanage(BrooklynNodeImpl.this);
+            if (latchTask != null) {
+                latchTask.blockUntilEnded();
+            } else {
+                log.debug("No latch task provided for UnmanageTask, falling 
back to fixed wait");
+                Time.sleep(Duration.FIVE_SECONDS);
+            }
+            synchronized (this) {
+                Entities.unmanage(unmanageEntity);
+            }
         }
     }
 
@@ -172,10 +193,12 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl 
implements BrooklynNod
         ConfigBag stopParameters = 
BrooklynTaskTags.getCurrentEffectorParameters();
         //unmanage only if stopping the machine
         if (stopParameters == null || 
stopParameters.get(StopSoftwareParameters.STOP_MACHINE)) {
-            //Don't unmanage in entity's task context as it will self-cancel 
the task.
-            //The external thread doesn't guarantee that the unmanage will be 
called *after* the stop effector completes.
-            //How to delay and make sure that we don't cancel the 
(almost-complete) stop effector?
-            new UnmanageThread().start();
+            // Don't unmanage in entity's task context as it will self-cancel 
the task. Wait for the stop effector to complete.
+            // If this is not enough (still getting Caused by: 
java.util.concurrent.CancellationException: null) then
+            // we could search for the top most task with entity context == 
this and wait on it. Even stronger would be
+            // to wait for 
BrooklynTaskTags.getTasksInEntityContext(ExecutionManager, this).isEmpty();
+            Task<?> stopEffectorTask = 
BrooklynTaskTags.getClosestEffectorTask(Tasks.current(), Startable.STOP);
+            getManagementContext().getExecutionManager().submit(new 
UnmanageTask(stopEffectorTask, this));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7cfba37f/software/base/src/test/java/brooklyn/entity/brooklynnode/BrooklynNodeTest.java
----------------------------------------------------------------------
diff --git 
a/software/base/src/test/java/brooklyn/entity/brooklynnode/BrooklynNodeTest.java
 
b/software/base/src/test/java/brooklyn/entity/brooklynnode/BrooklynNodeTest.java
index d1a9390..20d004a 100644
--- 
a/software/base/src/test/java/brooklyn/entity/brooklynnode/BrooklynNodeTest.java
+++ 
b/software/base/src/test/java/brooklyn/entity/brooklynnode/BrooklynNodeTest.java
@@ -18,6 +18,7 @@
  */
 package brooklyn.entity.brooklynnode;
 
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 import java.util.List;
@@ -27,16 +28,22 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableMap;
+
 import brooklyn.entity.basic.Attributes;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.drivers.downloads.DownloadResolver;
 import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.trait.Startable;
 import brooklyn.event.feed.ConfigToAttributes;
 import brooklyn.location.Location;
 import brooklyn.location.basic.SshMachineLocation;
+import brooklyn.test.Asserts;
 import brooklyn.test.entity.TestApplication;
 import brooklyn.util.collections.MutableMap;
 import brooklyn.util.collections.MutableSet;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
 
 public class BrooklynNodeTest {
 
@@ -44,6 +51,19 @@ public class BrooklynNodeTest {
     
     private TestApplication app;
     private SshMachineLocation loc;
+    
+    public static class SlowStopBrooklynNode extends BrooklynNodeImpl {
+        public SlowStopBrooklynNode() {}
+        
+        @Override
+        protected void postStop() {
+            super.postStop();
+            
+            //Make sure UnmanageTask will wait for the STOP effector to 
complete.
+            Time.sleep(Duration.FIVE_SECONDS);
+        }
+        
+    }
 
     @BeforeMethod(alwaysRun=true)
     public void setUp() throws Exception {
@@ -87,6 +107,22 @@ public class BrooklynNodeTest {
         assertTrue(urls.contains(expectedUrl), "urls="+urls);
     }
     
+    @Test(groups = "Integration")
+    public void testUnmanageOnStop() throws Exception {
+        final BrooklynNode node = 
app.addChild(EntitySpec.create(BrooklynNode.class).impl(SlowStopBrooklynNode.class));
+        Entities.manage(node);
+        assertTrue(Entities.isManaged(node), "Entity " + node + " must be 
managed.");
+        node.invoke(Startable.STOP, 
ImmutableMap.<String,Object>of()).asTask().getUnchecked();
+        //The UnmanageTask will unblock after the STOP effector completes, so 
we are competing with it here.
+        Asserts.succeedsEventually(new Runnable() {
+            @Override
+            public void run() {
+                assertFalse(Entities.isManaged(node));
+            }
+        });
+    }
+    
+
     @Test
     public void testCanStartSameNode() throws Exception {
         // not very interesting as do not have REST when run in this project

Reply via email to