Adds maxConcurrentChildCommands parameter to DynamicCluster

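The option configures the maximum number of simultaneous Startable
effector invocations (e.g. start, stop, restart) that will be made on
members of the group. A null, zero or negative value (the default is 0)
leaves the number of invocations unbounded.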


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/4e30074c
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/4e30074c
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/4e30074c

Branch: refs/heads/master
Commit: 4e30074ca9b09a49c6a4b052d6f28d05608b36eb
Parents: 5b9f896
Author: Sam Corbett <sam.corb...@cloudsoftcorp.com>
Authored: Tue Nov 29 16:11:12 2016 +0000
Committer: Sam Corbett <sam.corb...@cloudsoftcorp.com>
Committed: Tue Nov 29 16:11:12 2016 +0000

----------------------------------------------------------------------
 .../apache/brooklyn/core/entity/Entities.java   |   2 +-
 .../brooklyn/entity/group/DynamicCluster.java   |  11 +-
 .../entity/group/DynamicClusterImpl.java        | 153 ++++++++++++++++++-
 .../entity/group/DynamicClusterRebindTest.java  |  54 +++++++
 .../entity/group/DynamicClusterTest.java        | 132 ++++++++++++++++
 5 files changed, 343 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java 
b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
index 2821652..69670ec 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
@@ -625,7 +625,7 @@ public class Entities {
     /**
      * Return all descendants of given entity matching the given predicate and 
optionally the entity itself.
      * 
-     * @see {@link EntityPredicates} for useful second arguments.
+     * @see EntityPredicates
      */
     @SuppressWarnings("unused")
     public static Iterable<Entity> descendants(Entity root, Predicate<? super 
Entity> matching, boolean includeSelf) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java 
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
index f2112e8..3f62f82 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
@@ -103,7 +103,7 @@ public interface DynamicCluster extends AbstractGroup, 
Cluster, MemberReplaceabl
             "dynamiccluster.restartMode", 
             "How this cluster should handle restarts; "
             + "by default it is disallowed, but this key can specify a 
different mode. "
-            + "Modes supported by dynamic cluster are 'off', 'sequqential', or 
'parallel'. "
+            + "Modes supported by dynamic cluster are 'off', 'sequential', or 
'parallel'. "
             + "However subclasses can define their own modes or may ignore 
this.", null);
 
     @SetFromFlag("quarantineFailedEntities")
@@ -183,6 +183,15 @@ public interface DynamicCluster extends AbstractGroup, 
Cluster, MemberReplaceabl
     ConfigKey<Integer> CLUSTER_MEMBER_ID = ConfigKeys.newIntegerConfigKey(
             "cluster.member.id", "The unique ID number (sequential) of a 
member of a cluster");
 
+    @Beta
+    @SetFromFlag("maxConcurrentChildCommands")
+    ConfigKey<Integer> MAX_CONCURRENT_CHILD_COMMANDS = 
ConfigKeys.builder(Integer.class)
+            .name("dynamiccluster.maxConcurrentChildCommands")
+            .description("[Beta] The maximum number of effector invocations 
that will be made on children at once " +
+                    "(e.g. start, stop, restart). Any value null or less than 
or equal to zero means invocations are unbounded")
+            .defaultValue(0)
+            .build();
+
     AttributeSensor<List<Location>> SUB_LOCATIONS = new 
BasicAttributeSensor<List<Location>>(
             new TypeToken<List<Location>>() {},
             "dynamiccluster.subLocations", "Locations for each availability 
zone to use");

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java 
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
index 8725b12..4ed0ac0 100644
--- 
a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
+++ 
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
@@ -30,10 +30,12 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import javax.annotation.Nullable;
 
+import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.entity.Group;
@@ -98,6 +100,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.common.reflect.TypeToken;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * A cluster of entities that can dynamically increase or decrease the number 
of entities.
@@ -108,6 +112,12 @@ public class DynamicClusterImpl extends AbstractGroupImpl 
implements DynamicClus
     private static final AttributeSensor<Supplier<Integer>> 
NEXT_CLUSTER_MEMBER_ID = Sensors.newSensor(new TypeToken<Supplier<Integer>>() 
{},
             "next.cluster.member.id", "Returns the ID number of the next 
member to be added");
 
+    /**
+     * Controls the maximum number of effector invocations the cluster will 
make on members at once.
+     * Only used if {@link #MAX_CONCURRENT_CHILD_COMMANDS} is configured with a value greater than zero.
+     */
+    private transient Semaphore childTaskSemaphore;
+
     private volatile FunctionFeed clusterOneAndAllMembersUp;
 
     // TODO better mechanism for arbitrary class name to instance type coercion
@@ -212,9 +222,16 @@ public class DynamicClusterImpl extends AbstractGroupImpl 
implements DynamicClus
     public void init() {
         super.init();
         initialiseMemberId();
+        initialiseTaskPermitSemaphore();
         connectAllMembersUp();
     }
 
+    @Override
+    public void rebind() {
+        super.rebind();
+        initialiseTaskPermitSemaphore();
+    }
+
     private void initialiseMemberId() {
         synchronized (mutex) {
             if (sensors().get(NEXT_CLUSTER_MEMBER_ID) == null) {
@@ -223,6 +240,17 @@ public class DynamicClusterImpl extends AbstractGroupImpl 
implements DynamicClus
         }
     }
 
+    private void initialiseTaskPermitSemaphore() {
+        synchronized (mutex) {
+            if (getChildTaskSemaphore() == null) {
+                Integer maxChildTasks = 
config().get(MAX_CONCURRENT_CHILD_COMMANDS);
+                if (maxChildTasks != null && maxChildTasks > 0) {
+                    childTaskSemaphore = new Semaphore(maxChildTasks);
+                }
+            }
+        }
+    }
+
     private void connectAllMembersUp() {
         clusterOneAndAllMembersUp = FunctionFeed.builder()
                 .entity(this)
@@ -551,8 +579,9 @@ public class DynamicClusterImpl extends AbstractGroupImpl 
implements DynamicClus
                 Iterables.filter(getChildren(), 
Predicates.and(Predicates.instanceOf(Startable.class), 
EntityPredicates.isManaged()))));
         } else if ("parallel".equalsIgnoreCase(mode)) {
             ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING);
-            DynamicTasks.queue(Effectors.invocationParallel(Startable.RESTART, 
null, 
-                Iterables.filter(getChildren(), 
Predicates.and(Predicates.instanceOf(Startable.class), 
EntityPredicates.isManaged()))));
+            for (Entity member : Iterables.filter(getChildren(), 
Predicates.and(Predicates.instanceOf(Startable.class), 
EntityPredicates.isManaged()))) {
+                DynamicTasks.queue(newThrottledEffectorTask(member, 
Startable.RESTART, Collections.emptyMap()));
+            }
         } else {
             throw new IllegalArgumentException("Unknown 
"+RESTART_MODE.getName()+" '"+mode+"'");
         }
@@ -788,7 +817,12 @@ public class DynamicClusterImpl extends AbstractGroupImpl 
implements DynamicClus
 
         // FIXME symmetry in order of added as child, managed, started, and 
added to group
         final Iterable<Entity> removedStartables = (Iterable<Entity>) 
(Iterable<?>) Iterables.filter(removedEntities, Startable.class);
-        Task<?> invoke = Entities.invokeEffector(this, removedStartables, 
Startable.STOP, Collections.<String,Object>emptyMap());
+        ImmutableList.Builder<Task<?>> tasks = ImmutableList.builder();
+        for (Entity member : removedStartables) {
+            tasks.add(newThrottledEffectorTask(member, Startable.STOP, 
Collections.emptyMap()));
+        }
+        Task<?> invoke = Tasks.parallel(tasks.build());
+        DynamicTasks.queueIfPossible(invoke).orSubmitAsync();
         try {
             invoke.get();
             return removedEntities;
@@ -826,8 +860,11 @@ public class DynamicClusterImpl extends AbstractGroupImpl 
implements DynamicClus
             addedEntities.add(entity);
             addedEntityLocations.put(entity, loc);
             if (entity instanceof Startable) {
+                // First members are used when subsequent members need some 
attributes from them
+                // before they start; make sure they're in the first batch.
+                boolean privileged = 
Boolean.TRUE.equals(entity.sensors().get(AbstractGroup.FIRST_MEMBER));
                 Map<String, ?> args = ImmutableMap.of("locations", 
MutableList.builder().addIfNotNull(loc).buildImmutable());
-                Task<Void> task = Effectors.invocation(entity, 
Startable.START, args).asTask();
+                Task<?> task = newThrottledEffectorTask(entity, 
Startable.START, args, privileged);
                 tasks.put(entity, task);
             }
         }
@@ -1041,14 +1078,116 @@ public class DynamicClusterImpl extends 
AbstractGroupImpl implements DynamicClus
 
     protected void stopAndRemoveNode(Entity member) {
         removeMember(member);
-
         try {
             if (member instanceof Startable) {
-                Task<?> task = member.invoke(Startable.STOP, 
Collections.<String,Object>emptyMap());
+                Task<?> task = newThrottledEffectorTask(member, 
Startable.STOP, Collections.<String, Object>emptyMap());
+                DynamicTasks.queueIfPossible(task).orSubmitAsync();
                 task.getUnchecked();
             }
         } finally {
             Entities.unmanage(member);
         }
     }
+
+    @Nullable
+    protected Semaphore getChildTaskSemaphore() {
+        return childTaskSemaphore;
+    }
+
+    /**
+     * @return An unprivileged effector task.
+     * @see #newThrottledEffectorTask(Entity, Effector, Map, boolean)
+     */
+    protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> 
effector, Map<?, ?> arguments) {
+        return newThrottledEffectorTask(target, effector, arguments, false);
+    }
+
+    /**
+     * Creates tasks that obtain permits from {@link #childTaskSemaphore} 
before invoking <code>effector</code>
+     * on <code>target</code>. Permits are released in a {@link 
ListenableFuture#addListener listener}. No
+     * permits are obtained if {@link #childTaskSemaphore} is 
<code>null</code>.
+     * @param target Entity to invoke effector on
+     * @param effector Effector to invoke on target
+     * @param arguments Effector arguments
+     * @param isPrivileged If true the method obtains a permit from {@link 
#childTaskSemaphore}
+     *                     immediately and returns the effector invocation 
task, otherwise it
+     *                     returns a task that sequentially obtains a permit 
then runs the effector.
+     * @return An unsubmitted task.
+     */
+    protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> 
effector, Map<?, ?> arguments, boolean isPrivileged) {
+        final Task<?> toSubmit;
+        final Task<T> effectorTask = Effectors.invocation(target, effector, 
arguments).asTask();
+        if (getChildTaskSemaphore() != null) {
+            // permitObtained communicates to the release task whether the 
permit should really be released
+            // or not. ObtainPermit sets it to true when a permit is acquired.
+            final AtomicBoolean permitObtained = new AtomicBoolean();
+            final String description = "Waiting for permit to run " + 
effector.getName() + " on " + target;
+            final Runnable obtain = new ObtainPermit(getChildTaskSemaphore(), 
description, permitObtained);
+            // Acquire the permit now for the privileged task and just queue 
the effector invocation.
+            // If it's unprivileged then queue a task to obtain a permit first.
+            if (isPrivileged) {
+                obtain.run();
+                toSubmit = effectorTask;
+            } else {
+                Task<?> obtainMutex = Tasks.builder()
+                        .description(description)
+                        .body(new ObtainPermit(getChildTaskSemaphore(), 
description, permitObtained))
+                        .build();
+                toSubmit = Tasks.sequential(
+                        "Waiting for permit then running " + 
effector.getName() + " on " + target,
+                        obtainMutex, effectorTask);
+            }
+            toSubmit.addListener(new ReleasePermit(getChildTaskSemaphore(), 
permitObtained), MoreExecutors.sameThreadExecutor());
+        } else {
+            toSubmit = effectorTask;
+        }
+        return toSubmit;
+    }
+
+    private static class ObtainPermit implements Runnable {
+        private final Semaphore permit;
+        private final String description;
+        private final AtomicBoolean hasObtainedPermit;
+
+        private ObtainPermit(Semaphore permit, String description, 
AtomicBoolean hasObtainedPermit) {
+            this.permit = permit;
+            this.description = description;
+            this.hasObtainedPermit = hasObtainedPermit;
+        }
+
+        @Override
+        public void run() {
+            String oldDetails = Tasks.setBlockingDetails(description);
+            LOG.debug("{} acquiring permit from {}", this, permit);
+            try {
+                permit.acquire();
+                hasObtainedPermit.set(true);
+            } catch (InterruptedException e) {
+                throw Exceptions.propagate(e);
+            } finally {
+                Tasks.setBlockingDetails(oldDetails);
+            }
+        }
+    }
+
+    private static class ReleasePermit implements Runnable {
+        private final Semaphore permit;
+        private final AtomicBoolean wasPermitObtained;
+
+        private ReleasePermit(Semaphore permit, AtomicBoolean 
wasPermitObtained) {
+            this.permit = permit;
+            this.wasPermitObtained = wasPermitObtained;
+        }
+
+        @Override
+        public void run() {
+            if (wasPermitObtained.get()) {
+                LOG.debug("{} releasing permit from {}", this, permit);
+                permit.release();
+            } else {
+                LOG.debug("{} not releasing a permit from {} because it 
appears one was never obtained", this, permit);
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java
 
b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java
new file mode 100644
index 0000000..bbf3a2a
--- /dev/null
+++ 
b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.entity.group;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
+import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class DynamicClusterRebindTest extends RebindTestFixtureWithApp {
+
+    @Test
+    public void testThrottleAppliesAfterRebind() throws Exception {
+        DynamicCluster cluster = 
origApp.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1)
+                .configure(DynamicCluster.INITIAL_SIZE, 1)
+                .configure(DynamicCluster.MEMBER_SPEC, 
EntitySpec.create(DynamicClusterTest.ThrowOnAsyncStartEntity.class))
+                        
.configure(DynamicClusterTest.ThrowOnAsyncStartEntity.COUNTER, new 
AtomicInteger()));
+        
app().start(ImmutableList.of(origApp.newLocalhostProvisioningLocation()));
+        EntityAsserts.assertAttributeEquals(cluster, 
DynamicCluster.GROUP_SIZE, 1);
+
+        rebind(RebindOptions.create().terminateOrigManagementContext(true));
+        cluster = Entities.descendants(app(), 
DynamicCluster.class).iterator().next();
+        cluster.resize(10);
+        EntityAsserts.assertAttributeEqualsEventually(cluster, 
DynamicCluster.GROUP_SIZE, 10);
+        EntityAsserts.assertAttributeEquals(cluster, 
Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java 
b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
index 36d3c39..c3e7d7f 100644
--- 
a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
+++ 
b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -41,12 +42,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.entity.ImplementedBy;
 import org.apache.brooklyn.api.location.Location;
 import org.apache.brooklyn.api.location.NoMachinesAvailableException;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityAsserts;
@@ -59,6 +64,7 @@ import org.apache.brooklyn.core.entity.trait.FailingEntity;
 import org.apache.brooklyn.core.entity.trait.Resizable;
 import org.apache.brooklyn.core.location.SimulatedLocation;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.sensor.DependentConfiguration;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.core.test.entity.TestEntityImpl;
@@ -67,10 +73,15 @@ import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Function;
@@ -1225,4 +1236,125 @@ public class DynamicClusterTest extends 
BrooklynAppUnitTestSupport {
         assertEquals(found.size(), expectedNonFirstCount);
     }
 
+    @DataProvider
+    public Object[][] maxConcurrentCommandsTestProvider() {
+        return new Object[][]{{1}, {2}, {3}};
+    }
+
+    @Test(dataProvider = "maxConcurrentCommandsTestProvider")
+    public void 
testEntitiesStartAndStopSequentiallyWhenMaxConcurrentCommandsIsOne(int 
maxConcurrentCommands) {
+        EntitySpec<ThrowOnAsyncStartEntity> memberSpec = 
EntitySpec.create(ThrowOnAsyncStartEntity.class)
+                .configure(ThrowOnAsyncStartEntity.MAX_CONCURRENCY, 
maxConcurrentCommands)
+                .configure(ThrowOnAsyncStartEntity.COUNTER, new 
AtomicInteger());
+        DynamicCluster cluster = 
app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 
maxConcurrentCommands)
+                .configure(DynamicCluster.INITIAL_SIZE, 10)
+                .configure(DynamicCluster.MEMBER_SPEC, memberSpec));
+        app.start(ImmutableList.of(app.newSimulatedLocation()));
+        assertEquals(cluster.sensors().get(Attributes.SERVICE_STATE_ACTUAL), 
Lifecycle.RUNNING);
+    }
+
+    // Tests handling of the first member of a cluster by asserting that a 
group, whose
+    // other members wait for the first, always starts.
+    @Test
+    public void testFirstMemberInFirstBatchWhenMaxConcurrentCommandsSet() 
throws Exception {
+        final AtomicInteger counter = new AtomicInteger();
+        final DynamicCluster cluster = 
app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1)
+                .configure(DynamicCluster.INITIAL_SIZE, 3));
+
+        Task<Boolean> firstMemberUp = Tasks.<Boolean>builder()
+                .body(new Callable<Boolean>() {
+                    @Override
+                    public Boolean call() throws Exception {
+                        Task<Entity> first = 
DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST);
+                        DynamicTasks.queueIfPossible(first).orSubmitAsync();
+                        final Entity source = first.get();
+                        final Task<Boolean> booleanTask = 
DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP);
+                        
DynamicTasks.queueIfPossible(booleanTask).orSubmitAsync();
+                        return booleanTask.get();
+                    }
+                })
+                .build();
+
+        EntitySpec<ThrowOnAsyncStartEntity> firstMemberSpec = 
EntitySpec.create(ThrowOnAsyncStartEntity.class)
+                .configure(ThrowOnAsyncStartEntity.COUNTER, counter)
+                .configure(ThrowOnAsyncStartEntity.START_LATCH, true);
+
+        EntitySpec<ThrowOnAsyncStartEntity> memberSpec = 
EntitySpec.create(ThrowOnAsyncStartEntity.class)
+                .configure(ThrowOnAsyncStartEntity.COUNTER, counter)
+                .configure(ThrowOnAsyncStartEntity.START_LATCH, firstMemberUp);
+
+        cluster.config().set(DynamicCluster.FIRST_MEMBER_SPEC, 
firstMemberSpec);
+        cluster.config().set(DynamicCluster.MEMBER_SPEC, memberSpec);
+
+        // app.start blocks so in the failure case this test would block 
forever.
+        Asserts.assertReturnsEventually(new Runnable() {
+            public void run() {
+                app.start(ImmutableList.of(app.newSimulatedLocation()));
+                EntityAsserts.assertAttributeEqualsEventually(cluster, 
Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+            }
+        }, Asserts.DEFAULT_LONG_TIMEOUT);
+    }
+
+    @Test
+    public void 
testChildCommandPermitNotReleasedWhenMemberStartTaskCancelledBeforeSubmission() 
{
+        // Tests that a permit is only released if it was actually obtained when a 
member's start task is cancelled before submission.
+        // Expected behaviour is:
+        // - permit obtained for first member. cancelled task submitted. 
permit released.
+        // - no permit obtained for second member. cancelled task submitted. 
no permit released.
+        DynamicCluster cluster = 
app.createAndManageChild(EntitySpec.create(CancelEffectorInvokeCluster.class)
+                .configure(DynamicCluster.MEMBER_SPEC, 
EntitySpec.create(TestEntity.class))
+                .configure(DynamicCluster.INITIAL_SIZE, 2)
+                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1));
+        final DynamicClusterImpl clusterImpl = 
DynamicClusterImpl.class.cast(Entities.deproxy(cluster));
+        assertNotNull(clusterImpl.getChildTaskSemaphore());
+        assertEquals(clusterImpl.getChildTaskSemaphore().availablePermits(), 
1);
+        try {
+            app.start(ImmutableList.<Location>of(app.newSimulatedLocation()));
+            Asserts.shouldHaveFailedPreviously("Cluster start should have 
failed because the member start was cancelled");
+        } catch (Exception e) {
+            // ignored.
+        }
+        assertEquals(clusterImpl.getChildTaskSemaphore().availablePermits(), 
1);
+    }
+
+    @ImplementedBy(ThrowOnAsyncStartEntityImpl.class)
+    public interface ThrowOnAsyncStartEntity extends TestEntity {
+        ConfigKey<Integer> MAX_CONCURRENCY = 
ConfigKeys.newConfigKey(Integer.class, "concurrency", "max concurrency", 1);
+        ConfigKey<AtomicInteger> COUNTER = 
ConfigKeys.newConfigKey(AtomicInteger.class, "counter");
+        ConfigKey<Boolean> START_LATCH = 
ConfigKeys.newConfigKey(Boolean.class, "startlatch");
+    }
+
+    public static class ThrowOnAsyncStartEntityImpl extends TestEntityImpl 
implements ThrowOnAsyncStartEntity {
+        private static final Logger LOG = 
LoggerFactory.getLogger(ThrowOnAsyncStartEntityImpl.class);
+        @Override
+        public void start(Collection<? extends Location> locs) {
+            int count = config().get(COUNTER).incrementAndGet();
+            try {
+                LOG.debug("{} starting (first={})", new Object[]{this, 
sensors().get(AbstractGroup.FIRST_MEMBER)});
+                config().get(START_LATCH);
+                // Throw if more than MAX_CONCURRENCY entities are starting at the 
same time.
+                assertTrue(count <= config().get(MAX_CONCURRENCY), "expected " 
+ count + " <= " + config().get(MAX_CONCURRENCY));
+                super.start(locs);
+            } finally {
+                config().get(COUNTER).decrementAndGet();
+            }
+        }
+    }
+
+    /** Used in {@link 
#testChildCommandPermitNotReleasedWhenMemberStartTaskCancelledBeforeSubmission}.
 */
+    @ImplementedBy(CancelEffectorInvokeClusterImpl.class)
+    public interface CancelEffectorInvokeCluster extends DynamicCluster {}
+
+    /** Overrides {@link DynamicClusterImpl#newThrottledEffectorTask} to 
cancel each task before it's submitted. */
+    public static class CancelEffectorInvokeClusterImpl extends 
DynamicClusterImpl implements CancelEffectorInvokeCluster {
+        @Override
+        protected <T> Task<?> newThrottledEffectorTask(Entity target, 
Effector<T> effector, Map<?, ?> arguments, boolean isPrivileged) {
+            Task<?> unsubmitted = super.newThrottledEffectorTask(target, 
effector, arguments, isPrivileged);
+            unsubmitted.cancel(true);
+            return unsubmitted;
+        }
+    }
+
 }

Reply via email to