Fix STOP effector for BrooklynNode The shutdown effector was setting a STOP state which prevented the machine from being released. Changes: * Convert the shutdown effector as a utility functionality - just call the remote shutdown endpoint with the passed parameters * Store effector parameters in the EffectorCallTag, methods calls to fetch them. Passing the parameters along the call chain would cause backwards incompatible changes. * Pass stop parameters to the shutdown effector so that it's functionality can be fine tuned by calling stop directly * Don't unamange the node if the machine is not released. * Add logic to wait for the graceful shutdown of the process before killing it forcibly (the java process will take some time to shut down cleanly after closing the rest connection)
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/17afcce6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/17afcce6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/17afcce6 Branch: refs/heads/master Commit: 17afcce60e1f9e5c810a5088dcec355f68471a41 Parents: c2d91a1 Author: Svetoslav Neykov <[email protected]> Authored: Fri Dec 19 16:06:58 2014 +0200 Committer: Svetoslav Neykov <[email protected]> Committed: Sat Jan 17 00:16:58 2015 +0200 ---------------------------------------------------------------------- .../brooklyn/entity/basic/BrooklynTaskTags.java | 33 +++++- .../management/internal/EffectorUtils.java | 2 +- .../entity/brooklynnode/BrooklynNodeImpl.java | 115 +++++++++++-------- .../BrooklynNodeUpgradeEffectorBody.java | 7 +- 4 files changed, 101 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/17afcce6/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java b/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java index 5c92b43..8a9a21b 100644 --- a/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java +++ b/core/src/main/java/brooklyn/entity/basic/BrooklynTaskTags.java @@ -37,6 +37,7 @@ import brooklyn.entity.Entity; import brooklyn.management.ExecutionManager; import brooklyn.management.Task; import brooklyn.management.entitlement.EntitlementContext; +import brooklyn.util.config.ConfigBag; import brooklyn.util.guava.Maybe; import brooklyn.util.javalang.MemoryUsageTracker; import brooklyn.util.stream.Streams; @@ -274,9 +275,11 @@ public class BrooklynTaskTags extends TaskTags { public static class EffectorCallTag { protected final String entityId; protected final String effectorName; - protected EffectorCallTag(String entityId, String effectorName) { + protected transient ConfigBag parameters; + protected EffectorCallTag(String entityId, String effectorName, ConfigBag parameters) { this.entityId = checkNotNull(entityId, "entityId"); this.effectorName = checkNotNull(effectorName, "effectorName"); + this.parameters = parameters; } public String toString() { return EFFECTOR_TAG+"@"+entityId+":"+effectorName; @@ -300,10 +303,16 @@ public class BrooklynTaskTags extends TaskTags { public String getEffectorName() { return effectorName; } + public ConfigBag getParameters() { + return parameters; + } + public void setParameters(ConfigBag parameters) { + this.parameters = parameters; + } } - public static EffectorCallTag tagForEffectorCall(Entity entity, String effectorName) { - return new EffectorCallTag(entity.getId(), effectorName); + public static EffectorCallTag tagForEffectorCall(Entity entity, String effectorName, ConfigBag parameters) { + return new EffectorCallTag(entity.getId(), effectorName, parameters); } /** @@ -341,7 +350,7 @@ public class BrooklynTaskTags extends TaskTags { public static EffectorCallTag getEffectorCallTag(Task<?> task, boolean recurse) { Task<?> t = task; while (t!=null) { - for (Object tag: task.getTags()) { + for (Object tag: t.getTags()) { if (tag instanceof EffectorCallTag) return (EffectorCallTag)tag; } @@ -357,7 +366,23 @@ public class BrooklynTaskTags extends TaskTags { EffectorCallTag result = getEffectorCallTag(task, true); return (result == null) ? null : result.getEffectorName(); } + + public static ConfigBag getEffectorParameters(Task<?> task) { + EffectorCallTag result = getEffectorCallTag(task, true); + return (result == null) ? null : result.getParameters(); + } + + public static ConfigBag getCurrentEffectorParameters() { + return getEffectorParameters(Tasks.current()); + } + public static void setEffectorParameters(Task<?> task, ConfigBag parameters) { + EffectorCallTag result = getEffectorCallTag(task, true); + if (result == null) { + throw new IllegalStateException("No EffectorCallTag found, is the task an effector? Task: " + task); + } + result.setParameters(parameters); + } // ---------------- entitlement tags ---------------- public static class EntitlementTag { http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/17afcce6/core/src/main/java/brooklyn/management/internal/EffectorUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/management/internal/EffectorUtils.java b/core/src/main/java/brooklyn/management/internal/EffectorUtils.java index f2a3271..d55f3a8 100644 --- a/core/src/main/java/brooklyn/management/internal/EffectorUtils.java +++ b/core/src/main/java/brooklyn/management/internal/EffectorUtils.java @@ -340,7 +340,7 @@ public class EffectorUtils { .put("displayName", effector.getName()) .put("tags", MutableList.of( BrooklynTaskTags.EFFECTOR_TAG, - BrooklynTaskTags.tagForEffectorCall(entity, effector.getName()), + BrooklynTaskTags.tagForEffectorCall(entity, effector.getName(), parameters), BrooklynTaskTags.tagForTargetEntity(entity))) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/17afcce6/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java index 4b94c4f..401bcd4 100644 --- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java +++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java @@ -22,6 +22,7 @@ import java.net.InetAddress; import java.net.URI; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import org.apache.http.HttpStatus; import org.slf4j.Logger; @@ -33,8 +34,8 @@ import brooklyn.enricher.Enrichers; import brooklyn.entity.Effector; import brooklyn.entity.Entity; import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.BrooklynTaskTags; import brooklyn.entity.basic.Entities; -import brooklyn.entity.basic.EntityPredicates; import brooklyn.entity.basic.Lifecycle; import brooklyn.entity.basic.ServiceStateLogic; import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic; @@ -121,7 +122,9 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod @Override protected void preStopConfirmCustom() { super.preStopConfirmCustom(); - if (Boolean.TRUE.equals(getAttribute(BrooklynNode.WEB_CONSOLE_ACCESSIBLE))) { + ConfigBag stopParameters = BrooklynTaskTags.getCurrentEffectorParameters(); + if (Boolean.TRUE.equals(getAttribute(BrooklynNode.WEB_CONSOLE_ACCESSIBLE)) && + stopParameters != null && !stopParameters.containsKey(ShutdownEffector.STOP_APPS_FIRST)) { Preconditions.checkState(getChildren().isEmpty(), "Can't stop instance with running applications."); } } @@ -133,19 +136,63 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod // Shutdown only if accessible: any of stop_* could have already been called. // Don't check serviceUp=true because stop() will already have set serviceUp=false && expectedState=stopping if (Boolean.TRUE.equals(getAttribute(BrooklynNode.WEB_CONSOLE_ACCESSIBLE))) { - DynamicTasks.queue(Effectors.invocation(this, SHUTDOWN, MutableMap.of(ShutdownEffector.REQUEST_TIMEOUT, Duration.ONE_MINUTE))); + queueShutdownTask(); + queueWaitExitTask(); } else { log.info("Skipping children.isEmpty check and shutdown call, because web-console not up for {}", this); } } + private void queueWaitExitTask() { + //give time to the process to die gracefully after closing the shutdown call + DynamicTasks.queue(Tasks.builder().name("wait for graceful stop").body(new Runnable() { + @Override + public void run() { + DynamicTasks.markInessential(); + boolean cleanExit = Repeater.create() + .until(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + return !getDriver().isRunning(); + } + }) + .backoffTo(Duration.ONE_SECOND) + .limitTimeTo(Duration.ONE_MINUTE) + .run(); + if (!cleanExit) { + log.warn("Tenant " + this + " didn't stop cleanly after shutdown. Timeout waiting for process exit."); + } + } + }).build()); + } + @Override protected void postStop() { super.postStop(); - //Don't unmanage in entity's task context as it will self-cancel the task. - //The external thread doesn't guarantee that the unmanage will be called *after* the stop effector completes. - //How to delay and make sure that we don't cancel the (almost-complete) stop effector? - new UnmanageThread().start(); + ConfigBag stopParameters = BrooklynTaskTags.getCurrentEffectorParameters(); + //unmanage only if stopping the machine + if (stopParameters == null || stopParameters.get(StopSoftwareParameters.STOP_MACHINE)) { + //Don't unmanage in entity's task context as it will self-cancel the task. + //The external thread doesn't guarantee that the unmanage will be called *after* the stop effector completes. + //How to delay and make sure that we don't cancel the (almost-complete) stop effector? + new UnmanageThread().start(); + } + } + + private void queueShutdownTask() { + ConfigBag stopParameters = BrooklynTaskTags.getCurrentEffectorParameters(); + ConfigBag shutdownParameters; + if (stopParameters != null) { + shutdownParameters = ConfigBag.newInstanceCopying(stopParameters); + } else { + shutdownParameters = ConfigBag.newInstance(); + } + shutdownParameters.putIfAbsent(ShutdownEffector.REQUEST_TIMEOUT, Duration.ONE_MINUTE); + shutdownParameters.putIfAbsent(ShutdownEffector.FORCE_SHUTDOWN_ON_ERROR, Boolean.TRUE); + TaskAdaptable<Void> shutdownTask = Effectors.invocation(this, SHUTDOWN, shutdownParameters); + //Mark inessential so that even if it fails the process stop task will run afterwards to clean up. + TaskTags.markInessential(shutdownTask); + DynamicTasks.queue(shutdownTask); } public static class DeployBlueprintEffectorBody extends EffectorBody<String> implements DeployBlueprintEffector { @@ -238,15 +285,9 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod } } catch (Exception e) { Exceptions.propagateIfFatal(e); - ServiceStateLogic.setExpectedState(entity(), Lifecycle.ON_FIRE); - if (initialState!=Lifecycle.RUNNING) { - // ignore failure in this task if the node is not currently running - Tasks.markInessential(); - } throw new PropagatedRuntimeException("Error shutting down remote node "+entity()+" (in state "+initialState+"): "+Exceptions.collapseText(e), e); } ServiceNotUpLogic.updateNotUpIndicator(entity(), SHUTDOWN.getName(), "Shutdown of remote node has completed successfuly"); - ServiceStateLogic.setExpectedState(entity(), Lifecycle.STOPPED); return null; } @@ -266,20 +307,12 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod @Override public Void call(ConfigBag parameters) { Duration timeout = parameters.get(TIMEOUT); - MutableMap<?, ?> params = MutableMap.of( - ShutdownEffector.STOP_APPS_FIRST, Boolean.FALSE, - ShutdownEffector.SHUTDOWN_TIMEOUT, timeout, - ShutdownEffector.REQUEST_TIMEOUT, timeout); - Entity entity = entity(); - TaskAdaptable<Void> shutdownTask = Effectors.invocation(entity, SHUTDOWN, params); - if (!entity.getAttribute(SERVICE_UP)) { - TaskTags.markInessential(shutdownTask); - } - DynamicTasks.queue(shutdownTask).asTask().getUnchecked(); - waitForShutdown(entity, Duration.ONE_MINUTE); - - DynamicTasks.queue(Effectors.invocation(entity(), STOP, ConfigBag.EMPTY)).asTask().getUnchecked(); - Entities.destroy(entity); + + ConfigBag stopParameters = ConfigBag.newInstanceCopying(parameters); + stopParameters.put(ShutdownEffector.STOP_APPS_FIRST, Boolean.FALSE); + stopParameters.putIfAbsent(ShutdownEffector.SHUTDOWN_TIMEOUT, timeout); + stopParameters.putIfAbsent(ShutdownEffector.REQUEST_TIMEOUT, timeout); + DynamicTasks.queue(Effectors.invocation(entity(), STOP, stopParameters)).asTask().getUnchecked(); return null; } } @@ -290,32 +323,16 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod @Override public Void call(ConfigBag parameters) { Duration timeout = parameters.get(TIMEOUT); - MutableMap<?, ?> params = MutableMap.of( - ShutdownEffector.STOP_APPS_FIRST, Boolean.TRUE, - ShutdownEffector.SHUTDOWN_TIMEOUT, timeout, - ShutdownEffector.REQUEST_TIMEOUT, timeout); - Entity entity = entity(); - TaskAdaptable<Void> shutdownTask = Effectors.invocation(entity, SHUTDOWN, params); - if (!entity.getAttribute(SERVICE_UP)) { - TaskTags.markInessential(shutdownTask); - } - DynamicTasks.queue(shutdownTask).asTask().getUnchecked(); - waitForShutdown(entity, Duration.ONE_MINUTE); - - DynamicTasks.queue(Effectors.invocation(entity(), STOP, ConfigBag.EMPTY)).asTask().getUnchecked(); - Entities.destroy(entity); + + ConfigBag stopParameters = ConfigBag.newInstanceCopying(parameters); + stopParameters.put(ShutdownEffector.STOP_APPS_FIRST, Boolean.TRUE); + stopParameters.putIfAbsent(ShutdownEffector.SHUTDOWN_TIMEOUT, timeout); + stopParameters.putIfAbsent(ShutdownEffector.REQUEST_TIMEOUT, timeout); + DynamicTasks.queue(Effectors.invocation(entity(), STOP, stopParameters)).asTask().getUnchecked(); return null; } } - protected static void waitForShutdown(Entity entity, Duration timeout) { - Repeater.create() - .until(entity, EntityPredicates.attributeEqualTo(BrooklynNode.WEB_CONSOLE_ACCESSIBLE, false)) - .backoffTo(Duration.FIVE_SECONDS) - .limitTimeTo(timeout) - .runRequiringTrue(); - } - public List<String> getClasspath() { List<String> classpath = getConfig(CLASSPATH); if (classpath == null || classpath.isEmpty()) { http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/17afcce6/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java index 37ded07..265b3f7 100644 --- a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java +++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java @@ -30,6 +30,7 @@ import brooklyn.entity.basic.Entities; import brooklyn.entity.basic.EntityInternal; import brooklyn.entity.basic.EntityTasks; import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.entity.basic.SoftwareProcess.StopSoftwareParameters; import brooklyn.entity.brooklynnode.BrooklynCluster; import brooklyn.entity.brooklynnode.BrooklynNode; import brooklyn.entity.brooklynnode.BrooklynNodeDriver; @@ -132,9 +133,11 @@ public class BrooklynNodeUpgradeEffectorBody extends EffectorBody<Void> { // 3 stop new version // 4 stop old version + ConfigBag stopParameters = ConfigBag.newInstance(); + stopParameters.put(StopSoftwareParameters.STOP_MACHINE, Boolean.FALSE); DynamicTasks.queue(Tasks.builder().name("shutdown original and transient nodes") - .add(Effectors.invocation(dryRunChild, BrooklynNode.SHUTDOWN, ConfigBag.EMPTY)) - .add(Effectors.invocation(entity(), BrooklynNode.SHUTDOWN, ConfigBag.EMPTY)) + .add(Effectors.invocation(dryRunChild, BrooklynNode.STOP_NODE_BUT_LEAVE_APPS, stopParameters)) + .add(Effectors.invocation(entity(), BrooklynNode.STOP_NODE_BUT_LEAVE_APPS, stopParameters)) .build()); // 5 move old files, and move new files
