Repository: brooklyn-server Updated Branches: refs/heads/master d389e40d5 -> 7bdae1bae
BROOKLYN-533: adds maxConcurrentMachineDeletions Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/c8f470c0 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/c8f470c0 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/c8f470c0 Branch: refs/heads/master Commit: c8f470c097db6ae8892c39ce2aeb304789bf8ddd Parents: e85d264 Author: Aled Sage <aled.s...@gmail.com> Authored: Mon Sep 18 12:11:32 2017 +0100 Committer: Aled Sage <aled.s...@gmail.com> Committed: Mon Sep 18 13:23:35 2017 +0100 ---------------------------------------------------------------------- .../location/jclouds/JcloudsLocation.java | 73 +++++- .../api/JcloudsLocationConfigPublic.java | 6 + .../JcloudsMaxConcurrencyStubbedTest.java | 223 +++++++++++++++++++ 3 files changed, 292 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c8f470c0/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java ---------------------------------------------------------------------- diff --git a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java index a914847..8c01e24 100644 --- a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java +++ b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java @@ -23,7 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis; import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth; import static org.apache.brooklyn.util.ssh.BashCommands.sbinPath; -import static org.jclouds.compute.predicates.NodePredicates.*; +import static 
org.jclouds.compute.predicates.NodePredicates.withIds; import static org.jclouds.util.Throwables2.getFirstThrowableOfType; import java.io.ByteArrayOutputStream; @@ -48,7 +48,6 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import javax.xml.ws.WebServiceException; -import com.google.common.primitives.Ints; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.location.LocationSpec; import org.apache.brooklyn.api.location.MachineLocation; @@ -146,7 +145,6 @@ import org.jclouds.compute.domain.OsFamily; import org.jclouds.compute.domain.Template; import org.jclouds.compute.domain.TemplateBuilder; import org.jclouds.compute.options.TemplateOptions; -import org.jclouds.compute.predicates.NodePredicates; import org.jclouds.domain.Credentials; import org.jclouds.domain.LocationScope; import org.jclouds.domain.LoginCredentials; @@ -183,6 +181,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.collect.Sets.SetView; import com.google.common.net.HostAndPort; +import com.google.common.primitives.Ints; /** * For provisioning and managing VMs in a particular provider/region, using jclouds. 
@@ -252,6 +251,14 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im } config().set(MACHINE_CREATION_SEMAPHORE, new Semaphore(maxConcurrent, true)); } + + if (getConfig(MACHINE_DELETION_SEMAPHORE) == null) { + Integer maxConcurrent = getConfig(MAX_CONCURRENT_MACHINE_DELETIONS); + if (maxConcurrent == null || maxConcurrent < 1) { + throw new IllegalStateException(MAX_CONCURRENT_MACHINE_DELETIONS.getName() + " must be >= 1, but was "+maxConcurrent); + } + config().set(MACHINE_DELETION_SEMAPHORE, new Semaphore(maxConcurrent, true)); + } return this; } @@ -275,6 +282,7 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im .parent(this) .configure(config().getLocalBag().getAllConfig()) // FIXME Should this just be inherited? .configure(MACHINE_CREATION_SEMAPHORE, getMachineCreationSemaphore()) + .configure(MACHINE_DELETION_SEMAPHORE, getMachineDeletionSemaphore()) .configure(newFlags)); } @@ -380,6 +388,10 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im return checkNotNull(getConfig(MACHINE_CREATION_SEMAPHORE), MACHINE_CREATION_SEMAPHORE.getName()); } + protected Semaphore getMachineDeletionSemaphore() { + return checkNotNull(getConfig(MACHINE_DELETION_SEMAPHORE), MACHINE_DELETION_SEMAPHORE.getName()); + } + protected CloudMachineNamer getCloudMachineNamer(ConfigBag config) { String namerClass = config.get(LocationConfigKeys.CLOUD_MACHINE_NAMER_CLASS); if (Strings.isNonBlank(namerClass)) { @@ -2105,6 +2117,11 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im @Override public void release(MachineLocation rawMachine) { + Duration preSemaphoreTimestamp = null; + Duration semaphoreTimestamp = null; + Duration destroyTimestamp = null; + Stopwatch destroyingStopwatch = Stopwatch.createStarted(); + String instanceId = vmInstanceIds.remove(rawMachine); if (instanceId == null) { LOG.info("Attempted release of unknown machine "+rawMachine+" in 
"+toString()); @@ -2142,14 +2159,34 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im } try { - releaseNode(instanceId); - } catch (Exception e) { - LOG.error("Problem releasing machine "+machine+" in "+this+", instance id "+instanceId+ - "; ignoring and continuing, " - + (tothrow==null ? "will throw subsequently" : "swallowing due to previous error")+": "+e, e); - if (tothrow==null) tothrow = e; - } + preSemaphoreTimestamp = Duration.of(destroyingStopwatch); + Semaphore machineDeletionSemaphore = getMachineDeletionSemaphore(); + boolean acquired = machineDeletionSemaphore.tryAcquire(0, TimeUnit.SECONDS); + if (!acquired) { + LOG.info("Waiting in {} for machine-deletion permit ({} other queuing requests already)", new Object[] {this, machineDeletionSemaphore.getQueueLength()}); + Stopwatch blockStopwatch = Stopwatch.createStarted(); + machineDeletionSemaphore.acquire(); + LOG.info("Acquired in {} machine-deletion permit, after waiting {}", this, Time.makeTimeStringRounded(blockStopwatch)); + } else { + LOG.debug("Acquired in {} machine-deletion permit immediately", this); + } + semaphoreTimestamp = Duration.of(destroyingStopwatch); + try { + releaseNode(instanceId); + destroyTimestamp = Duration.of(destroyingStopwatch); + } catch (Exception e) { + LOG.error("Problem releasing machine "+machine+" in "+this+", instance id "+instanceId+ + "; ignoring and continuing, " + + (tothrow==null ? 
"will throw subsequently" : "swallowing due to previous error")+": "+e, e); + if (tothrow==null) tothrow = e; + } finally { + machineDeletionSemaphore.release(); + } + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + removeChild(machine); try { @@ -2162,8 +2199,24 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im } if (tothrow != null) { + LOG.error("Problem releasing machine " + machine + " (propagating) " + + " after "+Duration.of(destroyingStopwatch).toStringRounded() + + (semaphoreTimestamp != null ? " (" + + "semaphore obtained in "+Duration.of(semaphoreTimestamp).subtract(preSemaphoreTimestamp).toStringRounded()+";" + + (destroyTimestamp != null ? " node destroyed in "+Duration.of(destroyTimestamp).subtract(semaphoreTimestamp).toStringRounded() : "") + + ")" + : "") + + ": "+tothrow.getMessage()); + throw Exceptions.propagate(tothrow); } + + String logMessage = "Released machine " + machine +":" + + " total time "+Duration.of(destroyingStopwatch).toStringRounded() + + " (" + + "semaphore obtained in "+Duration.of(semaphoreTimestamp).subtract(preSemaphoreTimestamp).toStringRounded()+";" + + " node destroyed in "+Duration.of(destroyTimestamp).subtract(semaphoreTimestamp).toStringRounded()+")"; + LOG.info(logMessage); } protected void releaseSafely(MachineLocation machine) { http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c8f470c0/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/api/JcloudsLocationConfigPublic.java ---------------------------------------------------------------------- diff --git a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/api/JcloudsLocationConfigPublic.java b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/api/JcloudsLocationConfigPublic.java index 55b0e64..8c9a3c4 100644 --- a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/api/JcloudsLocationConfigPublic.java +++ 
b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/api/JcloudsLocationConfigPublic.java @@ -228,9 +228,15 @@ public interface JcloudsLocationConfigPublic extends CloudLocationConfig { public static final ConfigKey<Integer> MAX_CONCURRENT_MACHINE_CREATIONS = ConfigKeys.newIntegerConfigKey( "maxConcurrentMachineCreations", "Maximum number of concurrent machine-creations", Integer.MAX_VALUE); + public static final ConfigKey<Integer> MAX_CONCURRENT_MACHINE_DELETIONS = ConfigKeys.newIntegerConfigKey( + "maxConcurrentMachineDeletions", "Maximum number of concurrent machine-deletions", Integer.MAX_VALUE); + public static final ConfigKey<Semaphore> MACHINE_CREATION_SEMAPHORE = ConfigKeys.newConfigKey( Semaphore.class, "machineCreationSemaphore", "Semaphore for controlling concurrent machine creation", null); + public static final ConfigKey<Semaphore> MACHINE_DELETION_SEMAPHORE = ConfigKeys.newConfigKey( + Semaphore.class, "machineDeletionSemaphore", "Semaphore for controlling concurrent machine deletion", null); + @SuppressWarnings("serial") public static final ConfigKey<Map<String,Object>> TEMPLATE_OPTIONS = ConfigKeys.newConfigKey( new TypeToken<Map<String, Object>>() {}, "templateOptions", "Additional jclouds template options"); http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c8f470c0/locations/jclouds/src/test/java/org/apache/brooklyn/location/jclouds/JcloudsMaxConcurrencyStubbedTest.java ---------------------------------------------------------------------- diff --git a/locations/jclouds/src/test/java/org/apache/brooklyn/location/jclouds/JcloudsMaxConcurrencyStubbedTest.java b/locations/jclouds/src/test/java/org/apache/brooklyn/location/jclouds/JcloudsMaxConcurrencyStubbedTest.java new file mode 100644 index 0000000..880048a --- /dev/null +++ b/locations/jclouds/src/test/java/org/apache/brooklyn/location/jclouds/JcloudsMaxConcurrencyStubbedTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.location.jclouds; + +import static org.testng.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.brooklyn.location.jclouds.StubbedComputeServiceRegistry.AbstractNodeCreator; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.time.Duration; +import org.jclouds.compute.domain.NodeMetadata; +import org.jclouds.compute.domain.NodeMetadata.Status; +import org.jclouds.compute.domain.NodeMetadataBuilder; +import org.jclouds.compute.domain.Template; +import org.jclouds.domain.LoginCredentials; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Futures; +import 
com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +/** + * Checks that the maxConcurrentMachineCreations and maxConcurrentMachineDeletions limits are enforced, using a stubbed node creator that records how many create/destroy calls run concurrently. + */ +public class JcloudsMaxConcurrencyStubbedTest extends AbstractJcloudsStubbedUnitTest { + + private static class ConcurrencyMonitor { + private final Object mutex = new Object(); + private final AtomicInteger concurrentCalls = new AtomicInteger(); + private final AtomicInteger maxConcurrentCalls = new AtomicInteger(); + private CountDownLatch latch; + + public void setLatch(CountDownLatch latch) { + this.latch = latch; + } + + public int getMaxConcurrentCalls() { + return maxConcurrentCalls.get(); + } + + public void onStart() { + synchronized (mutex) { + int concurrentCallCount = concurrentCalls.incrementAndGet(); + if (concurrentCallCount > maxConcurrentCalls.get()) { + maxConcurrentCalls.set(concurrentCallCount); + } + } + if (latch != null) { + try { + latch.await(); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + } + } + + public void onEnd() { + synchronized (mutex) { + concurrentCalls.decrementAndGet(); + } + } + } + + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(JcloudsMaxConcurrencyStubbedTest.class); + + private ListeningExecutorService executor; + private ConcurrencyMonitor creationConcurrencyMonitor; + private ConcurrencyMonitor deletionConcurrencyMonitor; + + @BeforeMethod(alwaysRun=true) + @Override + public void setUp() throws Exception { + super.setUp(); + executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); + creationConcurrencyMonitor = new ConcurrencyMonitor(); + deletionConcurrencyMonitor = new ConcurrencyMonitor(); + } + + @Override + public void tearDown() throws Exception { + try { + super.tearDown(); + } finally { + if (executor != null) executor.shutdownNow(); + }
+ } + + protected AbstractNodeCreator newNodeCreator() { + return new AbstractNodeCreator() { + @Override protected NodeMetadata newNode(String group, Template template) { + try { + creationConcurrencyMonitor.onStart(); + + NodeMetadata result = new NodeMetadataBuilder() + .id("myid") + .credentials(LoginCredentials.builder().identity("myuser").credential("mypassword").build()) + .loginPort(22) + .status(Status.RUNNING) + .publicAddresses(ImmutableList.of("173.194.32.123")) + .privateAddresses(ImmutableList.of("172.168.10.11")) + .build(); + return result; + } finally { + creationConcurrencyMonitor.onEnd(); + } + } + @Override public void destroyNode(String id) { + try { + deletionConcurrencyMonitor.onStart(); + + super.destroyNode(id); + } finally { + deletionConcurrencyMonitor.onEnd(); + } + } + @Override public Set<? extends NodeMetadata> destroyNodesMatching(Predicate<? super NodeMetadata> filter) { + try { + deletionConcurrencyMonitor.onStart(); + + return super.destroyNodesMatching(filter); + } finally { + deletionConcurrencyMonitor.onEnd(); + } + } + + }; + } + + @Test + public void testConcurrentCreateCalls() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + creationConcurrencyMonitor.setLatch(latch); + + initNodeCreatorAndJcloudsLocation(newNodeCreator(), ImmutableMap.of(JcloudsLocation.MAX_CONCURRENT_MACHINE_CREATIONS, 2)); + + List<ListenableFuture<?>> futures = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + futures.add(executor.submit(new Callable<JcloudsSshMachineLocation>() { + public JcloudsSshMachineLocation call() throws Exception { + return obtainMachine(); + }})); + } + + assertMaxConcurrentCallsEventually(creationConcurrencyMonitor, 2); + assertMaxConcurrentCallsContinually(creationConcurrencyMonitor, 2); + latch.countDown(); + Futures.allAsList(futures).get(); + } + + @Test + public void testConcurrentDeletionCalls() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + 
deletionConcurrencyMonitor.setLatch(latch); + + initNodeCreatorAndJcloudsLocation(newNodeCreator(), ImmutableMap.of(JcloudsLocation.MAX_CONCURRENT_MACHINE_DELETIONS, 2)); + + List<JcloudsSshMachineLocation> machines = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + machines.add(obtainMachine()); + } + + List<ListenableFuture<?>> futures = new ArrayList<>(); + for (final JcloudsSshMachineLocation machine : machines) { + futures.add(executor.submit(new Callable<Void>() { + public Void call() throws Exception { + jcloudsLocation.release(machine); + return null; + }})); + } + + assertMaxConcurrentCallsEventually(deletionConcurrencyMonitor, 2); + assertMaxConcurrentCallsContinually(deletionConcurrencyMonitor, 2); + latch.countDown(); + Futures.allAsList(futures).get(); + } + + void assertMaxConcurrentCallsEventually(ConcurrencyMonitor monitor, int expected) { + Asserts.succeedsEventually(new Runnable() { + public void run() { + assertEquals(monitor.getMaxConcurrentCalls(), expected); + }}); + } + + void assertMaxConcurrentCallsContinually(ConcurrencyMonitor monitor, int expected) { + Asserts.succeedsContinually(MutableMap.of("timeout", Duration.millis(100)), new Runnable() { + public void run() { + assertEquals(monitor.getMaxConcurrentCalls(), expected); + }}); + } +}