BROOKLYN-214: fix cancelling of AttributeWhenReady task Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/d5c07225 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/d5c07225 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/d5c07225
Branch: refs/heads/master Commit: d5c072257250e1b54e08efe2ac31b4b1ff03a6e0 Parents: d058158 Author: Aled Sage <aled.s...@gmail.com> Authored: Tue Jan 12 13:24:43 2016 +0000 Committer: Aled Sage <aled.s...@gmail.com> Committed: Tue Jan 12 13:24:43 2016 +0000 ---------------------------------------------------------------------- .../spi/dsl/BrooklynDslDeferredSupplier.java | 40 ++++++- .../DependentConfigPollingYamlTest.java | 117 +++++++++++++++++++ 2 files changed, 155 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5c07225/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java b/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java index 65bf561..a417e32 100644 --- a/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java +++ b/brooklyn-server/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java @@ -18,7 +18,10 @@ */ package org.apache.brooklyn.camp.brooklyn.spi.dsl; +import java.io.IOException; +import java.io.ObjectInputStream; import java.io.Serializable; +import java.util.concurrent.locks.ReentrantLock; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.mgmt.ExecutionContext; @@ -55,6 +58,11 @@ import com.fasterxml.jackson.annotation.JsonProperty; * and should not accessed until after the components / entities are created * and are being started. * (TODO the precise semantics of this are under development.) 
+ * + * The threading model is that only one thread can call {@link #get()} at a time. An interruptible + * lock is obtained using {@link #lock} for the duration of that method. It is important to not + * use {@code synchronized} because that is not interruptible - if someone tries to get the value + * and interrupts after a short wait, then we must release the lock immediately and return. * <p> **/ public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier<T>, TaskFactory<Task<T>>, Serializable { @@ -63,6 +71,15 @@ public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier private static final Logger log = LoggerFactory.getLogger(BrooklynDslDeferredSupplier.class); + /** + * Lock to be used, rather than {@code synchronized} blocks, for anything long-running. + * Use {@link #getLock()} rather than this field directly, to ensure it is reinitialised + * after rebinding. + * + * @see https://issues.apache.org/jira/browse/BROOKLYN-214 + */ + private transient ReentrantLock lock; + // TODO json of this object should *be* this, not wrapped this ($brooklyn:literal is a bit of a hack, though it might work!) @JsonInclude @JsonProperty(value="$brooklyn:literal") @@ -72,8 +89,9 @@ public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier public BrooklynDslDeferredSupplier() { PlanInterpretationNode sourceNode = BrooklynDslInterpreter.currentNode(); dsl = sourceNode!=null ? 
sourceNode.getOriginalValue() : null; + lock = new ReentrantLock(); } - + /** returns the current entity; for use in implementations of {@link #get()} */ protected final static EntityInternal entity() { return (EntityInternal) BrooklynTaskTags.getTargetOrContextEntity(Tasks.current()); @@ -88,7 +106,13 @@ public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier } @Override - public final synchronized T get() { + public final T get() { + try { + getLock().lockInterruptibly(); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + try { if (log.isDebugEnabled()) log.debug("Queuing task to resolve "+dsl); @@ -110,7 +134,19 @@ public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier } catch (Exception e) { throw Exceptions.propagate(e); + } finally { + getLock().unlock(); + } + } + + // Use this method, rather than the direct field, to ensure it is initialised after rebinding. + protected ReentrantLock getLock() { + synchronized (this) { + if (lock == null) { + lock = new ReentrantLock(); + } } + return lock; } @Override http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5c07225/brooklyn-server/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/DependentConfigPollingYamlTest.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/DependentConfigPollingYamlTest.java b/brooklyn-server/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/DependentConfigPollingYamlTest.java new file mode 100644 index 0000000..10df5f0 --- /dev/null +++ b/brooklyn-server/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/DependentConfigPollingYamlTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.camp.brooklyn; + +import static org.testng.Assert.assertTrue; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.core.test.entity.TestEntity; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; + +@Test +public class DependentConfigPollingYamlTest extends AbstractYamlTest { + private static final Logger log = LoggerFactory.getLogger(DependentConfigPollingYamlTest.class); + + private ExecutorService executor; + + @BeforeMethod(alwaysRun = true) + @Override + public void setUp() { + super.setUp(); + executor = Executors.newCachedThreadPool(); + } + + @AfterMethod(alwaysRun = true) + @Override + public void tearDown() { + if (executor != null) executor.shutdownNow(); + super.tearDown(); + } + + // Test for BROOKLYN-214. 
Previously, the brief Tasks.resolving would cause a thread to be + // leaked. This was because it would call into BrooklynDslDeferredSupplier.get, which would + // wait on a synchronized block and thus not be interruptible - the thread would be consumed + // forever, until the attributeWhenReady returned true! + // + // Integration test, because it takes several seconds. + @Test(groups="Integration") + public void testResolveAttributeWhenReadyWithTimeoutDoesNotLeaveThreadRunning() throws Exception { + String yaml = Joiner.on("\n").join( + "services:", + "- type: org.apache.brooklyn.core.test.entity.TestEntity", + " id: myentity", + " brooklyn.config:", + " test.confName: $brooklyn:entity(\"myentity\").attributeWhenReady(\"mysensor\")"); + + final Entity app = createAndStartApplication(yaml); + final TestEntity entity = (TestEntity) Iterables.getOnlyElement(app.getChildren()); + + // Cause a thread to block, getting the config - previously (before fixing 214) this would be in + // the synchronized block of BrooklynDslDeferredSupplier.get(). + // The sleep is to ensure we really did get into the locking code. + executor.submit(new Callable<Object>() { + public Object call() { + return entity.config().get(TestEntity.CONF_NAME); + }}); + Thread.sleep(100); + + // Try to resolve the value many times, each in its own task, but with a short timeout for each. + final int numIterations = 20; + final int preNumThreads = Thread.activeCount(); + + for (int i = 0; i < numIterations; i++) { + // Same as RestValueResolver.getImmediateValue + Tasks.resolving(entity.config().getRaw(TestEntity.CONF_NAME).get()) + .as(Object.class) + .defaultValue("UNRESOLVED") + .timeout(Duration.millis(100)) + .context(entity) + .swallowExceptions() + .get(); + } + + // Confirm we haven't left threads behind. 
+ Asserts.succeedsEventually(new Runnable() { + public void run() { + int postNumThreads = Thread.activeCount(); + String msg = "pre="+preNumThreads+"; post="+postNumThreads+"; iterations="+numIterations; + log.info(msg); + assertTrue(postNumThreads < preNumThreads + (numIterations / 2), msg); + }}); + } + + @Override + protected Logger getLogger() { + return log; + } +}