Fix STOP effector for BrooklynNode

The shutdown effector was setting a STOP state which prevented the machine from 
being released.
Changes:
  * Convert the shutdown effector as a utility functionality - just call the 
remote shutdown endpoint with the passed parameters
  * Store effector parameters in the EffectorCallTag, methods calls to fetch 
them. Passing the parameters along the call chain would cause backwards 
incompatible changes.
  * Pass stop parameters to the shutdown effector so that it's functionality 
can be fine tuned by calling stop directly
  * Don't unamange the node if the machine is not released.
  * Add logic to wait for the graceful shutdown of the process before killing 
it forcibly (the java process will take some time to shut down cleanly after 
closing the rest connection)


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/17afcce6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/17afcce6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/17afcce6

Branch: refs/heads/master
Commit: 17afcce60e1f9e5c810a5088dcec355f68471a41
Parents: c2d91a1
Author: Svetoslav Neykov <[email protected]>
Authored: Fri Dec 19 16:06:58 2014 +0200
Committer: Svetoslav Neykov <[email protected]>
Committed: Sat Jan 17 00:16:58 2015 +0200

----------------------------------------------------------------------
 .../brooklyn/entity/basic/BrooklynTaskTags.java |  33 +++++-
 .../management/internal/EffectorUtils.java      |   2 +-
 .../entity/brooklynnode/BrooklynNodeImpl.java   | 115 +++++++++++--------
 .../BrooklynNodeUpgradeEffectorBody.java        |   7 +-
 4 files changed, 101 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/17afcce6/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java 
b/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java
index 5c92b43..8a9a21b 100644
--- a/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java
+++ b/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java
@@ -37,6 +37,7 @@ import brooklyn.entity.Entity;
 import brooklyn.management.ExecutionManager;
 import brooklyn.management.Task;
 import brooklyn.management.entitlement.EntitlementContext;
+import brooklyn.util.config.ConfigBag;
 import brooklyn.util.guava.Maybe;
 import brooklyn.util.javalang.MemoryUsageTracker;
 import brooklyn.util.stream.Streams;
@@ -274,9 +275,11 @@ public class BrooklynTaskTags extends TaskTags {
     public static class EffectorCallTag {
         protected final String entityId;
         protected final String effectorName;
-        protected EffectorCallTag(String entityId, String effectorName) {
+        protected transient ConfigBag parameters;
+        protected EffectorCallTag(String entityId, String effectorName, 
ConfigBag parameters) {
             this.entityId = checkNotNull(entityId, "entityId");
             this.effectorName = checkNotNull(effectorName, "effectorName");
+            this.parameters = parameters;
         }
         public String toString() {
             return EFFECTOR_TAG+"@"+entityId+":"+effectorName;
@@ -300,10 +303,16 @@ public class BrooklynTaskTags extends TaskTags {
         public String getEffectorName() {
             return effectorName;
         }
+        public ConfigBag getParameters() {
+            return parameters;
+        }
+        public void setParameters(ConfigBag parameters) {
+            this.parameters = parameters;
+        }
     }
     
-    public static EffectorCallTag tagForEffectorCall(Entity entity, String 
effectorName) {
-        return new EffectorCallTag(entity.getId(), effectorName);
+    public static EffectorCallTag tagForEffectorCall(Entity entity, String 
effectorName, ConfigBag parameters) {
+        return new EffectorCallTag(entity.getId(), effectorName, parameters);
     }
     
     /**
@@ -341,7 +350,7 @@ public class BrooklynTaskTags extends TaskTags {
     public static EffectorCallTag getEffectorCallTag(Task<?> task, boolean 
recurse) {
         Task<?> t = task;
         while (t!=null) {
-            for (Object tag: task.getTags()) {
+            for (Object tag: t.getTags()) {
                 if (tag instanceof EffectorCallTag)
                     return (EffectorCallTag)tag;
             }
@@ -357,7 +366,23 @@ public class BrooklynTaskTags extends TaskTags {
         EffectorCallTag result = getEffectorCallTag(task, true);
         return (result == null) ? null : result.getEffectorName();
     }
+
+    public static ConfigBag getEffectorParameters(Task<?> task) {
+        EffectorCallTag result = getEffectorCallTag(task, true);
+        return (result == null) ? null : result.getParameters();
+    }
+
+    public static ConfigBag getCurrentEffectorParameters() {
+        return getEffectorParameters(Tasks.current());
+    }
     
+    public static void setEffectorParameters(Task<?> task, ConfigBag 
parameters) {
+        EffectorCallTag result = getEffectorCallTag(task, true);
+        if (result == null) {
+            throw new IllegalStateException("No EffectorCallTag found, is the 
task an effector? Task: " + task);
+        }
+        result.setParameters(parameters);
+    }
     // ---------------- entitlement tags ----------------
     
     public static class EntitlementTag {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/17afcce6/core/src/main/java/brooklyn/management/internal/EffectorUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/management/internal/EffectorUtils.java 
b/core/src/main/java/brooklyn/management/internal/EffectorUtils.java
index f2a3271..d55f3a8 100644
--- a/core/src/main/java/brooklyn/management/internal/EffectorUtils.java
+++ b/core/src/main/java/brooklyn/management/internal/EffectorUtils.java
@@ -340,7 +340,7 @@ public class EffectorUtils {
                 .put("displayName", effector.getName())
                 .put("tags", MutableList.of(
                         BrooklynTaskTags.EFFECTOR_TAG, 
-                        BrooklynTaskTags.tagForEffectorCall(entity, 
effector.getName()), 
+                        BrooklynTaskTags.tagForEffectorCall(entity, 
effector.getName(), parameters), 
                         BrooklynTaskTags.tagForTargetEntity(entity)))
                 .build();
     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/17afcce6/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
----------------------------------------------------------------------
diff --git 
a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
 
b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
index 4b94c4f..401bcd4 100644
--- 
a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
+++ 
b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
@@ -22,6 +22,7 @@ import java.net.InetAddress;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 import org.apache.http.HttpStatus;
 import org.slf4j.Logger;
@@ -33,8 +34,8 @@ import brooklyn.enricher.Enrichers;
 import brooklyn.entity.Effector;
 import brooklyn.entity.Entity;
 import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.BrooklynTaskTags;
 import brooklyn.entity.basic.Entities;
-import brooklyn.entity.basic.EntityPredicates;
 import brooklyn.entity.basic.Lifecycle;
 import brooklyn.entity.basic.ServiceStateLogic;
 import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
@@ -121,7 +122,9 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl 
implements BrooklynNod
     @Override
     protected void preStopConfirmCustom() {
         super.preStopConfirmCustom();
-        if 
(Boolean.TRUE.equals(getAttribute(BrooklynNode.WEB_CONSOLE_ACCESSIBLE))) {
+        ConfigBag stopParameters = 
BrooklynTaskTags.getCurrentEffectorParameters();
+        if 
(Boolean.TRUE.equals(getAttribute(BrooklynNode.WEB_CONSOLE_ACCESSIBLE)) &&
+                stopParameters != null && 
!stopParameters.containsKey(ShutdownEffector.STOP_APPS_FIRST)) {
             Preconditions.checkState(getChildren().isEmpty(), "Can't stop 
instance with running applications.");
         }
     }
@@ -133,19 +136,63 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl 
implements BrooklynNod
         // Shutdown only if accessible: any of stop_* could have already been 
called.
         // Don't check serviceUp=true because stop() will already have set 
serviceUp=false && expectedState=stopping
         if 
(Boolean.TRUE.equals(getAttribute(BrooklynNode.WEB_CONSOLE_ACCESSIBLE))) {
-            DynamicTasks.queue(Effectors.invocation(this, SHUTDOWN, 
MutableMap.of(ShutdownEffector.REQUEST_TIMEOUT, Duration.ONE_MINUTE)));
+            queueShutdownTask();
+            queueWaitExitTask();
         } else {
             log.info("Skipping children.isEmpty check and shutdown call, 
because web-console not up for {}", this);
         }
     }
     
+    private void queueWaitExitTask() {
+        //give time to the process to die gracefully after closing the 
shutdown call
+        DynamicTasks.queue(Tasks.builder().name("wait for graceful 
stop").body(new Runnable() {
+            @Override
+            public void run() {
+                DynamicTasks.markInessential();
+                boolean cleanExit = Repeater.create()
+                    .until(new Callable<Boolean>() {
+                        @Override
+                        public Boolean call() throws Exception {
+                            return !getDriver().isRunning();
+                        }
+                    })
+                    .backoffTo(Duration.ONE_SECOND)
+                    .limitTimeTo(Duration.ONE_MINUTE)
+                    .run();
+                if (!cleanExit) {
+                    log.warn("Tenant " + this + " didn't stop cleanly after 
shutdown. Timeout waiting for process exit.");
+                }
+            }
+        }).build());
+    }
+
     @Override
     protected void postStop() {
         super.postStop();
-        //Don't unmanage in entity's task context as it will self-cancel the 
task.
-        //The external thread doesn't guarantee that the unmanage will be 
called *after* the stop effector completes.
-        //How to delay and make sure that we don't cancel the 
(almost-complete) stop effector?
-        new UnmanageThread().start();
+        ConfigBag stopParameters = 
BrooklynTaskTags.getCurrentEffectorParameters();
+        //unmanage only if stopping the machine
+        if (stopParameters == null || 
stopParameters.get(StopSoftwareParameters.STOP_MACHINE)) {
+            //Don't unmanage in entity's task context as it will self-cancel 
the task.
+            //The external thread doesn't guarantee that the unmanage will be 
called *after* the stop effector completes.
+            //How to delay and make sure that we don't cancel the 
(almost-complete) stop effector?
+            new UnmanageThread().start();
+        }
+    }
+
+    private void queueShutdownTask() {
+        ConfigBag stopParameters = 
BrooklynTaskTags.getCurrentEffectorParameters();
+        ConfigBag shutdownParameters;
+        if (stopParameters != null) {
+            shutdownParameters = ConfigBag.newInstanceCopying(stopParameters);
+        } else {
+            shutdownParameters = ConfigBag.newInstance();
+        }
+        shutdownParameters.putIfAbsent(ShutdownEffector.REQUEST_TIMEOUT, 
Duration.ONE_MINUTE);
+        
shutdownParameters.putIfAbsent(ShutdownEffector.FORCE_SHUTDOWN_ON_ERROR, 
Boolean.TRUE);
+        TaskAdaptable<Void> shutdownTask = Effectors.invocation(this, 
SHUTDOWN, shutdownParameters);
+        //Mark inessential so that even if it fails the process stop task will 
run afterwards to clean up.
+        TaskTags.markInessential(shutdownTask);
+        DynamicTasks.queue(shutdownTask);
     }
 
     public static class DeployBlueprintEffectorBody extends 
EffectorBody<String> implements DeployBlueprintEffector {
@@ -238,15 +285,9 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl 
implements BrooklynNod
                 }
             } catch (Exception e) {
                 Exceptions.propagateIfFatal(e);
-                ServiceStateLogic.setExpectedState(entity(), 
Lifecycle.ON_FIRE);
-                if (initialState!=Lifecycle.RUNNING) {
-                    // ignore failure in this task if the node is not 
currently running
-                    Tasks.markInessential();
-                }
                 throw new PropagatedRuntimeException("Error shutting down 
remote node "+entity()+" (in state "+initialState+"): 
"+Exceptions.collapseText(e), e);
             }
             ServiceNotUpLogic.updateNotUpIndicator(entity(), 
SHUTDOWN.getName(), "Shutdown of remote node has completed successfuly");
-            ServiceStateLogic.setExpectedState(entity(), Lifecycle.STOPPED);
             return null;
         }
 
@@ -266,20 +307,12 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl 
implements BrooklynNod
         @Override
         public Void call(ConfigBag parameters) {
             Duration timeout = parameters.get(TIMEOUT);
-            MutableMap<?, ?> params = MutableMap.of(
-                    ShutdownEffector.STOP_APPS_FIRST, Boolean.FALSE,
-                    ShutdownEffector.SHUTDOWN_TIMEOUT, timeout,
-                    ShutdownEffector.REQUEST_TIMEOUT, timeout);
-            Entity entity = entity();
-            TaskAdaptable<Void> shutdownTask = Effectors.invocation(entity, 
SHUTDOWN, params);
-            if (!entity.getAttribute(SERVICE_UP)) {
-                TaskTags.markInessential(shutdownTask);
-            }
-            DynamicTasks.queue(shutdownTask).asTask().getUnchecked();
-            waitForShutdown(entity, Duration.ONE_MINUTE);
-            
-            DynamicTasks.queue(Effectors.invocation(entity(), STOP, 
ConfigBag.EMPTY)).asTask().getUnchecked();
-            Entities.destroy(entity);
+
+            ConfigBag stopParameters = 
ConfigBag.newInstanceCopying(parameters);
+            stopParameters.put(ShutdownEffector.STOP_APPS_FIRST, 
Boolean.FALSE);
+            stopParameters.putIfAbsent(ShutdownEffector.SHUTDOWN_TIMEOUT, 
timeout);
+            stopParameters.putIfAbsent(ShutdownEffector.REQUEST_TIMEOUT, 
timeout);
+            DynamicTasks.queue(Effectors.invocation(entity(), STOP, 
stopParameters)).asTask().getUnchecked();
             return null;
         }
     }
@@ -290,32 +323,16 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl 
implements BrooklynNod
         @Override
         public Void call(ConfigBag parameters) {
             Duration timeout = parameters.get(TIMEOUT);
-            MutableMap<?, ?> params = MutableMap.of(
-                    ShutdownEffector.STOP_APPS_FIRST, Boolean.TRUE,
-                    ShutdownEffector.SHUTDOWN_TIMEOUT, timeout,
-                    ShutdownEffector.REQUEST_TIMEOUT, timeout);
-            Entity entity = entity();
-            TaskAdaptable<Void> shutdownTask = Effectors.invocation(entity, 
SHUTDOWN, params);
-            if (!entity.getAttribute(SERVICE_UP)) {
-                TaskTags.markInessential(shutdownTask);
-            }
-            DynamicTasks.queue(shutdownTask).asTask().getUnchecked();
-            waitForShutdown(entity, Duration.ONE_MINUTE);
-            
-            DynamicTasks.queue(Effectors.invocation(entity(), STOP, 
ConfigBag.EMPTY)).asTask().getUnchecked();
-            Entities.destroy(entity);
+
+            ConfigBag stopParameters = 
ConfigBag.newInstanceCopying(parameters);
+            stopParameters.put(ShutdownEffector.STOP_APPS_FIRST, Boolean.TRUE);
+            stopParameters.putIfAbsent(ShutdownEffector.SHUTDOWN_TIMEOUT, 
timeout);
+            stopParameters.putIfAbsent(ShutdownEffector.REQUEST_TIMEOUT, 
timeout);
+            DynamicTasks.queue(Effectors.invocation(entity(), STOP, 
stopParameters)).asTask().getUnchecked();
             return null;
         }
     }
 
-    protected static void waitForShutdown(Entity entity, Duration timeout) {
-        Repeater.create()
-            .until(entity, 
EntityPredicates.attributeEqualTo(BrooklynNode.WEB_CONSOLE_ACCESSIBLE, false))
-            .backoffTo(Duration.FIVE_SECONDS)
-            .limitTimeTo(timeout)
-            .runRequiringTrue();
-    }
-
     public List<String> getClasspath() {
         List<String> classpath = getConfig(CLASSPATH);
         if (classpath == null || classpath.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/17afcce6/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
----------------------------------------------------------------------
diff --git 
a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
 
b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
index 37ded07..265b3f7 100644
--- 
a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
+++ 
b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
@@ -30,6 +30,7 @@ import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.EntityInternal;
 import brooklyn.entity.basic.EntityTasks;
 import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.basic.SoftwareProcess.StopSoftwareParameters;
 import brooklyn.entity.brooklynnode.BrooklynCluster;
 import brooklyn.entity.brooklynnode.BrooklynNode;
 import brooklyn.entity.brooklynnode.BrooklynNodeDriver;
@@ -132,9 +133,11 @@ public class BrooklynNodeUpgradeEffectorBody extends 
EffectorBody<Void> {
 
         // 3 stop new version
         // 4 stop old version
+        ConfigBag stopParameters = ConfigBag.newInstance();
+        stopParameters.put(StopSoftwareParameters.STOP_MACHINE, Boolean.FALSE);
         DynamicTasks.queue(Tasks.builder().name("shutdown original and 
transient nodes")
-            .add(Effectors.invocation(dryRunChild, BrooklynNode.SHUTDOWN, 
ConfigBag.EMPTY))
-            .add(Effectors.invocation(entity(), BrooklynNode.SHUTDOWN, 
ConfigBag.EMPTY))
+            .add(Effectors.invocation(dryRunChild, 
BrooklynNode.STOP_NODE_BUT_LEAVE_APPS, stopParameters))
+            .add(Effectors.invocation(entity(), 
BrooklynNode.STOP_NODE_BUT_LEAVE_APPS, stopParameters))
             .build());
 
         // 5 move old files, and move new files

Reply via email to