ReleaseableLatch - better support for failing tasks
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/ae8eb9e3 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/ae8eb9e3 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/ae8eb9e3 Branch: refs/heads/master Commit: ae8eb9e346df7f91acf229e3d6fd8f68c41a9c69 Parents: 47aecce Author: Svetoslav Neykov <svetoslav.ney...@cloudsoftcorp.com> Authored: Wed Jan 25 09:34:41 2017 +0200 Committer: Svetoslav Neykov <svetoslav.ney...@cloudsoftcorp.com> Committed: Wed Jan 25 09:39:02 2017 +0200 ---------------------------------------------------------------------- .../core/sensor/MaxConcurrencySensor.java | 16 ++- .../core/sensor/MaxConcurrencySensorTest.java | 3 +- .../entity/chef/ChefLifecycleEffectorTasks.java | 13 +- .../base/AbstractSoftwareProcessDriver.java | 3 +- ...wareProcessDriverLifecycleEffectorTasks.java | 11 +- .../MachineLifecycleEffectorTasks.java | 80 ++++------- .../base/SoftwareProcessEntityLatchTest.java | 131 ++++++++++++++++++- .../mysql/DynamicToyMySqlEntityBuilder.java | 10 +- 8 files changed, 186 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/ae8eb9e3/core/src/main/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensor.java index bae9415..fcdf7ba 100644 --- a/core/src/main/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensor.java +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensor.java @@ -18,11 +18,14 @@ */ package org.apache.brooklyn.core.sensor; +import org.apache.brooklyn.api.entity.EntityInitializer; import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.effector.AddSensor; import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.task.Tasks; import org.slf4j.Logger; @@ -55,24 +58,29 @@ import org.slf4j.LoggerFactory; * } * </pre> */ -public class MaxConcurrencySensor extends AddSensor<ReleaseableLatch> { +public class MaxConcurrencySensor implements EntityInitializer { private static final Logger log = LoggerFactory.getLogger(MaxConcurrencySensor.class); + public static final ConfigKey<String> SENSOR_NAME = ConfigKeys.newStringConfigKey("name", "The name of the sensor to create"); public static final ConfigKey<String> SENSOR_TYPE = ConfigKeys.newConfigKeyWithDefault(AddSensor.SENSOR_TYPE, ReleaseableLatch.class.getName()); public static final ConfigKey<Integer> MAX_CONCURRENCY = ConfigKeys.newIntegerConfigKey( "latch.concurrency.max", "The maximum number of threads that can execute the step for the latch this sensors is used at, in parallel.", Integer.MAX_VALUE); + private Object maxConcurrency; + private String sensorName; public MaxConcurrencySensor(ConfigBag params) { - super(params); - maxConcurrency = params.getStringKey(MAX_CONCURRENCY.getName()); + this.sensorName = params.get(SENSOR_NAME); + this.maxConcurrency = params.getStringKey(MAX_CONCURRENCY.getName()); } @Override public void apply(@SuppressWarnings("deprecation") final org.apache.brooklyn.api.entity.EntityLocal entity) { - super.apply(entity); + final AttributeSensor<ReleaseableLatch> sensor = Sensors.newSensor(ReleaseableLatch.class, sensorName); + ((EntityInternal) entity).getMutableEntityType().addSensor(sensor); + final Task<ReleaseableLatch> resolveValueTask = DependentConfiguration.maxConcurrency(maxConcurrency); class SetValue implements Runnable { http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/ae8eb9e3/core/src/test/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensorTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensorTest.java b/core/src/test/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensorTest.java index 299c269..c8f391c 100644 --- a/core/src/test/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensorTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensorTest.java @@ -30,7 +30,6 @@ import com.google.common.collect.ImmutableMap; public class MaxConcurrencySensorTest extends BrooklynAppUnitTestSupport { private static final AttributeSensor<Integer> MAX_PERMITS = Sensors.newIntegerSensor("max.permits"); - private static final AttributeSensor<ReleaseableLatch> SENSOR = Sensors.newSensor(ReleaseableLatch.class, "myname"); @Test public void testAddsStaticSensorOfTypeString() { @@ -43,7 +42,7 @@ public class MaxConcurrencySensorTest extends BrooklynAppUnitTestSupport { int actualPermits = 10; app.sensors().set(MAX_PERMITS, actualPermits); - EntityAsserts.assertAttributeEventuallyNonNull(entity, SENSOR); + EntityAsserts.assertAttributeEventuallyNonNull(entity, (AttributeSensor<?>)entity.getEntityType().getSensor("myname")); } } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/ae8eb9e3/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefLifecycleEffectorTasks.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefLifecycleEffectorTasks.java b/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefLifecycleEffectorTasks.java index 4c30ba3..8c3dad9 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefLifecycleEffectorTasks.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefLifecycleEffectorTasks.java @@ -28,15 +28,12 @@ import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks; import org.apache.brooklyn.core.entity.Attributes; import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; import org.apache.brooklyn.core.location.Machines; -import org.apache.brooklyn.core.sensor.ReleaseableLatch; import org.apache.brooklyn.entity.software.base.SoftwareProcess; import org.apache.brooklyn.entity.software.base.lifecycle.MachineLifecycleEffectorTasks; import org.apache.brooklyn.location.ssh.SshMachineLocation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.brooklyn.util.collections.Jsonya; -import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.collections.Jsonya.Navigator; +import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.task.DynamicTasks; import org.apache.brooklyn.util.core.task.TaskTags; @@ -48,6 +45,8 @@ import org.apache.brooklyn.util.ssh.BashCommands; import org.apache.brooklyn.util.text.Strings; import org.apache.brooklyn.util.time.Duration; import org.apache.brooklyn.util.time.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; @@ -59,7 +58,7 @@ import com.google.common.collect.ImmutableList; * <p> * Instances of this should use the {@link ChefConfig} config attributes to configure startup, * and invoke {@link #usePidFile(String)} or {@link #useService(String)} to determine check-running and stop behaviour. - * Alternatively this can be subclassed and {@link #postStartCustom(AtomicReference)} and {@link #stopProcessesAtMachine()} overridden. + * Alternatively this can be subclassed and {@link #postStartCustom()} and {@link #stopProcessesAtMachine()} overridden. * * @since 0.6.0 **/ @@ -239,7 +238,7 @@ public class ChefLifecycleEffectorTasks extends MachineLifecycleEffectorTasks im } @Override - protected void postStartCustom(AtomicReference<ReleaseableLatch> startLatchRef) { + protected void postStartCustom() { boolean result = false; result |= tryCheckStartPid(); result |= tryCheckStartService(); @@ -248,7 +247,7 @@ public class ChefLifecycleEffectorTasks extends MachineLifecycleEffectorTasks im log.warn("No way to check whether "+entity()+" is running; assuming yes"); } entity().sensors().set(SoftwareProcess.SERVICE_UP, true); - super.postStartCustom(startLatchRef); + super.postStartCustom(); } protected boolean tryCheckStartPid() { http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/ae8eb9e3/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java index 069ba16..3900b76 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java @@ -223,14 +223,13 @@ public abstract class AbstractSoftwareProcessDriver implements SoftwareProcessDr private ReleaseableLatch releaseableLatch; public CloseableLatch(Entity caller, ReleaseableLatch releaseableLatch) { - super(); this.caller = caller; this.releaseableLatch = releaseableLatch; } @Override public void close() { - DynamicTasks.waitForLast(); + DynamicTasks.drain(null, false); releaseableLatch.release(caller); } } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/ae8eb9e3/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessDriverLifecycleEffectorTasks.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessDriverLifecycleEffectorTasks.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessDriverLifecycleEffectorTasks.java index 67cdca8..4159c3f 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessDriverLifecycleEffectorTasks.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessDriverLifecycleEffectorTasks.java @@ -88,7 +88,7 @@ public class SoftwareProcessDriverLifecycleEffectorTasks extends MachineLifecycl try { // There's no preStartCustom call in the restart effector to get the latch value // so nothing to release here - pass the nop value. - postStartCustom(new AtomicReference<>(ReleaseableLatch.NOP)); + postStartCustom(); postRestartCustom(); } finally { ServiceStateLogic.setExpectedState(entity(), Lifecycle.RUNNING); @@ -174,7 +174,7 @@ public class SoftwareProcessDriverLifecycleEffectorTasks extends MachineLifecycl } @Override - protected void postStartCustom(AtomicReference<ReleaseableLatch> startLatchRef) { + protected void postStartCustom() { entity().postDriverStart(); if (entity().connectedSensors) { // many impls aren't idempotent - though they should be! @@ -185,7 +185,7 @@ public class SoftwareProcessDriverLifecycleEffectorTasks extends MachineLifecycl } entity().waitForServiceUp(); entity().postStart(); - super.postStartCustom(startLatchRef); + super.postStartCustom(); } @Override @@ -261,10 +261,9 @@ public class SoftwareProcessDriverLifecycleEffectorTasks extends MachineLifecycl } @Override - protected void postStopCustom(AtomicReference<ReleaseableLatch> stopLatchRef) { - super.postStopCustom(stopLatchRef); - + protected void postStopCustom() { entity().postStop(); + super.postStopCustom(); } @Override http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/ae8eb9e3/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java index 0282ca9..c0d80af 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java @@ -114,9 +114,9 @@ import com.google.common.reflect.TypeToken; * <li> {@link #startProcessesAtMachine(Supplier)} (required) * <li> {@link #stopProcessesAtMachine()} (required, but can be left blank if you assume the VM will be destroyed) * <li> {@link #preStartCustom(MachineLocation, AtomicReference)} - * <li> {@link #postStartCustom(AtomicReference)} + * <li> {@link #postStartCustom()} * <li> {@link #preStopConfirmCustom(AtomicReference)} - * <li> {@link #postStopCustom(AtomicReference)} + * <li> {@link #postStopCustom()} * </ul> * Note methods at this level typically look after the {@link Attributes#SERVICE_STATE_ACTUAL} sensor. * @@ -380,6 +380,11 @@ public abstract class MachineLifecycleEffectorTasks { postStartAtMachineAsync(); } finally { RELEASEABLE_LATCH_TL.remove(); + DynamicTasks.drain(null, false); + ReleaseableLatch startLatch = startLatchRef.get(); + if (startLatch != null) { + startLatch.release(entity()); + } } } @@ -644,48 +649,25 @@ public abstract class MachineLifecycleEffectorTasks { protected abstract String startProcessesAtMachine(final Supplier<MachineLocation> machineS); - /** @deprecated since 0.11.0. Use {@link #postStartAtMachineAsync(AtomicReference)} instead. */ - @Deprecated protected void postStartAtMachineAsync() { - postStartAtMachineAsync(RELEASEABLE_LATCH_TL.get()); - } - - protected void postStartAtMachineAsync(AtomicReference<ReleaseableLatch> startLatchRef) { - DynamicTasks.queue("post-start", new PostStartTask(startLatchRef)); + DynamicTasks.queue("post-start", new PostStartTask()); } private class PostStartTask implements Runnable { - private AtomicReference<ReleaseableLatch> startLatchRef; - - public PostStartTask(AtomicReference<ReleaseableLatch> startLatchRef) { - this.startLatchRef = startLatchRef; - } - @Override public void run() { - RELEASEABLE_LATCH_TL.set(startLatchRef); - try { - postStartCustom(); - } finally { - RELEASEABLE_LATCH_TL.remove(); - } + postStartCustom(); } } - /** @deprecated since 0.11.0. Use {@link #postStartCustom(AtomicReference)} instead. */ - @Deprecated - protected void postStartCustom() { - postStartCustom(RELEASEABLE_LATCH_TL.get()); - } - /** * Default post-start hooks. * <p> * Can be extended by subclasses, and typically will wait for confirmation of start. * The service not set to running until after this. Also invoked following a restart. */ - protected void postStartCustom(AtomicReference<ReleaseableLatch> startLatchRef) { - startLatchRef.get().release(entity()); + protected void postStartCustom() { + // nothing by default } /** @@ -792,7 +774,7 @@ public abstract class MachineLifecycleEffectorTasks { * If no errors were encountered call {@link #postStopCustom()} at the end. */ public void stop(ConfigBag parameters) { - doStop(parameters, new StopAnyProvisionedMachinesTask()); + doStopLatching(parameters, new StopAnyProvisionedMachinesTask()); } /** @@ -800,17 +782,26 @@ public abstract class MachineLifecycleEffectorTasks { * {@link #stopAnyProvisionedMachines}. */ public void suspend(ConfigBag parameters) { - doStop(parameters, new SuspendAnyProvisionedMachinesTask()); + doStopLatching(parameters, new SuspendAnyProvisionedMachinesTask()); } - protected void doStop(ConfigBag parameters, Callable<StopMachineDetails<Integer>> stopTask) { + protected void doStopLatching(ConfigBag parameters, Callable<StopMachineDetails<Integer>> stopTask) { AtomicReference<ReleaseableLatch> stopLatchRef = new AtomicReference<>(); RELEASEABLE_LATCH_TL.set(stopLatchRef); try { - preStopConfirmCustom(); + doStop(parameters, stopTask); } finally { RELEASEABLE_LATCH_TL.remove(); + DynamicTasks.drain(null, false); + ReleaseableLatch stopLatch = stopLatchRef.get(); + if (stopLatch != null) { + stopLatch.release(entity()); + } } + } + + protected void doStop(ConfigBag parameters, Callable<StopMachineDetails<Integer>> stopTask) { + preStopConfirmCustom(); log.info("Stopping {} in {}", entity(), entity().getLocations()); @@ -923,7 +914,7 @@ public abstract class MachineLifecycleEffectorTasks { entity().sensors().set(SoftwareProcess.SERVICE_UP, false); ServiceStateLogic.setExpectedState(entity(), Lifecycle.STOPPED); - DynamicTasks.queue("post-stop", new PostStopCustomTask(stopLatchRef)); + DynamicTasks.queue("post-stop", new PostStopCustomTask()); if (log.isDebugEnabled()) log.debug("Stopped software process entity "+entity()); } @@ -989,20 +980,9 @@ public abstract class MachineLifecycleEffectorTasks { } private class PostStopCustomTask implements Callable<Void> { - private AtomicReference<ReleaseableLatch> stopLatchRef; - - public PostStopCustomTask(AtomicReference<ReleaseableLatch> stopLatchRef) { - this.stopLatchRef = stopLatchRef; - } - @Override public Void call() { - RELEASEABLE_LATCH_TL.set(stopLatchRef); - try { - postStopCustom(); - } finally { - RELEASEABLE_LATCH_TL.remove(); - } + postStopCustom(); return null; } } @@ -1041,14 +1021,8 @@ public abstract class MachineLifecycleEffectorTasks { // nothing needed here } - /** @deprecated 0.11.0. Use {@link #postStopCustom(AtomicReference)} instead. */ - @Deprecated protected void postStopCustom() { - postStopCustom(RELEASEABLE_LATCH_TL.get()); - } - - protected void postStopCustom(AtomicReference<ReleaseableLatch> stopLatchRef) { - stopLatchRef.get().release(entity()); + // nothing needed here } protected void preRestartCustom() { http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/ae8eb9e3/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityLatchTest.java ---------------------------------------------------------------------- diff --git a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityLatchTest.java b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityLatchTest.java index 601c0f2..c5e7fc5 100644 --- a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityLatchTest.java +++ b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityLatchTest.java @@ -29,13 +29,18 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntityInitializer; import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.entity.ImplementedBy; import org.apache.brooklyn.api.location.LocationSpec; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.effector.AddEffector; +import org.apache.brooklyn.core.effector.EffectorBody; import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.sensor.DependentConfiguration; import org.apache.brooklyn.core.sensor.ReleaseableLatch; @@ -43,10 +48,13 @@ import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; import org.apache.brooklyn.entity.group.DynamicCluster; import org.apache.brooklyn.entity.software.base.SoftwareProcessEntityTest.MyService; +import org.apache.brooklyn.entity.software.base.SoftwareProcessEntityTest.MyServiceImpl; import org.apache.brooklyn.entity.software.base.SoftwareProcessEntityTest.SimulatedDriver; import org.apache.brooklyn.location.byon.FixedListMachineProvisioningLocation; import org.apache.brooklyn.location.ssh.SshMachineLocation; import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.task.DynamicTasks; import org.apache.brooklyn.util.core.task.TaskInternal; import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.time.Duration; @@ -175,11 +183,57 @@ public class SoftwareProcessEntityLatchTest extends BrooklynAppUnitTestSupport { // Check we have actually used the latch assertNotEquals(countingLatch.getMaxCounter(), 0, "Latch not acquired at all"); // In theory this is 0 < maxCnt <= maxConcurrency contract, but in practice - // we should always reach the maximum due to the sleeps below. + // we should always reach the maximum due to the sleeps in CountingLatch. // Change if found to fail in the wild. assertEquals(countingLatch.getMaxCounter(), maxConcurrency); } + @Test(dataProvider="latchAndTaskNamesProvider"/*, timeOut=Asserts.THIRTY_SECONDS_TIMEOUT_MS*/) + public void testFailedReleaseableUnblocks(final ConfigKey<Boolean> latch, List<String> _) throws Exception { + final int maxConcurrency = 1; + final ReleaseableLatch latchSemaphore = ReleaseableLatch.Factory.newMaxConcurrencyLatch(maxConcurrency); + final AttributeSensor<Object> latchSensor = Sensors.newSensor(Object.class, "latch"); + final CountingLatch countingLatch = new CountingLatch(latchSemaphore, maxConcurrency); + // FIRST_MEMBER_SPEC latches are not guaranteed to be acquired before MEMBER_SPEC latches + // so the start effector could complete, but the counting latch will catch if there are + // any unreleased semaphores. + @SuppressWarnings({"unused"}) + DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class) + .configure(DynamicCluster.INITIAL_SIZE, 2) + .configure(DynamicCluster.FIRST_MEMBER_SPEC, EntitySpec.create(FailingMyService.class) + .configure(ConfigKeys.newConfigKey(Object.class, latch.getName()), (Object)DependentConfiguration.attributeWhenReady(app, latchSensor))) + .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(MyService.class) + .configure(ConfigKeys.newConfigKey(Object.class, latch.getName()), (Object)DependentConfiguration.attributeWhenReady(app, latchSensor)))); + app.sensors().set(latchSensor, countingLatch); + final Task<Void> startTask = Entities.invokeEffector(app, app, MyService.START, ImmutableMap.of("locations", ImmutableList.of(app.newLocalhostProvisioningLocation()))); + //expected to fail but should complete quickly + assertTrue(startTask.blockUntilEnded(Asserts.DEFAULT_LONG_TIMEOUT), "timeout waiting for start effector to complete"); + assertTrue(latch == SoftwareProcess.STOP_LATCH || startTask.isError()); + final Task<Void> stopTask = Entities.invokeEffector(app, app, MyService.STOP, ImmutableMap.<String, Object>of()); + //expected to fail but should complete quickly + assertTrue(stopTask.blockUntilEnded(Asserts.DEFAULT_LONG_TIMEOUT), "timeout waiting for stop effector to complete"); + // stop task won't fail because the process stop failed; the error is ignored + assertTrue(stopTask.isDone()); + assertEquals(countingLatch.getCounter(), 0); + // Check we have actually used the latch + assertNotEquals(countingLatch.getMaxCounter(), 0, "Latch not acquired at all"); + // In theory this is 0 < maxCnt <= maxConcurrency contract, but in practice + // we should always reach the maximum due to the sleeps in CountingLatch. + // Change if found to fail in the wild. + assertEquals(countingLatch.getMaxCounter(), maxConcurrency); + } + + protected EntityInitializer createFailingEffectorInitializer(String name) { + return new AddEffector(AddEffector.newEffectorBuilder(Void.class, + ConfigBag.newInstance(ImmutableMap.of(AddEffector.EFFECTOR_NAME, name))) + .impl(new EffectorBody<Void>() { + @Override + public Void call(ConfigBag parameters) { + throw new IllegalStateException("Failed to start"); + } + }).build()); + } + protected List<String> getLatchPostTasks(final ConfigKey<?> latch) { if (latch == SoftwareProcess.STOP_LATCH) { return SOFTWARE_PROCESS_STOP_TASKS; @@ -270,5 +324,80 @@ public class SoftwareProcessEntityLatchTest extends BrooklynAppUnitTestSupport { } } + + } + + @ImplementedBy(FailingMyServiceImpl.class) + public static interface FailingMyService extends MyService {} + public static class FailingMyServiceImpl extends MyServiceImpl implements FailingMyService { + @Override + public Class<?> getDriverInterface() { + return FailingSimulatedDriver.class; + } } + static class FailingSimulatedDriver extends SimulatedDriver { + public FailingSimulatedDriver(@SuppressWarnings("deprecation") org.apache.brooklyn.api.entity.EntityLocal entity, SshMachineLocation machine) { + super(entity, machine); + } + + @Override + public void stop() { + super.stop(); + failOnStep(SoftwareProcess.STOP_LATCH); + } + + @Override + public void install() { + super.install(); + failOnStep(SoftwareProcess.INSTALL_LATCH); + } + + @Override + public void customize() { + super.customize(); + failOnStep(SoftwareProcess.CUSTOMIZE_LATCH); + } + + @Override + public void launch() { + super.launch(); + failOnStep(SoftwareProcess.START_LATCH); + failOnStep(SoftwareProcess.LAUNCH_LATCH); + } + + @Override + public void setup() { + super.setup(); + failOnStep(SoftwareProcess.SETUP_LATCH); + } + + @Override + public void copyInstallResources() { + super.copyInstallResources(); + failOnStep(SoftwareProcess.INSTALL_RESOURCES_LATCH); + } + + @Override + public void copyRuntimeResources() { + super.copyRuntimeResources(); + failOnStep(SoftwareProcess.RUNTIME_RESOURCES_LATCH); + } + + @Override + protected String getInstallLabelExtraSalt() { + return super.getInstallLabelExtraSalt(); + } + + protected void failOnStep(ConfigKey<Boolean> latch) { + if (((EntityInternal)entity).config().getRaw(latch).isPresent()) { + DynamicTasks.queue("Failing task", new Runnable() { + @Override + public void run() { + throw new IllegalStateException("forced fail"); + } + }); + } + } + +} } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/ae8eb9e3/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/mysql/DynamicToyMySqlEntityBuilder.java ---------------------------------------------------------------------- diff --git a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/mysql/DynamicToyMySqlEntityBuilder.java b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/mysql/DynamicToyMySqlEntityBuilder.java index c25dc02..82a7f6e 100644 --- a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/mysql/DynamicToyMySqlEntityBuilder.java +++ b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/mysql/DynamicToyMySqlEntityBuilder.java @@ -19,7 +19,6 @@ package org.apache.brooklyn.entity.software.base.test.mysql; import java.io.File; -import java.util.concurrent.atomic.AtomicReference; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntityInitializer; @@ -30,11 +29,8 @@ import org.apache.brooklyn.api.location.OsDetails; import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks; import org.apache.brooklyn.core.entity.Attributes; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; -import org.apache.brooklyn.core.sensor.ReleaseableLatch; import org.apache.brooklyn.entity.software.base.lifecycle.MachineLifecycleEffectorTasks; import org.apache.brooklyn.entity.stock.BasicStartable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.brooklyn.location.localhost.LocalhostMachineProvisioningLocation.LocalhostMachine; import org.apache.brooklyn.location.ssh.SshMachineLocation; import org.apache.brooklyn.util.core.task.DynamicTasks; @@ -44,6 +40,8 @@ import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper; import org.apache.brooklyn.util.ssh.BashCommands; import org.apache.brooklyn.util.time.Duration; import org.apache.brooklyn.util.time.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Predicates; import com.google.common.base.Splitter; @@ -126,7 +124,7 @@ public class DynamicToyMySqlEntityBuilder { return "submitted start"; } @Override - protected void postStartCustom(AtomicReference<ReleaseableLatch> startLatchRef) { + protected void postStartCustom() { // if it's still up after 5s assume we are good Time.sleep(Duration.FIVE_SECONDS); if (!DynamicTasks.queue(SshEffectorTasks.isPidFromFileRunning(dir(entity)+"/*/data/*.pid")).get()) { @@ -152,7 +150,7 @@ public class DynamicToyMySqlEntityBuilder { // Really should set this with a Feed that checks pid periodically. // Should this instead be using SERVICE_NOT_UP_INDICATORS? entity().sensors().set(Attributes.SERVICE_UP, true); - super.postStartCustom(startLatchRef); + super.postStartCustom(); } @Override