Let latches limit the paralellism for the step they guard
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/47aecce0 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/47aecce0 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/47aecce0 Branch: refs/heads/master Commit: 47aecce0ce38107d67f1347999542f4d26cf6e61 Parents: 5f66e8f Author: Svetoslav Neykov <svetoslav.ney...@cloudsoftcorp.com> Authored: Mon Jan 16 18:40:55 2017 +0200 Committer: Svetoslav Neykov <svetoslav.ney...@cloudsoftcorp.com> Committed: Mon Jan 23 18:25:09 2017 +0200 ---------------------------------------------------------------------- .../core/sensor/DependentConfiguration.java | 49 ++++- .../core/sensor/MaxConcurrencySensor.java | 91 ++++++++ .../brooklyn/core/sensor/ReleaseableLatch.java | 95 +++++++++ .../core/sensor/MaxConcurrencySensorTest.java | 49 +++++ .../core/sensor/ReleaseableLatchRebindTest.java | 42 ++++ .../core/test/BrooklynMgmtUnitTestSupport.java | 3 +- .../entity/chef/ChefLifecycleEffectorTasks.java | 7 +- .../base/AbstractSoftwareProcessDriver.java | 69 +++++-- ...wareProcessDriverLifecycleEffectorTasks.java | 21 +- .../MachineLifecycleEffectorTasks.java | 174 +++++++++++++--- .../base/SoftwareProcessEntityLatchTest.java | 206 +++++++++++++------ .../mysql/DynamicToyMySqlEntityBuilder.java | 5 +- .../java/org/apache/brooklyn/test/Asserts.java | 6 +- .../org/apache/brooklyn/util/guava/Maybe.java | 5 + 14 files changed, 700 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/47aecce0/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java b/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java index 97d3a53..1b8a352 100644 --- a/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java @@ -52,6 +52,7 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.util.collections.CollectionFunctionals; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.flags.TypeCoercions; import org.apache.brooklyn.util.core.task.BasicExecutionContext; import org.apache.brooklyn.util.core.task.BasicTask; import org.apache.brooklyn.util.core.task.DeferredSupplier; @@ -569,6 +570,25 @@ public class DependentConfiguration { ); } + public static Maybe<ReleaseableLatch> maxConcurrencyImmediately(Object maxThreads) { + Maybe<?> resolvedMaxThreads = resolveImmediately(maxThreads); + if (resolvedMaxThreads.isAbsent()) return Maybe.absent(); + Integer resolvedMaxThreadsInt = TypeCoercions.coerce(resolvedMaxThreads, Integer.class); + + ReleaseableLatch result = ReleaseableLatch.Factory.newMaxConcurrencyLatch(resolvedMaxThreadsInt); + return Maybe.<ReleaseableLatch>of(result); + } + + public static Task<ReleaseableLatch> maxConcurrency(Object maxThreads) { + List<TaskAdaptable<Object>> taskArgs = getTaskAdaptable(maxThreads); + Function<List<Object>, ReleaseableLatch> transformer = new MaxThreadsTransformerFunction(maxThreads); + return transformMultiple( + MutableMap.of("displayName", String.format("creating max concurrency semaphore(%s)", maxThreads)), + transformer, + taskArgs + ); + } + @SuppressWarnings("unchecked") private static List<TaskAdaptable<Object>> getTaskAdaptable(Object... args){ List<TaskAdaptable<Object>> taskArgs = Lists.newArrayList(); @@ -624,13 +644,38 @@ public class DependentConfiguration { } + public static class MaxThreadsTransformerFunction implements Function<List<Object>, ReleaseableLatch> { + private final Object maxThreads; + + public MaxThreadsTransformerFunction(Object maxThreads) { + this.maxThreads = maxThreads; + } + + @Override + public ReleaseableLatch apply(List<Object> input) { + Iterator<?> taskArgsIterator = input.iterator(); + Integer maxThreadsNum = resolveArgument(maxThreads, taskArgsIterator, Integer.class); + return ReleaseableLatch.Factory.newMaxConcurrencyLatch(maxThreadsNum); + } + + } + + /** + * Same as {@link #resolveArgument(Object, Iterator, Class) with type of String + */ + private static String resolveArgument(Object argument, Iterator<?> taskArgsIterator) { + return resolveArgument(argument, taskArgsIterator, String.class); + } + /** * Resolves the argument as follows: * * If the argument is a DeferredSupplier, we will block and wait for it to resolve. If the argument is TaskAdaptable or TaskFactory, * we will assume that the resolved task has been queued on the {@code taskArgsIterator}, otherwise the argument has already been resolved. + * + * @param type coerces the return value to the requested type */ - private static String resolveArgument(Object argument, Iterator<?> taskArgsIterator) { + private static <T> T resolveArgument(Object argument, Iterator<?> taskArgsIterator, Class<T> type) { Object resolvedArgument; if (argument instanceof TaskAdaptable) { resolvedArgument = taskArgsIterator.next(); @@ -639,7 +684,7 @@ public class DependentConfiguration { } else { resolvedArgument = argument; } - return String.valueOf(resolvedArgument); + return TypeCoercions.coerce(resolvedArgument, type); } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/47aecce0/core/src/main/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensor.java new file mode 100644 index 0000000..bae9415 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensor.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.sensor; + +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.effector.AddSensor; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.task.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Can be used as: + * <pre> + * {@code + * brooklyn.initializers: + * - type: org.apache.brooklyn.core.sensor.MaxConcurrencySensor + * brooklyn.config: + * name: start-latch-value + * latch.concurrency.max: 10 + * } + * + * and is the short hand for: + * + * <pre> + * {@code + * brooklyn.initializers: + * - type: org.apache.brooklyn.core.sensor.StaticSensor + * brooklyn.config: + * name: start-latch-value + * static.value: + * $brooklyn.object: + * type: org.apache.brooklyn.core.sensor.ReleaseableLatch$Factory + * factoryMethod.name: newMaxConcurrencyLatch + * factoryMethod.args: [10] + * } + * </pre> + */ +public class MaxConcurrencySensor extends AddSensor<ReleaseableLatch> { + private static final Logger log = LoggerFactory.getLogger(MaxConcurrencySensor.class); + + public static final ConfigKey<String> SENSOR_TYPE = ConfigKeys.newConfigKeyWithDefault(AddSensor.SENSOR_TYPE, ReleaseableLatch.class.getName()); + public static final ConfigKey<Integer> MAX_CONCURRENCY = ConfigKeys.newIntegerConfigKey( + "latch.concurrency.max", + "The maximum number of threads that can execute the step for the latch this sensors is used at, in parallel.", + Integer.MAX_VALUE); + private Object maxConcurrency; + + public MaxConcurrencySensor(ConfigBag params) { + super(params); + maxConcurrency = params.getStringKey(MAX_CONCURRENCY.getName()); + } + + @Override + public void apply(@SuppressWarnings("deprecation") final org.apache.brooklyn.api.entity.EntityLocal entity) { + super.apply(entity); + final Task<ReleaseableLatch> resolveValueTask = DependentConfiguration.maxConcurrency(maxConcurrency); + + class SetValue implements Runnable { + @Override + public void run() { + ReleaseableLatch releaseableLatch = resolveValueTask.getUnchecked(); + log.debug(this+" setting sensor "+sensor+" to "+releaseableLatch+" on "+entity); + entity.sensors().set(sensor, releaseableLatch); + } + } + Task<ReleaseableLatch> setValueTask = Tasks.<ReleaseableLatch>builder().displayName("Setting " + sensor + " on " + entity).body(new SetValue()).build(); + + Entities.submit(entity, Tasks.sequential("Resolving and setting " + sensor + " on " + entity, resolveValueTask, setValueTask)); + } + +} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/47aecce0/core/src/main/java/org/apache/brooklyn/core/sensor/ReleaseableLatch.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/ReleaseableLatch.java b/core/src/main/java/org/apache/brooklyn/core/sensor/ReleaseableLatch.java new file mode 100644 index 0000000..41ed779 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/ReleaseableLatch.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.sensor; + +import java.util.concurrent.Semaphore; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.util.core.task.DeferredSupplier; +import org.apache.brooklyn.util.core.task.ImmediateSupplier; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.guava.Maybe; + +// DeferredSupplier used as a marker interface to prevent coercion. When resolved it must evaluate to {@code Boolean.TRUE}. +public interface ReleaseableLatch extends DeferredSupplier<Boolean>, ImmediateSupplier<Boolean> { + /** + * Increment usage count for the {@code caller} entity + */ + void acquire(Entity caller); + + /** + * Decrement usage count for the {@code caller} entity + */ + void release(Entity caller); + + static abstract class AbstractReleaseableLatch implements ReleaseableLatch { + // Instances coerce to TRUE as they are non-null. + @Override public Boolean get() {return Boolean.TRUE;} + @Override public Maybe<Boolean> getImmediately() {return Maybe.of(Boolean.TRUE);} + } + + ReleaseableLatch NOP = new Factory.NopLatch(); + + static class Factory { + private static class NopLatch extends AbstractReleaseableLatch { + @Override public void acquire(Entity caller) {} + @Override public void release(Entity caller) {} + } + + private static class MaxConcurrencyLatch extends AbstractReleaseableLatch { + private int permits; + private transient final Semaphore sem; + + public MaxConcurrencyLatch(int permits) { + this.permits = permits; + this.sem = new Semaphore(permits); + } + + @Override + public void acquire(Entity caller) { + try { + sem.acquire(); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + } + + @Override + public void release(Entity caller) { + sem.release(); + } + + // On rebind reset thread count + private Object readResolve() { + return newMaxConcurrencyLatch(permits); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "[permits=" + sem.availablePermits() + "/" + permits + "]"; + } + } + + public static ReleaseableLatch newMaxConcurrencyLatch(int maxThreadsNum) { + return new MaxConcurrencyLatch(maxThreadsNum); + } + + } + +} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/47aecce0/core/src/test/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensorTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensorTest.java b/core/src/test/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensorTest.java new file mode 100644 index 0000000..299c269 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/sensor/MaxConcurrencySensorTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.sensor; + +import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.entity.EntityAsserts; +import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; +import org.apache.brooklyn.entity.stock.BasicEntity; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableMap; + +public class MaxConcurrencySensorTest extends BrooklynAppUnitTestSupport { + private static final AttributeSensor<Integer> MAX_PERMITS = Sensors.newIntegerSensor("max.permits"); + private static final AttributeSensor<ReleaseableLatch> SENSOR = Sensors.newSensor(ReleaseableLatch.class, "myname"); + + @Test + public void testAddsStaticSensorOfTypeString() { + BasicEntity entity = app.createAndManageChild(EntitySpec.create(BasicEntity.class) + .addInitializer(new MaxConcurrencySensor(ConfigBag.newInstance(ImmutableMap.of( + MaxConcurrencySensor.SENSOR_NAME, "myname", + MaxConcurrencySensor.MAX_CONCURRENCY, DependentConfiguration.formatString("%d", + DependentConfiguration.attributeWhenReady(app, MAX_PERMITS))))))); + + int actualPermits = 10; + app.sensors().set(MAX_PERMITS, actualPermits); + + EntityAsserts.assertAttributeEventuallyNonNull(entity, SENSOR); + } + +} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/47aecce0/core/src/test/java/org/apache/brooklyn/core/sensor/ReleaseableLatchRebindTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/sensor/ReleaseableLatchRebindTest.java b/core/src/test/java/org/apache/brooklyn/core/sensor/ReleaseableLatchRebindTest.java new file mode 100644 index 0000000..39f3dff --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/sensor/ReleaseableLatchRebindTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.sensor; + +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp; +import org.apache.brooklyn.test.Asserts; +import org.testng.annotations.Test; + +public class ReleaseableLatchRebindTest extends RebindTestFixtureWithApp { + + @Test(timeOut = Asserts.THIRTY_SECONDS_TIMEOUT_MS) + public void testRebindResetsPermits() throws Exception { + final AttributeSensor<ReleaseableLatch> latchSensor = Sensors.newSensor(ReleaseableLatch.class, "latch"); + final ReleaseableLatch latchSemaphore = ReleaseableLatch.Factory.newMaxConcurrencyLatch(1); + origApp.sensors().set(latchSensor, latchSemaphore); + latchSemaphore.acquire(origApp); + + rebind(); + + ReleaseableLatch newSemaphore = newApp.sensors().get(latchSensor); + // makes sure permits are reset and we can acquire the semaphore again + newSemaphore.acquire(origApp); + } + +} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/47aecce0/core/src/test/java/org/apache/brooklyn/core/test/BrooklynMgmtUnitTestSupport.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/test/BrooklynMgmtUnitTestSupport.java b/core/src/test/java/org/apache/brooklyn/core/test/BrooklynMgmtUnitTestSupport.java index 5a000d2..31df8e3 100644 --- a/core/src/test/java/org/apache/brooklyn/core/test/BrooklynMgmtUnitTestSupport.java +++ b/core/src/test/java/org/apache/brooklyn/core/test/BrooklynMgmtUnitTestSupport.java @@ -23,6 +23,7 @@ import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.internal.BrooklynProperties; import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal; import org.apache.brooklyn.core.test.entity.LocalManagementContextForTests; +import org.apache.brooklyn.test.Asserts; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -51,7 +52,7 @@ public class BrooklynMgmtUnitTestSupport { } } - @AfterMethod(alwaysRun=true) + @AfterMethod(alwaysRun=true, timeOut=Asserts.THIRTY_SECONDS_TIMEOUT_MS) public void tearDown() throws Exception { try { destroyManagementContextSafely(mgmt); http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/47aecce0/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefLifecycleEffectorTasks.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefLifecycleEffectorTasks.java b/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefLifecycleEffectorTasks.java index 6a12dcd..4c30ba3 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefLifecycleEffectorTasks.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/chef/ChefLifecycleEffectorTasks.java @@ -20,6 +20,7 @@ package org.apache.brooklyn.entity.chef; import java.util.Collection; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.location.MachineLocation; @@ -27,6 +28,7 @@ import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks; import org.apache.brooklyn.core.entity.Attributes; import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; import org.apache.brooklyn.core.location.Machines; +import org.apache.brooklyn.core.sensor.ReleaseableLatch; import org.apache.brooklyn.entity.software.base.SoftwareProcess; import org.apache.brooklyn.entity.software.base.lifecycle.MachineLifecycleEffectorTasks; import org.apache.brooklyn.location.ssh.SshMachineLocation; @@ -57,7 +59,7 @@ import com.google.common.collect.ImmutableList; * <p> * Instances of this should use the {@link ChefConfig} config attributes to configure startup, * and invoke {@link #usePidFile(String)} or {@link #useService(String)} to determine check-running and stop behaviour. - * Alternatively this can be subclassed and {@link #postStartCustom()} and {@link #stopProcessesAtMachine()} overridden. + * Alternatively this can be subclassed and {@link #postStartCustom(AtomicReference)} and {@link #stopProcessesAtMachine()} overridden. * * @since 0.6.0 **/ @@ -237,7 +239,7 @@ public class ChefLifecycleEffectorTasks extends MachineLifecycleEffectorTasks im } @Override - protected void postStartCustom() { + protected void postStartCustom(AtomicReference<ReleaseableLatch> startLatchRef) { boolean result = false; result |= tryCheckStartPid(); result |= tryCheckStartService(); @@ -246,6 +248,7 @@ public class ChefLifecycleEffectorTasks extends MachineLifecycleEffectorTasks im log.warn("No way to check whether "+entity()+" is running; assuming yes"); } entity().sensors().set(SoftwareProcess.SERVICE_UP, true); + super.postStartCustom(startLatchRef); } protected boolean tryCheckStartPid() { http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/47aecce0/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java index 67b0eea..069ba16 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/AbstractSoftwareProcessDriver.java @@ -38,14 +38,18 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntityLocal; import org.apache.brooklyn.api.location.Location; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.mgmt.TaskAdaptable; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.entity.BrooklynConfigKeys; +import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic; +import org.apache.brooklyn.core.sensor.ReleaseableLatch; +import org.apache.brooklyn.entity.software.base.lifecycle.MachineLifecycleEffectorTasks; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.ResourceUtils; import org.apache.brooklyn.util.core.task.DynamicTasks; @@ -132,8 +136,9 @@ public abstract class AbstractSoftwareProcessDriver implements SoftwareProcessDr boolean skipInstall = locationInstalled.or(entityInstalled).or(false); if (!skipInstall) { DynamicTasks.queue("copy-pre-install-resources", new Runnable() { @Override public void run() { - waitForConfigKey(BrooklynConfigKeys.PRE_INSTALL_RESOURCES_LATCH); - copyPreInstallResources(); + try (CloseableLatch value = waitForLatch(BrooklynConfigKeys.PRE_INSTALL_RESOURCES_LATCH)) { + copyPreInstallResources(); + } }}); DynamicTasks.queue("pre-install", new Runnable() { @Override public void run() { @@ -145,18 +150,21 @@ public abstract class AbstractSoftwareProcessDriver implements SoftwareProcessDr }}); DynamicTasks.queue("setup", new Runnable() { @Override public void run() { - waitForConfigKey(BrooklynConfigKeys.SETUP_LATCH); - setup(); + try (CloseableLatch value = waitForLatch(BrooklynConfigKeys.SETUP_LATCH)) { + setup(); + } }}); DynamicTasks.queue("copy-install-resources", new Runnable() { @Override public void run() { - waitForConfigKey(BrooklynConfigKeys.INSTALL_RESOURCES_LATCH); - copyInstallResources(); + try (CloseableLatch value = waitForLatch(BrooklynConfigKeys.INSTALL_RESOURCES_LATCH)) { + copyInstallResources(); + } }}); DynamicTasks.queue("install (main)", new Runnable() { @Override public void run() { - waitForConfigKey(BrooklynConfigKeys.INSTALL_LATCH); - install(); + try (CloseableLatch value = waitForLatch(BrooklynConfigKeys.INSTALL_LATCH)) { + install(); + } }}); DynamicTasks.queue("post-install-command", new Runnable() { @Override public void run() { @@ -171,8 +179,9 @@ public abstract class AbstractSoftwareProcessDriver implements SoftwareProcessDr }}); DynamicTasks.queue("customize (main)", new Runnable() { @Override public void run() { - waitForConfigKey(BrooklynConfigKeys.CUSTOMIZE_LATCH); - customize(); + try (CloseableLatch value = waitForLatch(BrooklynConfigKeys.CUSTOMIZE_LATCH)) { + customize(); + } }}); DynamicTasks.queue("post-customize-command", new Runnable() { @Override public void run() { @@ -181,9 +190,10 @@ public abstract class AbstractSoftwareProcessDriver implements SoftwareProcessDr }}); DynamicTasks.queue("launch", new Runnable() { @Override public void run() { - DynamicTasks.queue("copy-runtime-resources", new Runnable() { @Override public void run() { - waitForConfigKey(BrooklynConfigKeys.RUNTIME_RESOURCES_LATCH); - copyRuntimeResources(); + DynamicTasks.queue("copy-runtime-resources", new Runnable() { public void run() { + try (CloseableLatch value = waitForLatch(BrooklynConfigKeys.RUNTIME_RESOURCES_LATCH)) { + copyRuntimeResources(); + } }}); DynamicTasks.queue("pre-launch-command", new Runnable() { @Override public void run() { @@ -191,8 +201,9 @@ public abstract class AbstractSoftwareProcessDriver implements SoftwareProcessDr }}); DynamicTasks.queue("launch (main)", new Runnable() { @Override public void run() { - waitForConfigKey(BrooklynConfigKeys.LAUNCH_LATCH); - launch(); + try (CloseableLatch value = waitForLatch(BrooklynConfigKeys.LAUNCH_LATCH)) { + launch(); + } }}); DynamicTasks.queue("post-launch-command", new Runnable() { @Override public void run() { @@ -206,6 +217,29 @@ public abstract class AbstractSoftwareProcessDriver implements SoftwareProcessDr }}); } + // Removes the checked Exception from the method signature + private static class CloseableLatch implements AutoCloseable { + private Entity caller; + private ReleaseableLatch releaseableLatch; + + public CloseableLatch(Entity caller, ReleaseableLatch releaseableLatch) { + super(); + this.caller = caller; + this.releaseableLatch = releaseableLatch; + } + + @Override + public void close() { + DynamicTasks.waitForLast(); + releaseableLatch.release(caller); + } + } + + private CloseableLatch waitForLatch(ConfigKey<Boolean> configKey) { + ReleaseableLatch releaseableLatch = MachineLifecycleEffectorTasks.waitForLatch((EntityInternal)entity, configKey); + return new CloseableLatch(entity, releaseableLatch); + } + @Override public abstract void stop(); @@ -624,11 +658,6 @@ public abstract class AbstractSoftwareProcessDriver implements SoftwareProcessDr return TemplateProcessor.processTemplateContents(templateContents, this, extraSubstitutions); } - protected void waitForConfigKey(ConfigKey<?> configKey) { - Object val = entity.config().get(configKey); - if (val != null) log.debug("{} finished waiting for {} (value {}); continuing...", new Object[] {this, configKey, val}); - } - public String getArchiveNameFormat() { return getEntity().config().get(SoftwareProcess.ARCHIVE_DIRECTORY_NAME_FORMAT); } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/47aecce0/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessDriverLifecycleEffectorTasks.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessDriverLifecycleEffectorTasks.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessDriverLifecycleEffectorTasks.java index 0cba30e..67cdca8 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessDriverLifecycleEffectorTasks.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessDriverLifecycleEffectorTasks.java @@ -19,6 +19,7 @@ package org.apache.brooklyn.entity.software.base; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import org.apache.brooklyn.api.location.MachineLocation; import org.apache.brooklyn.api.location.MachineProvisioningLocation; @@ -28,6 +29,7 @@ import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic; import org.apache.brooklyn.core.entity.trait.StartableMethods; +import org.apache.brooklyn.core.sensor.ReleaseableLatch; import org.apache.brooklyn.entity.software.base.SoftwareProcess.ChildStartableMode; import org.apache.brooklyn.entity.software.base.SoftwareProcess.RestartSoftwareParameters; import org.apache.brooklyn.entity.software.base.SoftwareProcess.RestartSoftwareParameters.RestartMachineMode; @@ -84,7 +86,9 @@ public class SoftwareProcessDriverLifecycleEffectorTasks extends MachineLifecycl @Override public void run() { try { - postStartCustom(); + // There's no preStartCustom call in the restart effector to get the latch value + // so nothing to release here - pass the nop value. + postStartCustom(new AtomicReference<>(ReleaseableLatch.NOP)); postRestartCustom(); } finally { ServiceStateLogic.setExpectedState(entity(), Lifecycle.RUNNING); @@ -118,13 +122,13 @@ public class SoftwareProcessDriverLifecycleEffectorTasks extends MachineLifecycl } @Override - protected void preStartCustom(MachineLocation machine) { + protected void preStartCustom(MachineLocation machine, AtomicReference<ReleaseableLatch> startLatchRef) { entity().initDriver(machine); // Note: must only apply config-sensors after adding to locations and creating driver; // otherwise can't do things like acquire free port from location // or allowing driver to set up ports; but must be careful in init not to block on these! - super.preStartCustom(machine); + super.preStartCustom(machine, startLatchRef); entity().preStart(); } @@ -170,7 +174,7 @@ public class SoftwareProcessDriverLifecycleEffectorTasks extends MachineLifecycl } @Override - protected void postStartCustom() { + protected void postStartCustom(AtomicReference<ReleaseableLatch> startLatchRef) { entity().postDriverStart(); if (entity().connectedSensors) { // many impls aren't idempotent - though they should be! @@ -181,11 +185,12 @@ public class SoftwareProcessDriverLifecycleEffectorTasks extends MachineLifecycl } entity().waitForServiceUp(); entity().postStart(); + super.postStartCustom(startLatchRef); } @Override - protected void preStopConfirmCustom() { - super.preStopConfirmCustom(); + protected void preStopConfirmCustom(AtomicReference<ReleaseableLatch> stopLatchRef) { + super.preStopConfirmCustom(stopLatchRef); entity().preStopConfirmCustom(); } @@ -256,8 +261,8 @@ public class SoftwareProcessDriverLifecycleEffectorTasks extends MachineLifecycl } @Override - protected void postStopCustom() { - super.postStopCustom(); + protected void postStopCustom(AtomicReference<ReleaseableLatch> stopLatchRef) { + super.postStopCustom(stopLatchRef); entity().postStop(); } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/47aecce0/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java index da69a2a..0282ca9 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -64,6 +65,7 @@ import org.apache.brooklyn.core.location.cloud.CloudLocationConfig; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.mgmt.entitlement.Entitlements; import org.apache.brooklyn.core.sensor.BasicAttributeSensor; +import org.apache.brooklyn.core.sensor.ReleaseableLatch; import org.apache.brooklyn.entity.machine.MachineInitTasks; import org.apache.brooklyn.entity.machine.ProvidesProvisioningFlags; import org.apache.brooklyn.entity.software.base.SoftwareProcess; @@ -79,6 +81,7 @@ import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.task.DynamicTasks; import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.core.task.ValueResolverIterator; import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.guava.Maybe; @@ -110,10 +113,10 @@ import com.google.common.reflect.TypeToken; * <ul> * <li> {@link #startProcessesAtMachine(Supplier)} (required) * <li> {@link #stopProcessesAtMachine()} (required, but can be left blank if you assume the VM will be destroyed) - * <li> {@link #preStartCustom(MachineLocation)} - * <li> {@link #postStartCustom()} - * <li> {@link #preStopCustom()} - * <li> {@link #postStopCustom()} + * <li> {@link #preStartCustom(MachineLocation, AtomicReference)} + * <li> {@link #postStartCustom(AtomicReference)} + * <li> {@link #preStopConfirmCustom(AtomicReference)} + * <li> {@link #postStopCustom(AtomicReference)} * </ul> * Note methods at this level typically look after the {@link Attributes#SERVICE_STATE_ACTUAL} sensor. * @@ -124,6 +127,13 @@ public abstract class MachineLifecycleEffectorTasks { private static final Logger log = LoggerFactory.getLogger(MachineLifecycleEffectorTasks.class); + private static final ThreadLocal<AtomicReference<ReleaseableLatch>> RELEASEABLE_LATCH_TL = new ThreadLocal<AtomicReference<ReleaseableLatch>>() { + @Override + protected AtomicReference<ReleaseableLatch> initialValue() { + return new AtomicReference<ReleaseableLatch>(ReleaseableLatch.NOP); + } + }; + public static final ConfigKey<Boolean> ON_BOX_BASE_DIR_RESOLVED = ConfigKeys.newBooleanConfigKey( "onbox.base.dir.resolved", "Whether the on-box base directory has been resolved (for internal use)"); @@ -362,9 +372,15 @@ public abstract class MachineLifecycleEffectorTasks { Preconditions.checkState(locationS != null, "Unsupported location "+location+", when starting "+entity()); final Supplier<MachineLocation> locationSF = locationS; - preStartAtMachineAsync(locationSF); - DynamicTasks.queue("start (processes)", new StartProcessesAtMachineTask(locationSF)); - postStartAtMachineAsync(); + final AtomicReference<ReleaseableLatch> startLatchRef = new AtomicReference<>(); + RELEASEABLE_LATCH_TL.set(startLatchRef); + try { + preStartAtMachineAsync(locationSF); + DynamicTasks.queue("start (processes)", new StartProcessesAtMachineTask(locationSF)); + postStartAtMachineAsync(); + } finally { + RELEASEABLE_LATCH_TL.remove(); + } } private class StartProcessesAtMachineTask implements Runnable { @@ -446,15 +462,27 @@ public abstract class MachineLifecycleEffectorTasks { } } - /** Wraps a call to {@link #preStartCustom(MachineLocation)}, after setting the hostname and address. */ + /** + * Wraps a call to {@link #preStartCustom(MachineLocation)}, after setting the hostname and address. + * @deprecated since 0.11.0. Use {@link #preStartAtMachineAsync(Supplier, AtomicReference)} instead. + */ + @Deprecated protected void preStartAtMachineAsync(final Supplier<MachineLocation> machineS) { - DynamicTasks.queue("pre-start", new PreStartTask(machineS.get())); + preStartAtMachineAsync(machineS, RELEASEABLE_LATCH_TL.get()); + } + + /** Wraps a call to {@link #preStartCustom(MachineLocation, AtomicReference)}, after setting the hostname and address. */ + protected void preStartAtMachineAsync(final Supplier<MachineLocation> machineS, AtomicReference<ReleaseableLatch> startLatchRef) { + DynamicTasks.queue("pre-start", new PreStartTask(machineS.get(), startLatchRef)); } private class PreStartTask implements Runnable { final MachineLocation machine; - private PreStartTask(MachineLocation machine) { + final AtomicReference<ReleaseableLatch> startLatchRef; + + private PreStartTask(MachineLocation machine, AtomicReference<ReleaseableLatch> startLatchRef) { this.machine = machine; + this.startLatchRef = startLatchRef; } @Override public void run() { @@ -523,7 +551,12 @@ public abstract class MachineLifecycleEffectorTasks { } } resolveOnBoxDir(entity(), machine); - preStartCustom(machine); + RELEASEABLE_LATCH_TL.set(startLatchRef); + try { + preStartCustom(machine); + } finally { + RELEASEABLE_LATCH_TL.set(null); + } } } @@ -584,17 +617,22 @@ public abstract class MachineLifecycleEffectorTasks { "("+paramSummary+" not compatible: "+oldParam+" / "+newParam+"); "+newLoc+" may require manual removal."); } + /** @deprecated since 0.11.0. Use {@link #preStartCustom(MachineLocation, AtomicReference)} instead. */ + @Deprecated + protected void preStartCustom(MachineLocation machine) { + preStartCustom(machine, RELEASEABLE_LATCH_TL.get()); + } + /** * Default pre-start hooks. * <p> * Can be extended by subclasses if needed. */ - protected void preStartCustom(MachineLocation machine) { + protected void preStartCustom(MachineLocation machine, AtomicReference<ReleaseableLatch> startLatchRef) { ConfigToAttributes.apply(entity()); // Opportunity to block startup until other dependent components are available - Object val = entity().getConfig(SoftwareProcess.START_LATCH); - if (val != null) log.debug("{} finished waiting for start-latch {}; continuing...", entity(), val); + startLatchRef.set(waitForLatch(entity(), SoftwareProcess.START_LATCH)); } protected Map<String, Object> obtainProvisioningFlags(final MachineProvisioningLocation<?> location) { @@ -606,25 +644,48 @@ public abstract class MachineLifecycleEffectorTasks { protected abstract String startProcessesAtMachine(final Supplier<MachineLocation> machineS); + /** @deprecated since 0.11.0. Use {@link #postStartAtMachineAsync(AtomicReference)} instead. */ + @Deprecated protected void postStartAtMachineAsync() { - DynamicTasks.queue("post-start", new PostStartTask()); + postStartAtMachineAsync(RELEASEABLE_LATCH_TL.get()); + } + + protected void postStartAtMachineAsync(AtomicReference<ReleaseableLatch> startLatchRef) { + DynamicTasks.queue("post-start", new PostStartTask(startLatchRef)); } private class PostStartTask implements Runnable { + private AtomicReference<ReleaseableLatch> startLatchRef; + + public PostStartTask(AtomicReference<ReleaseableLatch> startLatchRef) { + this.startLatchRef = startLatchRef; + } + @Override public void run() { - postStartCustom(); + RELEASEABLE_LATCH_TL.set(startLatchRef); + try { + postStartCustom(); + } finally { + RELEASEABLE_LATCH_TL.remove(); + } } } + /** @deprecated since 0.11.0. Use {@link #postStartCustom(AtomicReference)} instead. */ + @Deprecated + protected void postStartCustom() { + postStartCustom(RELEASEABLE_LATCH_TL.get()); + } + /** * Default post-start hooks. * <p> * Can be extended by subclasses, and typically will wait for confirmation of start. * The service not set to running until after this. Also invoked following a restart. */ - protected void postStartCustom() { - // nothing by default + protected void postStartCustom(AtomicReference<ReleaseableLatch> startLatchRef) { + startLatchRef.get().release(entity()); } /** @@ -743,7 +804,13 @@ public abstract class MachineLifecycleEffectorTasks { } protected void doStop(ConfigBag parameters, Callable<StopMachineDetails<Integer>> stopTask) { - preStopConfirmCustom(); + AtomicReference<ReleaseableLatch> stopLatchRef = new AtomicReference<>(); + RELEASEABLE_LATCH_TL.set(stopLatchRef); + try { + preStopConfirmCustom(); + } finally { + RELEASEABLE_LATCH_TL.remove(); + } log.info("Stopping {} in {}", entity(), entity().getLocations()); @@ -856,7 +923,7 @@ public abstract class MachineLifecycleEffectorTasks { entity().sensors().set(SoftwareProcess.SERVICE_UP, false); ServiceStateLogic.setExpectedState(entity(), Lifecycle.STOPPED); - DynamicTasks.queue("post-stop", new PostStopCustomTask()); + DynamicTasks.queue("post-stop", new PostStopCustomTask(stopLatchRef)); if (log.isDebugEnabled()) log.debug("Stopped software process entity "+entity()); } @@ -922,9 +989,20 @@ public abstract class MachineLifecycleEffectorTasks { } private class PostStopCustomTask implements Callable<Void> { + private AtomicReference<ReleaseableLatch> stopLatchRef; + + public PostStopCustomTask(AtomicReference<ReleaseableLatch> stopLatchRef) { + this.stopLatchRef = stopLatchRef; + } + @Override public Void call() { - postStopCustom(); + RELEASEABLE_LATCH_TL.set(stopLatchRef); + try { + postStopCustom(); + } finally { + RELEASEABLE_LATCH_TL.remove(); + } return null; } } @@ -944,22 +1022,33 @@ public abstract class MachineLifecycleEffectorTasks { stopMode == StopMode.IF_NOT_STOPPED && !isStopped; } + /** @deprecated since 0.11.0. Use {@link #preStopConfirmCustom(AtomicReference)} instead. */ + @Deprecated + protected void preStopConfirmCustom() { + preStopConfirmCustom(RELEASEABLE_LATCH_TL.get()); + } + /** * Override to check whether stop can be executed. * Throw if stop should be aborted. */ - protected void preStopConfirmCustom() { + protected void preStopConfirmCustom(AtomicReference<ReleaseableLatch> stopLatchRef) { // Opportunity to block stop() until other dependent components are ready for it - Object val = entity().getConfig(SoftwareProcess.STOP_LATCH); - if (val != null) log.debug("{} finished waiting for stop-latch {}; continuing...", entity(), val); + stopLatchRef.set(waitForLatch(entity(), SoftwareProcess.STOP_LATCH)); } protected void preStopCustom() { // nothing needed here } + /** @deprecated 0.11.0. Use {@link #postStopCustom(AtomicReference)} instead. */ + @Deprecated protected void postStopCustom() { - // nothing needed here + postStopCustom(RELEASEABLE_LATCH_TL.get()); + } + + protected void postStopCustom(AtomicReference<ReleaseableLatch> stopLatchRef) { + stopLatchRef.get().release(entity()); } protected void preRestartCustom() { @@ -1083,4 +1172,39 @@ public abstract class MachineLifecycleEffectorTasks { entity().sensors().set(Attributes.SUBNET_ADDRESS, null); } + public static ReleaseableLatch waitForLatch(EntityInternal entity, ConfigKey<Boolean> configKey) { + Maybe<?> rawValue = entity.config().getRaw(configKey); + if (rawValue.isAbsent()) { + return ReleaseableLatch.NOP; + } else { + ValueResolverIterator<Boolean> iter = resolveLatchIterator(entity, rawValue.get(), configKey); + Maybe<ReleaseableLatch> releasableLatchMaybe = iter.next(ReleaseableLatch.class); + if (releasableLatchMaybe.isPresent()) { + ReleaseableLatch latch = releasableLatchMaybe.get(); + log.debug("{} finished waiting for {} (value {}); waiting to acquire the latch", new Object[] {entity, configKey, latch}); + Tasks.setBlockingDetails("Acquiring " + configKey + " " + latch); + try { + latch.acquire(entity); + } finally { + Tasks.resetBlockingDetails(); + } + log.debug("{} Acquired latch {} (value {}); continuing...", new Object[] {entity, configKey, latch}); + return latch; + } else { + // If iter.next() above returned absent due to a resolve error next line will throw with the cause + Boolean val = iter.last().get(); + if (rawValue != null) log.debug("{} finished waiting for {} (value {}); continuing...", new Object[] {entity, configKey, val}); + return ReleaseableLatch.NOP; + } + } + } + + private static ValueResolverIterator<Boolean> resolveLatchIterator(EntityInternal entity, Object val, ConfigKey<Boolean> key) { + return Tasks.resolving(val, Boolean.class) + .context(entity.getExecutionContext()) + .description("config " + key.getName()) + .iterator(); + } + + } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/47aecce0/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityLatchTest.java ---------------------------------------------------------------------- diff --git a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityLatchTest.java b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityLatchTest.java index 976a943..601c0f2 100644 --- a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityLatchTest.java +++ b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityLatchTest.java @@ -21,10 +21,12 @@ package org.apache.brooklyn.entity.software.base; import static org.apache.brooklyn.core.mgmt.BrooklynTaskTags.getEffectorName; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntitySpec; @@ -32,25 +34,31 @@ import org.apache.brooklyn.api.location.LocationSpec; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.entity.Attributes; +import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.sensor.DependentConfiguration; +import org.apache.brooklyn.core.sensor.ReleaseableLatch; import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; +import org.apache.brooklyn.entity.group.DynamicCluster; import org.apache.brooklyn.entity.software.base.SoftwareProcessEntityTest.MyService; import org.apache.brooklyn.entity.software.base.SoftwareProcessEntityTest.SimulatedDriver; -import org.apache.brooklyn.entity.stock.BasicEntity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; import org.apache.brooklyn.location.byon.FixedListMachineProvisioningLocation; import org.apache.brooklyn.location.ssh.SshMachineLocation; import org.apache.brooklyn.test.Asserts; import org.apache.brooklyn.util.core.task.TaskInternal; +import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import com.google.common.base.Function; +import com.google.common.base.Functions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -58,11 +66,15 @@ import com.google.common.collect.Lists; public class SoftwareProcessEntityLatchTest extends BrooklynAppUnitTestSupport { + // NB: These tests don't actually require ssh to localhost -- only that 'localhost' resolves. @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(SoftwareProcessEntityLatchTest.class); + private static final ImmutableList<String> SOFTWARE_PROCESS_START_TASKS = ImmutableList.of("setup", "copyInstallResources", "install", "customize", "copyRuntimeResources", "launch"); + private static final ImmutableList<String> SOFTWARE_PROCESS_STOP_TASKS = ImmutableList.<String>builder().addAll(SOFTWARE_PROCESS_START_TASKS).add("stop").build(); + private SshMachineLocation machine; private FixedListMachineProvisioningLocation<SshMachineLocation> loc; @@ -81,78 +93,99 @@ public class SoftwareProcessEntityLatchTest extends BrooklynAppUnitTestSupport { loc.addMachine(machine); return loc; } - - @Test - public void testStartLatchBlocks() throws Exception { - runTestLatchBlocks(SoftwareProcess.START_LATCH, ImmutableList.<String>of()); - } - - @Test - public void testSetupLatchBlocks() throws Exception { - runTestLatchBlocks(SoftwareProcess.SETUP_LATCH, ImmutableList.<String>of()); - } - - @Test - public void testIntallResourcesLatchBlocks() throws Exception { - runTestLatchBlocks(SoftwareProcess.INSTALL_RESOURCES_LATCH, ImmutableList.of("setup")); - } - - @Test - public void testInstallLatchBlocks() throws Exception { - runTestLatchBlocks(SoftwareProcess.INSTALL_LATCH, ImmutableList.of("setup", "copyInstallResources")); + + @DataProvider + public Object[][] latchAndTaskNamesProvider() { + return new Object[][] { + {SoftwareProcess.START_LATCH, ImmutableList.<String>of()}, + {SoftwareProcess.SETUP_LATCH, ImmutableList.<String>of()}, + {SoftwareProcess.INSTALL_RESOURCES_LATCH, ImmutableList.of("setup")}, + {SoftwareProcess.INSTALL_LATCH, ImmutableList.of("setup", "copyInstallResources")}, + {SoftwareProcess.CUSTOMIZE_LATCH, ImmutableList.of("setup", "copyInstallResources", "install")}, + {SoftwareProcess.RUNTIME_RESOURCES_LATCH, ImmutableList.of("setup", "copyInstallResources", "install", "customize")}, + {SoftwareProcess.LAUNCH_LATCH, ImmutableList.of("setup", "copyInstallResources", "install", "customize", "copyRuntimeResources")}, + {SoftwareProcess.STOP_LATCH, SOFTWARE_PROCESS_START_TASKS}, + }; } - @Test - public void testCustomizeLatchBlocks() throws Exception { - runTestLatchBlocks(SoftwareProcess.CUSTOMIZE_LATCH, ImmutableList.of("setup", "copyInstallResources", "install")); + @Test(dataProvider="latchAndTaskNamesProvider") + public void testBooleanLatchBlocks(final ConfigKey<Boolean> latch, List<String> preLatchEvents) throws Exception { + doTestLatchBlocks(latch, preLatchEvents, Boolean.TRUE, Functions.<Void>constant(null)); } - @Test - public void testRuntimeResourcesLatchBlocks() throws Exception { - runTestLatchBlocks(SoftwareProcess.RUNTIME_RESOURCES_LATCH, ImmutableList.of("setup", "copyInstallResources", "install", "customize")); - } + @Test(dataProvider="latchAndTaskNamesProvider") + public void testReleaseableLatchBlocks(final ConfigKey<Boolean> latch, final List<String> preLatchEvents) throws Exception { + final ReleaseableLatch latchSemaphore = ReleaseableLatch.Factory.newMaxConcurrencyLatch(0); + doTestLatchBlocks(latch, preLatchEvents, latchSemaphore, new Function<MyService, Void>() { + @Override + public Void apply(MyService entity) { + String taskName = (latch == SoftwareProcess.STOP_LATCH) ? "stop" : "start"; + assertEffectorBlockingDetailsEventually(entity, taskName, "Acquiring " + latch + " " + latchSemaphore); + assertDriverEventsEquals(entity, preLatchEvents); + latchSemaphore.release(entity); + return null; + } + }); - @Test - public void testLaunchLatchBlocks() throws Exception { - runTestLatchBlocks(SoftwareProcess.LAUNCH_LATCH, ImmutableList.of("setup", "copyInstallResources", "install", "customize", "copyRuntimeResources")); } - @Test - public void testStopLatchBlocks() throws Exception { - final AttributeSensor<Boolean> stopper = Sensors.newBooleanSensor("stop.now"); - final BasicEntity triggerEntity = app.createAndManageChild(EntitySpec.create(BasicEntity.class)); + public void doTestLatchBlocks(ConfigKey<Boolean> latch, List<String> preLatchEvents, Object latchValue, Function<? super MyService, Void> customAssertFn) throws Exception { + final AttributeSensor<Object> latchSensor = Sensors.newSensor(Object.class, "latch"); final MyService entity = app.createAndManageChild(EntitySpec.create(MyService.class) - .configure(SoftwareProcess.STOP_LATCH, DependentConfiguration.attributeWhenReady(app, stopper))); - + .configure(ConfigKeys.newConfigKey(Object.class, latch.getName()), (Object)DependentConfiguration.attributeWhenReady(app, latchSensor))); + + final Task<Void> task; final Task<Void> startTask = Entities.invokeEffector(app, app, MyService.START, ImmutableMap.of("locations", ImmutableList.of(loc))); - triggerEntity.sensors().set(Attributes.SERVICE_UP, true); - startTask.get(Duration.THIRTY_SECONDS); + if (latch != SoftwareProcess.STOP_LATCH) { + task = startTask; + } else { + startTask.get(Duration.THIRTY_SECONDS); + task = Entities.invokeEffector(app, app, MyService.STOP); + } - final Task<Void> stopTask = Entities.invokeEffector(app, app, MyService.STOP); + assertEffectorBlockingDetailsEventually(entity, task.getDisplayName(), "Waiting for config " + latch.getName()); + assertDriverEventsEquals(entity, preLatchEvents); + assertFalse(task.isDone()); - assertEffectorBlockingDetailsEventually(entity, MyService.STOP.getName(), "Waiting for config " + SoftwareProcess.STOP_LATCH.getName()); + app.sensors().set(latchSensor, latchValue); - app.sensors().set(stopper, true); - stopTask.get(Asserts.DEFAULT_LONG_TIMEOUT); + customAssertFn.apply(entity); - assertDriverEventsEquals(entity, ImmutableList.of("setup", "copyInstallResources", "install", "customize", "copyRuntimeResources", "launch", "stop")); + task.get(Duration.THIRTY_SECONDS); + assertDriverEventsEquals(entity, getLatchPostTasks(latch)); } + @Test(dataProvider="latchAndTaskNamesProvider", timeOut=Asserts.THIRTY_SECONDS_TIMEOUT_MS) + public void testConcurrency(ConfigKey<Boolean> latch, List<String> _) throws Exception { + final int maxConcurrency = 2; + final ReleaseableLatch latchSemaphore = ReleaseableLatch.Factory.newMaxConcurrencyLatch(maxConcurrency); + final AttributeSensor<Object> latchSensor = Sensors.newSensor(Object.class, "latch"); + final CountingLatch countingLatch = new CountingLatch(latchSemaphore, maxConcurrency); + @SuppressWarnings({"unused"}) + DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class) + .configure(DynamicCluster.INITIAL_SIZE, maxConcurrency*2) + .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(MyService.class) + .configure(ConfigKeys.newConfigKey(Object.class, latch.getName()), (Object)DependentConfiguration.attributeWhenReady(app, latchSensor)))); + app.sensors().set(latchSensor, countingLatch); + final Task<Void> startTask = Entities.invokeEffector(app, app, MyService.START, ImmutableMap.of("locations", ImmutableList.of(app.newLocalhostProvisioningLocation()))); + startTask.get(); + final Task<Void> stopTask = Entities.invokeEffector(app, app, MyService.STOP, ImmutableMap.<String, Object>of()); + stopTask.get(); + assertEquals(countingLatch.getCounter(), 0); + // Check we have actually used the latch + assertNotEquals(countingLatch.getMaxCounter(), 0, "Latch not acquired at all"); + // In theory this is 0 < maxCnt <= maxConcurrency contract, but in practice + // we should always reach the maximum due to the sleeps below. + // Change if found to fail in the wild. + assertEquals(countingLatch.getMaxCounter(), maxConcurrency); + } - protected void runTestLatchBlocks(final ConfigKey<Boolean> latch, List<String> preLatchEvents) throws Exception { - final BasicEntity triggerEntity = app.createAndManageChild(EntitySpec.create(BasicEntity.class)); - final MyService entity = app.createAndManageChild(EntitySpec.create(MyService.class) - .configure(latch, DependentConfiguration.attributeWhenReady(triggerEntity, Attributes.SERVICE_UP))); - - final Task<Void> task = Entities.invokeEffector(app, app, MyService.START, ImmutableMap.of("locations", ImmutableList.of(loc))); - - assertEffectorBlockingDetailsEventually(entity, MyService.START.getName(), "Waiting for config " + latch.getName()); - assertDriverEventsEquals(entity, preLatchEvents); - - assertFalse(task.isDone()); - triggerEntity.sensors().set(Attributes.SERVICE_UP, true); - task.get(Duration.THIRTY_SECONDS); - assertDriverEventsEquals(entity, ImmutableList.of("setup", "copyInstallResources", "install", "customize", "copyRuntimeResources", "launch")); + protected List<String> getLatchPostTasks(final ConfigKey<?> latch) { + if (latch == SoftwareProcess.STOP_LATCH) { + return SOFTWARE_PROCESS_STOP_TASKS; + } else { + return SOFTWARE_PROCESS_START_TASKS; + } } private void assertDriverEventsEquals(MyService entity, List<String> expectedEvents) { @@ -189,4 +222,53 @@ public class SoftwareProcessEntityLatchTest extends BrooklynAppUnitTestSupport { } throw new IllegalStateException("No blocking details for "+task+" (walked task chain "+taskChain+")"); } + + private static class CountingLatch implements ReleaseableLatch { + ReleaseableLatch delegate; + AtomicInteger cnt = new AtomicInteger(); + AtomicInteger maxCnt = new AtomicInteger(); + private int maxConcurrency; + + public CountingLatch(ReleaseableLatch delegate, int maxConcurrency) { + this.delegate = delegate; + this.maxConcurrency = maxConcurrency; + } + + public Boolean get() { + return delegate.get(); + } + + public Maybe<Boolean> getImmediately() { + return delegate.getImmediately(); + } + + public void acquire(Entity caller) { + delegate.acquire(caller); + assertCount(cnt.incrementAndGet()); + } + + public void release(Entity caller) { + cnt.decrementAndGet(); + delegate.release(caller); + } + + public int getMaxCounter() { + return maxCnt.get(); + } + public int getCounter() { + return cnt.get(); + } + private void assertCount(int newCnt) { + synchronized(maxCnt) { + maxCnt.set(Math.max(newCnt, maxCnt.get())); + } + assertTrue(newCnt <= maxConcurrency, "maxConcurrency limit failed at " + newCnt + " (max " + maxConcurrency + ")"); + if (newCnt < maxConcurrency) { + Time.sleep(Duration.millis(100)); + } else { + Time.sleep(Duration.millis(20)); + } + } + + } } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/47aecce0/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/mysql/DynamicToyMySqlEntityBuilder.java ---------------------------------------------------------------------- diff --git a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/mysql/DynamicToyMySqlEntityBuilder.java b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/mysql/DynamicToyMySqlEntityBuilder.java index f5193ed..c25dc02 100644 --- a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/mysql/DynamicToyMySqlEntityBuilder.java +++ b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/mysql/DynamicToyMySqlEntityBuilder.java @@ -19,6 +19,7 @@ package org.apache.brooklyn.entity.software.base.test.mysql; import java.io.File; +import java.util.concurrent.atomic.AtomicReference; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntityInitializer; @@ -29,6 +30,7 @@ import org.apache.brooklyn.api.location.OsDetails; import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks; import org.apache.brooklyn.core.entity.Attributes; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; +import org.apache.brooklyn.core.sensor.ReleaseableLatch; import org.apache.brooklyn.entity.software.base.lifecycle.MachineLifecycleEffectorTasks; import org.apache.brooklyn.entity.stock.BasicStartable; import org.slf4j.Logger; @@ -124,7 +126,7 @@ public class DynamicToyMySqlEntityBuilder { return "submitted start"; } @Override - protected void postStartCustom() { + protected void postStartCustom(AtomicReference<ReleaseableLatch> startLatchRef) { // if it's still up after 5s assume we are good Time.sleep(Duration.FIVE_SECONDS); if (!DynamicTasks.queue(SshEffectorTasks.isPidFromFileRunning(dir(entity)+"/*/data/*.pid")).get()) { @@ -150,6 +152,7 @@ public class DynamicToyMySqlEntityBuilder { // Really should set this with a Feed that checks pid periodically. // Should this instead be using SERVICE_NOT_UP_INDICATORS? entity().sensors().set(Attributes.SERVICE_UP, true); + super.postStartCustom(startLatchRef); } @Override http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/47aecce0/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java ---------------------------------------------------------------------- diff --git a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java index 0d6e770..9097b74 100644 --- a/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java +++ b/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java @@ -73,6 +73,10 @@ import groovy.lang.Closure; @Beta public class Asserts { + // Used in annotations so needs to be a constant - can't be initialized similarly to DEFAULT_LONG_TIMEOUT + // TODO Can we force this by default on all unit tests, beforeMethod, afterMethod methods? + public static final long THIRTY_SECONDS_TIMEOUT_MS = 30000; + /** * Timeout for use when something should happen. This is the *default timeout* that should * be used by tests (unless that test is asserting performance). @@ -90,7 +94,7 @@ public class Asserts { static { String defaultTimeout = System.getProperty("brooklyn.test.defaultTimeout"); if (defaultTimeout == null){ - DEFAULT_LONG_TIMEOUT = Duration.THIRTY_SECONDS; + DEFAULT_LONG_TIMEOUT = Duration.millis(THIRTY_SECONDS_TIMEOUT_MS); } else { DEFAULT_LONG_TIMEOUT = Duration.of(defaultTimeout); } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/47aecce0/utils/common/src/main/java/org/apache/brooklyn/util/guava/Maybe.java ---------------------------------------------------------------------- diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/guava/Maybe.java b/utils/common/src/main/java/org/apache/brooklyn/util/guava/Maybe.java index 4be923a..67bca18 100644 --- a/utils/common/src/main/java/org/apache/brooklyn/util/guava/Maybe.java +++ b/utils/common/src/main/java/org/apache/brooklyn/util/guava/Maybe.java @@ -157,6 +157,11 @@ public abstract class Maybe<T> implements Serializable, Supplier<T> { return ofDisallowingNull(value); } + /** Creates a new Maybe object out of the {@link Optional} argument */ + public static <T> Maybe<T> fromOptional(Optional<T> value) { + return Maybe.fromNullable(value.orNull()); + } + /** creates an instance wrapping a {@link SoftReference}, so it might go absent later on. * if null is supplied the result is a present null. */ public static <T> Maybe<T> soft(@Nonnull T value) {