ReleaseableLatch - better support for failing tasks

Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/ae8eb9e3
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/ae8eb9e3
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/ae8eb9e3

Branch: refs/heads/master
Commit: ae8eb9e346df7f91acf229e3d6fd8f68c41a9c69
Parents: 47aecce
Author: Svetoslav Neykov <svetoslav.ney...@cloudsoftcorp.com>
Authored: Wed Jan 25 09:34:41 2017 +0200
Committer: Svetoslav Neykov <svetoslav.ney...@cloudsoftcorp.com>
Committed: Wed Jan 25 09:39:02 2017 +0200

----------------------------------------------------------------------
 .../core/sensor/MaxConcurrencySensor.java       |  16 ++-
 .../core/sensor/MaxConcurrencySensorTest.java   |   3 +-
 .../entity/chef/ChefLifecycleEffectorTasks.java |  13 +-
 .../base/AbstractSoftwareProcessDriver.java     |   3 +-
 ...wareProcessDriverLifecycleEffectorTasks.java |  11 +-
 .../MachineLifecycleEffectorTasks.java          |  80 ++++-------
 .../base/SoftwareProcessEntityLatchTest.java    | 131 ++++++++++++++++++-
 .../mysql/DynamicToyMySqlEntityBuilder.java     |  10 +-
 8 files changed, 186 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/ae8eb9e3/core/src/main/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensor.java 
b/core/src/main/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensor.java
index bae9415..fcdf7ba 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensor.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensor.java
@@ -18,11 +18,14 @@
  */
 package org.apache.brooklyn.core.sensor;
 
+import org.apache.brooklyn.api.entity.EntityInitializer;
 import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.effector.AddSensor;
 import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.slf4j.Logger;
@@ -55,24 +58,29 @@ import org.slf4j.LoggerFactory;
  * }
  * </pre>
  */
-public class MaxConcurrencySensor extends AddSensor<ReleaseableLatch> {
+public class MaxConcurrencySensor implements EntityInitializer {
     private static final Logger log = 
LoggerFactory.getLogger(MaxConcurrencySensor.class);
 
+    public static final ConfigKey<String> SENSOR_NAME = 
ConfigKeys.newStringConfigKey("name", "The name of the sensor to create");
     public static final ConfigKey<String> SENSOR_TYPE = 
ConfigKeys.newConfigKeyWithDefault(AddSensor.SENSOR_TYPE, 
ReleaseableLatch.class.getName());
     public static final ConfigKey<Integer> MAX_CONCURRENCY = 
ConfigKeys.newIntegerConfigKey(
             "latch.concurrency.max",
             "The maximum number of threads that can execute the step for the 
latch this sensors is used at, in parallel.",
             Integer.MAX_VALUE);
+
     private Object maxConcurrency;
+    private String sensorName;
 
     public MaxConcurrencySensor(ConfigBag params) {
-        super(params);
-        maxConcurrency = params.getStringKey(MAX_CONCURRENCY.getName());
+        this.sensorName = params.get(SENSOR_NAME);
+        this.maxConcurrency = params.getStringKey(MAX_CONCURRENCY.getName());
     }
 
     @Override
     public void apply(@SuppressWarnings("deprecation") final 
org.apache.brooklyn.api.entity.EntityLocal entity) {
-        super.apply(entity);
+        final AttributeSensor<ReleaseableLatch> sensor = 
Sensors.newSensor(ReleaseableLatch.class, sensorName);
+        ((EntityInternal) entity).getMutableEntityType().addSensor(sensor);
+
         final Task<ReleaseableLatch> resolveValueTask = 
DependentConfiguration.maxConcurrency(maxConcurrency);
 
         class SetValue implements Runnable {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/ae8eb9e3/core/src/test/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensorTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensorTest.java
 
b/core/src/test/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensorTest.java
index 299c269..c8f391c 100644
--- 
a/core/src/test/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensorTest.java
+++ 
b/core/src/test/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensorTest.java
@@ -30,7 +30,6 @@ import com.google.common.collect.ImmutableMap;
 
 public class MaxConcurrencySensorTest extends BrooklynAppUnitTestSupport {
     private static final AttributeSensor<Integer> MAX_PERMITS = 
Sensors.newIntegerSensor("max.permits");
-    private static final AttributeSensor<ReleaseableLatch> SENSOR = 
Sensors.newSensor(ReleaseableLatch.class, "myname");
 
     @Test
     public void testAddsStaticSensorOfTypeString() {
@@ -43,7 +42,7 @@ public class MaxConcurrencySensorTest extends 
BrooklynAppUnitTestSupport {
         int actualPermits = 10;
         app.sensors().set(MAX_PERMITS, actualPermits);
 
-        EntityAsserts.assertAttributeEventuallyNonNull(entity, SENSOR);
+        EntityAsserts.assertAttributeEventuallyNonNull(entity, 
(AttributeSensor<?>)entity.getEntityType().getSensor("myname"));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/ae8eb9e3/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefLifecycleEffectorTasks.java
----------------------------------------------------------------------
diff --git 
a/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefLifecycleEffectorTasks.java
 
b/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefLifecycleEffectorTasks.java
index 4c30ba3..8c3dad9 100644
--- 
a/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefLifecycleEffectorTasks.java
+++ 
b/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefLifecycleEffectorTasks.java
@@ -28,15 +28,12 @@ import 
org.apache.brooklyn.core.effector.ssh.SshEffectorTasks;
 import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
 import org.apache.brooklyn.core.location.Machines;
-import org.apache.brooklyn.core.sensor.ReleaseableLatch;
 import org.apache.brooklyn.entity.software.base.SoftwareProcess;
 import 
org.apache.brooklyn.entity.software.base.lifecycle.MachineLifecycleEffectorTasks;
 import org.apache.brooklyn.location.ssh.SshMachineLocation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.brooklyn.util.collections.Jsonya;
-import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.Jsonya.Navigator;
+import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
 import org.apache.brooklyn.util.core.task.TaskTags;
@@ -48,6 +45,8 @@ import org.apache.brooklyn.util.ssh.BashCommands;
 import org.apache.brooklyn.util.text.Strings;
 import org.apache.brooklyn.util.time.Duration;
 import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
@@ -59,7 +58,7 @@ import com.google.common.collect.ImmutableList;
  * <p>
  * Instances of this should use the {@link ChefConfig} config attributes to 
configure startup,
  * and invoke {@link #usePidFile(String)} or {@link #useService(String)} to 
determine check-running and stop behaviour.
- * Alternatively this can be subclassed and {@link 
#postStartCustom(AtomicReference)} and {@link #stopProcessesAtMachine()} 
overridden.
+ * Alternatively this can be subclassed and {@link #postStartCustom()} and 
{@link #stopProcessesAtMachine()} overridden.
  * 
  * @since 0.6.0
  **/
@@ -239,7 +238,7 @@ public class ChefLifecycleEffectorTasks extends 
MachineLifecycleEffectorTasks im
     }
 
     @Override
-    protected void postStartCustom(AtomicReference<ReleaseableLatch> 
startLatchRef) {
+    protected void postStartCustom() {
         boolean result = false;
         result |= tryCheckStartPid();
         result |= tryCheckStartService();
@@ -248,7 +247,7 @@ public class ChefLifecycleEffectorTasks extends 
MachineLifecycleEffectorTasks im
             log.warn("No way to check whether "+entity()+" is running; 
assuming yes");
         }
         entity().sensors().set(SoftwareProcess.SERVICE_UP, true);
-        super.postStartCustom(startLatchRef);
+        super.postStartCustom();
     }
     
     protected boolean tryCheckStartPid() {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/ae8eb9e3/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java
----------------------------------------------------------------------
diff --git 
a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java
 
b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java
index 069ba16..3900b76 100644
--- 
a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java
+++ 
b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java
@@ -223,14 +223,13 @@ public abstract class AbstractSoftwareProcessDriver 
implements SoftwareProcessDr
         private ReleaseableLatch releaseableLatch;
 
         public CloseableLatch(Entity caller, ReleaseableLatch 
releaseableLatch) {
-            super();
             this.caller = caller;
             this.releaseableLatch = releaseableLatch;
         }
 
         @Override
         public void close() {
-            DynamicTasks.waitForLast();
+            DynamicTasks.drain(null, false);
             releaseableLatch.release(caller);
         }
     }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/ae8eb9e3/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessDriverLifecycleEffectorTasks.java
----------------------------------------------------------------------
diff --git 
a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessDriverLifecycleEffectorTasks.java
 
b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessDriverLifecycleEffectorTasks.java
index 67cdca8..4159c3f 100644
--- 
a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessDriverLifecycleEffectorTasks.java
+++ 
b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessDriverLifecycleEffectorTasks.java
@@ -88,7 +88,7 @@ public class SoftwareProcessDriverLifecycleEffectorTasks 
extends MachineLifecycl
             try {
                 // There's no preStartCustom call in the restart effector to 
get the latch value
                 // so nothing to release here - pass the nop value.
-                postStartCustom(new AtomicReference<>(ReleaseableLatch.NOP));
+                postStartCustom();
                 postRestartCustom();
             } finally {
                 ServiceStateLogic.setExpectedState(entity(), 
Lifecycle.RUNNING);
@@ -174,7 +174,7 @@ public class SoftwareProcessDriverLifecycleEffectorTasks 
extends MachineLifecycl
     }
 
     @Override
-    protected void postStartCustom(AtomicReference<ReleaseableLatch> 
startLatchRef) {
+    protected void postStartCustom() {
         entity().postDriverStart();
         if (entity().connectedSensors) {
             // many impls aren't idempotent - though they should be!
@@ -185,7 +185,7 @@ public class SoftwareProcessDriverLifecycleEffectorTasks 
extends MachineLifecycl
         }
         entity().waitForServiceUp();
         entity().postStart();
-        super.postStartCustom(startLatchRef);
+        super.postStartCustom();
     }
     
     @Override
@@ -261,10 +261,9 @@ public class SoftwareProcessDriverLifecycleEffectorTasks 
extends MachineLifecycl
     }
     
     @Override
-    protected void postStopCustom(AtomicReference<ReleaseableLatch> 
stopLatchRef) {
-        super.postStopCustom(stopLatchRef);
-        
+    protected void postStopCustom() {
         entity().postStop();
+        super.postStopCustom();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/ae8eb9e3/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java
----------------------------------------------------------------------
diff --git 
a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java
 
b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java
index 0282ca9..c0d80af 100644
--- 
a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java
+++ 
b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java
@@ -114,9 +114,9 @@ import com.google.common.reflect.TypeToken;
  *  <li> {@link #startProcessesAtMachine(Supplier)} (required)
  *  <li> {@link #stopProcessesAtMachine()} (required, but can be left blank if 
you assume the VM will be destroyed)
  *  <li> {@link #preStartCustom(MachineLocation, AtomicReference)}
- *  <li> {@link #postStartCustom(AtomicReference)}
+ *  <li> {@link #postStartCustom()}
  *  <li> {@link #preStopConfirmCustom(AtomicReference)}
- *  <li> {@link #postStopCustom(AtomicReference)}
+ *  <li> {@link #postStopCustom()}
  * </ul>
  * Note methods at this level typically look after the {@link 
Attributes#SERVICE_STATE_ACTUAL} sensor.
  *
@@ -380,6 +380,11 @@ public abstract class MachineLifecycleEffectorTasks {
             postStartAtMachineAsync();
         } finally {
             RELEASEABLE_LATCH_TL.remove();
+            DynamicTasks.drain(null, false);
+            ReleaseableLatch startLatch = startLatchRef.get();
+            if (startLatch != null) {
+                startLatch.release(entity());
+            }
         }
     }
 
@@ -644,48 +649,25 @@ public abstract class MachineLifecycleEffectorTasks {
 
     protected abstract String startProcessesAtMachine(final 
Supplier<MachineLocation> machineS);
 
-    /** @deprecated since 0.11.0. Use {@link 
#postStartAtMachineAsync(AtomicReference)} instead. */
-    @Deprecated
     protected void postStartAtMachineAsync() {
-        postStartAtMachineAsync(RELEASEABLE_LATCH_TL.get());
-    }
-
-    protected void postStartAtMachineAsync(AtomicReference<ReleaseableLatch> 
startLatchRef) {
-        DynamicTasks.queue("post-start", new PostStartTask(startLatchRef));
+        DynamicTasks.queue("post-start", new PostStartTask());
     }
 
     private class PostStartTask implements Runnable {
-        private AtomicReference<ReleaseableLatch> startLatchRef;
-
-        public PostStartTask(AtomicReference<ReleaseableLatch> startLatchRef) {
-            this.startLatchRef = startLatchRef;
-        }
-
         @Override
         public void run() {
-            RELEASEABLE_LATCH_TL.set(startLatchRef);
-            try {
-                postStartCustom();
-            } finally {
-                RELEASEABLE_LATCH_TL.remove();
-            }
+            postStartCustom();
         }
     }
 
-    /** @deprecated since 0.11.0. Use {@link 
#postStartCustom(AtomicReference)} instead. */
-    @Deprecated
-    protected void postStartCustom() {
-        postStartCustom(RELEASEABLE_LATCH_TL.get());
-    }
-
     /**
      * Default post-start hooks.
      * <p>
      * Can be extended by subclasses, and typically will wait for confirmation 
of start.
      * The service not set to running until after this. Also invoked following 
a restart.
      */
-    protected void postStartCustom(AtomicReference<ReleaseableLatch> 
startLatchRef) {
-        startLatchRef.get().release(entity());
+    protected void postStartCustom() {
+        // nothing by default
     }
 
     /**
@@ -792,7 +774,7 @@ public abstract class MachineLifecycleEffectorTasks {
      * If no errors were encountered call {@link #postStopCustom()} at the end.
      */
     public void stop(ConfigBag parameters) {
-        doStop(parameters, new StopAnyProvisionedMachinesTask());
+        doStopLatching(parameters, new StopAnyProvisionedMachinesTask());
     }
 
     /**
@@ -800,17 +782,26 @@ public abstract class MachineLifecycleEffectorTasks {
      * {@link #stopAnyProvisionedMachines}.
      */
     public void suspend(ConfigBag parameters) {
-        doStop(parameters, new SuspendAnyProvisionedMachinesTask());
+        doStopLatching(parameters, new SuspendAnyProvisionedMachinesTask());
     }
 
-    protected void doStop(ConfigBag parameters, 
Callable<StopMachineDetails<Integer>> stopTask) {
+    protected void doStopLatching(ConfigBag parameters, 
Callable<StopMachineDetails<Integer>> stopTask) {
         AtomicReference<ReleaseableLatch> stopLatchRef = new 
AtomicReference<>();
         RELEASEABLE_LATCH_TL.set(stopLatchRef);
         try {
-            preStopConfirmCustom();
+            doStop(parameters, stopTask);
         } finally {
             RELEASEABLE_LATCH_TL.remove();
+            DynamicTasks.drain(null, false);
+            ReleaseableLatch stopLatch = stopLatchRef.get();
+            if (stopLatch != null) {
+                stopLatch.release(entity());
+            }
         }
+    }
+
+    protected void doStop(ConfigBag parameters, 
Callable<StopMachineDetails<Integer>> stopTask) {
+        preStopConfirmCustom();
 
         log.info("Stopping {} in {}", entity(), entity().getLocations());
 
@@ -923,7 +914,7 @@ public abstract class MachineLifecycleEffectorTasks {
         entity().sensors().set(SoftwareProcess.SERVICE_UP, false);
         ServiceStateLogic.setExpectedState(entity(), Lifecycle.STOPPED);
 
-        DynamicTasks.queue("post-stop", new PostStopCustomTask(stopLatchRef));
+        DynamicTasks.queue("post-stop", new PostStopCustomTask());
 
         if (log.isDebugEnabled()) log.debug("Stopped software process entity 
"+entity());
     }
@@ -989,20 +980,9 @@ public abstract class MachineLifecycleEffectorTasks {
     }
 
     private class PostStopCustomTask implements Callable<Void> {
-        private AtomicReference<ReleaseableLatch> stopLatchRef;
-
-        public PostStopCustomTask(AtomicReference<ReleaseableLatch> 
stopLatchRef) {
-            this.stopLatchRef = stopLatchRef;
-        }
-
         @Override
         public Void call() {
-            RELEASEABLE_LATCH_TL.set(stopLatchRef);
-            try {
-                postStopCustom();
-            } finally {
-                RELEASEABLE_LATCH_TL.remove();
-            }
+            postStopCustom();
             return null;
         }
     }
@@ -1041,14 +1021,8 @@ public abstract class MachineLifecycleEffectorTasks {
         // nothing needed here
     }
 
-    /** @deprecated 0.11.0. Use {@link #postStopCustom(AtomicReference)} 
instead. */
-    @Deprecated
     protected void postStopCustom() {
-        postStopCustom(RELEASEABLE_LATCH_TL.get());
-    }
-
-    protected void postStopCustom(AtomicReference<ReleaseableLatch> 
stopLatchRef) {
-        stopLatchRef.get().release(entity());
+        // nothing needed here
     }
 
     protected void preRestartCustom() {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/ae8eb9e3/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityLatchTest.java
----------------------------------------------------------------------
diff --git 
a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityLatchTest.java
 
b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityLatchTest.java
index 601c0f2..c5e7fc5 100644
--- 
a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityLatchTest.java
+++ 
b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityLatchTest.java
@@ -29,13 +29,18 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntityInitializer;
 import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.entity.ImplementedBy;
 import org.apache.brooklyn.api.location.LocationSpec;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.effector.AddEffector;
+import org.apache.brooklyn.core.effector.EffectorBody;
 import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.sensor.DependentConfiguration;
 import org.apache.brooklyn.core.sensor.ReleaseableLatch;
@@ -43,10 +48,13 @@ import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.entity.group.DynamicCluster;
 import 
org.apache.brooklyn.entity.software.base.SoftwareProcessEntityTest.MyService;
+import 
org.apache.brooklyn.entity.software.base.SoftwareProcessEntityTest.MyServiceImpl;
 import 
org.apache.brooklyn.entity.software.base.SoftwareProcessEntityTest.SimulatedDriver;
 import org.apache.brooklyn.location.byon.FixedListMachineProvisioningLocation;
 import org.apache.brooklyn.location.ssh.SshMachineLocation;
 import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
 import org.apache.brooklyn.util.core.task.TaskInternal;
 import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.time.Duration;
@@ -175,11 +183,57 @@ public class SoftwareProcessEntityLatchTest extends 
BrooklynAppUnitTestSupport {
         // Check we have actually used the latch
         assertNotEquals(countingLatch.getMaxCounter(), 0, "Latch not acquired 
at all");
         // In theory this is 0 < maxCnt <= maxConcurrency contract, but in 
practice
-        // we should always reach the maximum due to the sleeps below.
+        // we should always reach the maximum due to the sleeps in 
CountingLatch.
         // Change if found to fail in the wild.
         assertEquals(countingLatch.getMaxCounter(), maxConcurrency);
     }
 
+    @Test(dataProvider="latchAndTaskNamesProvider"/*, 
timeOut=Asserts.THIRTY_SECONDS_TIMEOUT_MS*/)
+    public void testFailedReleaseableUnblocks(final ConfigKey<Boolean> latch, 
List<String> _) throws Exception {
+        final int maxConcurrency = 1;
+        final ReleaseableLatch latchSemaphore = 
ReleaseableLatch.Factory.newMaxConcurrencyLatch(maxConcurrency);
+        final AttributeSensor<Object> latchSensor = 
Sensors.newSensor(Object.class, "latch");
+        final CountingLatch countingLatch = new CountingLatch(latchSemaphore, 
maxConcurrency);
+        // FIRST_MEMBER_SPEC latches are not guaranteed to be acquired before 
MEMBER_SPEC latches
+        // so the start effector could complete, but the counting latch will 
catch if there are
+        // any unreleased semaphores.
+        @SuppressWarnings({"unused"})
+        DynamicCluster cluster = 
app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.INITIAL_SIZE, 2)
+                .configure(DynamicCluster.FIRST_MEMBER_SPEC, 
EntitySpec.create(FailingMyService.class)
+                        .configure(ConfigKeys.newConfigKey(Object.class, 
latch.getName()), (Object)DependentConfiguration.attributeWhenReady(app, 
latchSensor)))
+                .configure(DynamicCluster.MEMBER_SPEC, 
EntitySpec.create(MyService.class)
+                        .configure(ConfigKeys.newConfigKey(Object.class, 
latch.getName()), (Object)DependentConfiguration.attributeWhenReady(app, 
latchSensor))));
+        app.sensors().set(latchSensor, countingLatch);
+        final Task<Void> startTask = Entities.invokeEffector(app, app, 
MyService.START, ImmutableMap.of("locations", 
ImmutableList.of(app.newLocalhostProvisioningLocation())));
+        //expected to fail but should complete quickly
+        assertTrue(startTask.blockUntilEnded(Asserts.DEFAULT_LONG_TIMEOUT), 
"timeout waiting for start effector to complete");
+        assertTrue(latch == SoftwareProcess.STOP_LATCH || startTask.isError());
+        final Task<Void> stopTask = Entities.invokeEffector(app, app, 
MyService.STOP, ImmutableMap.<String, Object>of());
+        //expected to fail but should complete quickly
+        assertTrue(stopTask.blockUntilEnded(Asserts.DEFAULT_LONG_TIMEOUT), 
"timeout waiting for stop effector to complete");
+        // stop task won't fail because the process stop failed; the error is 
ignored
+        assertTrue(stopTask.isDone());
+        assertEquals(countingLatch.getCounter(), 0);
+        // Check we have actually used the latch
+        assertNotEquals(countingLatch.getMaxCounter(), 0, "Latch not acquired 
at all");
+        // In theory this is 0 < maxCnt <= maxConcurrency contract, but in 
practice
+        // we should always reach the maximum due to the sleeps in 
CountingLatch.
+        // Change if found to fail in the wild.
+        assertEquals(countingLatch.getMaxCounter(), maxConcurrency);
+    }
+
+    protected EntityInitializer createFailingEffectorInitializer(String name) {
+        return new AddEffector(AddEffector.newEffectorBuilder(Void.class,
+                        
ConfigBag.newInstance(ImmutableMap.of(AddEffector.EFFECTOR_NAME, name)))
+                .impl(new EffectorBody<Void>() {
+                    @Override
+                    public Void call(ConfigBag parameters) {
+                        throw new IllegalStateException("Failed to start");
+                    }
+                }).build());
+    }
+
     protected List<String> getLatchPostTasks(final ConfigKey<?> latch) {
         if (latch == SoftwareProcess.STOP_LATCH) {
             return SOFTWARE_PROCESS_STOP_TASKS;
@@ -270,5 +324,80 @@ public class SoftwareProcessEntityLatchTest extends 
BrooklynAppUnitTestSupport {
             }
         }
 
+
+    }
+
+    @ImplementedBy(FailingMyServiceImpl.class)
+    public static interface FailingMyService extends MyService {}
+    public static class FailingMyServiceImpl extends MyServiceImpl implements 
FailingMyService {
+        @Override
+        public Class<?> getDriverInterface() {
+            return FailingSimulatedDriver.class;
+        }
     }
+    static class FailingSimulatedDriver extends SimulatedDriver {
+        public FailingSimulatedDriver(@SuppressWarnings("deprecation") 
org.apache.brooklyn.api.entity.EntityLocal entity, SshMachineLocation machine) {
+            super(entity, machine);
+        }
+
+        @Override
+        public void stop() {
+            super.stop();
+            failOnStep(SoftwareProcess.STOP_LATCH);
+        }
+
+        @Override
+        public void install() {
+            super.install();
+            failOnStep(SoftwareProcess.INSTALL_LATCH);
+        }
+
+        @Override
+        public void customize() {
+            super.customize();
+            failOnStep(SoftwareProcess.CUSTOMIZE_LATCH);
+        }
+
+        @Override
+        public void launch() {
+            super.launch();
+            failOnStep(SoftwareProcess.START_LATCH);
+            failOnStep(SoftwareProcess.LAUNCH_LATCH);
+        }
+
+        @Override
+        public void setup() {
+            super.setup();
+            failOnStep(SoftwareProcess.SETUP_LATCH);
+        }
+
+        @Override
+        public void copyInstallResources() {
+            super.copyInstallResources();
+            failOnStep(SoftwareProcess.INSTALL_RESOURCES_LATCH);
+        }
+
+        @Override
+        public void copyRuntimeResources() {
+            super.copyRuntimeResources();
+            failOnStep(SoftwareProcess.RUNTIME_RESOURCES_LATCH);
+        }
+
+        @Override
+        protected String getInstallLabelExtraSalt() {
+            return super.getInstallLabelExtraSalt();
+        }
+
+        protected void failOnStep(ConfigKey<Boolean> latch) {
+            if (((EntityInternal)entity).config().getRaw(latch).isPresent()) {
+                DynamicTasks.queue("Failing task", new Runnable() {
+                    @Override
+                    public void run() {
+                        throw new IllegalStateException("forced fail");
+                    }
+                });
+            }
+        }
+
+}
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/ae8eb9e3/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/mysql/DynamicToyMySqlEntityBuilder.java
----------------------------------------------------------------------
diff --git 
a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/mysql/DynamicToyMySqlEntityBuilder.java
 
b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/mysql/DynamicToyMySqlEntityBuilder.java
index c25dc02..82a7f6e 100644
--- 
a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/mysql/DynamicToyMySqlEntityBuilder.java
+++ 
b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/mysql/DynamicToyMySqlEntityBuilder.java
@@ -19,7 +19,6 @@
 package org.apache.brooklyn.entity.software.base.test.mysql;
 
 import java.io.File;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntityInitializer;
@@ -30,11 +29,8 @@ import org.apache.brooklyn.api.location.OsDetails;
 import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks;
 import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
-import org.apache.brooklyn.core.sensor.ReleaseableLatch;
 import 
org.apache.brooklyn.entity.software.base.lifecycle.MachineLifecycleEffectorTasks;
 import org.apache.brooklyn.entity.stock.BasicStartable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import 
org.apache.brooklyn.location.localhost.LocalhostMachineProvisioningLocation.LocalhostMachine;
 import org.apache.brooklyn.location.ssh.SshMachineLocation;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
@@ -44,6 +40,8 @@ import 
org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
 import org.apache.brooklyn.util.ssh.BashCommands;
 import org.apache.brooklyn.util.time.Duration;
 import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Predicates;
 import com.google.common.base.Splitter;
@@ -126,7 +124,7 @@ public class DynamicToyMySqlEntityBuilder {
                 return "submitted start";
             }
             @Override
-            protected void postStartCustom(AtomicReference<ReleaseableLatch> 
startLatchRef) {
+            protected void postStartCustom() {
                 // if it's still up after 5s assume we are good
                 Time.sleep(Duration.FIVE_SECONDS);
                 if 
(!DynamicTasks.queue(SshEffectorTasks.isPidFromFileRunning(dir(entity)+"/*/data/*.pid")).get())
 {
@@ -152,7 +150,7 @@ public class DynamicToyMySqlEntityBuilder {
                 // Really should set this with a Feed that checks pid 
periodically.
                 // Should this instead be using SERVICE_NOT_UP_INDICATORS?
                 entity().sensors().set(Attributes.SERVICE_UP, true);
-                super.postStartCustom(startLatchRef);
+                super.postStartCustom();
             }
 
             @Override

Reply via email to