http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicyTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicyTest.java new file mode 100644 index 0000000..27441be --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicyTest.java @@ -0,0 +1,649 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.policy.autoscaling; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.lang.management.OperatingSystemMXBean; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.api.policy.PolicySpec; +import org.apache.brooklyn.test.entity.TestApplication; +import org.apache.brooklyn.test.entity.TestCluster; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.Entities; +import brooklyn.entity.trait.Resizable; +import brooklyn.event.basic.BasicNotificationSensor; +import brooklyn.test.Asserts; +import brooklyn.util.collections.MutableList; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.time.Duration; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class AutoScalerPolicyTest { + + private static final Logger log = LoggerFactory.getLogger(AutoScalerPolicyTest.class); + + private static long TIMEOUT_MS = 10*1000; + private static long SHORT_WAIT_MS = 250; + private static long OVERHEAD_DURATION_MS = 500; + private static long EARLY_RETURN_MS = 10; + + private static final int MANY_TIMES_INVOCATION_COUNT = 10; + + AutoScalerPolicy policy; + TestCluster cluster; + LocallyResizableEntity resizable; + TestApplication app; + List<Integer> policyResizes = 
MutableList.of(); + + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + log.info("resetting "+getClass().getSimpleName()); + app = TestApplication.Factory.newManagedInstanceForTests(); + cluster = app.createAndManageChild(EntitySpec.create(TestCluster.class).configure(TestCluster.INITIAL_SIZE, 1)); + resizable = new LocallyResizableEntity(cluster, cluster); + Entities.manage(resizable); + PolicySpec<AutoScalerPolicy> policySpec = PolicySpec.create(AutoScalerPolicy.class).configure(AutoScalerPolicy.RESIZE_OPERATOR, new ResizeOperator() { + @Override + public Integer resize(Entity entity, Integer desiredSize) { + log.info("resizing to "+desiredSize); + policyResizes.add(desiredSize); + return ((Resizable)entity).resize(desiredSize); + } + }); + policy = resizable.addPolicy(policySpec); + policyResizes.clear(); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + if (policy != null) policy.destroy(); + if (app != null) Entities.destroyAll(app.getManagementContext()); + cluster = null; + resizable = null; + policy = null; + } + + public void assertSizeEventually(Integer targetSize) { + Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, targetSize)); + assertEquals(policyResizes.get(policyResizes.size()-1), targetSize); + } + + @Test + public void testShrinkColdPool() throws Exception { + resizable.resize(4); + // all metrics as per-node here + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(30d/4, 10, 20)); + + // expect pool to shrink to 3 (i.e. maximum to have >= 10 per container) + assertSizeEventually(3); + } + + @Test + public void testShrinkColdPoolTotals() throws Exception { + resizable.resize(4); + // all metrics as totals here + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(30L, 4*10L, 4*20L)); + + // expect pool to shrink to 3 (i.e. 
maximum to have >= 10 per container) + assertSizeEventually(3); + } + + @Test + public void testShrinkColdPoolRoundsUpDesiredNumberOfContainers() throws Exception { + resizable.resize(4); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(1L, 4*10L, 4*20L)); + + Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, 1)); + } + + @Test + public void testGrowHotPool() throws Exception { + resizable.resize(2); + // all metrics as per-node here + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(21L, 10L, 20L)); + + // expect pool to grow to 3 (i.e. minimum to have <= 20 per container) + assertSizeEventually(3); + } + + @Test + public void testGrowHotPoolTotals() throws Exception { + resizable.resize(2); + // all metrics as totals here + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(41L, 2*10L, 2*20L)); + + // expect pool to grow to 3 (i.e. minimum to have <= 20 per container) + assertSizeEventually(3); + } + + @Test + public void testGrowShrinkRespectsResizeIterationIncrementAndResizeIterationMax() throws Exception { + resizable.resize(2); + policy.config().set(AutoScalerPolicy.RESIZE_UP_ITERATION_INCREMENT, 2); + policy.config().set(AutoScalerPolicy.RESIZE_UP_ITERATION_MAX, 4); + policy.config().set(AutoScalerPolicy.RESIZE_DOWN_ITERATION_INCREMENT, 3); + policy.config().set(AutoScalerPolicy.RESIZE_DOWN_ITERATION_MAX, 3); + + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(42/2, 10, 20)); + // expect pool to grow to 4 (i.e. 
to have <= 20 per container we need 3, but increment is 2) + assertSizeEventually(4); + + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(200/4, 10, 20)); + // a single hot message can only make it go to 8 + assertSizeEventually(8); + assertEquals(policyResizes, MutableList.of(4, 8)); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(200/8, 10, 20)); + assertSizeEventually(10); + assertEquals(policyResizes, MutableList.of(4, 8, 10)); + + // now shrink + policyResizes.clear(); + policy.config().set(AutoScalerPolicy.MIN_POOL_SIZE, 2); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(1, 10, 20)); + assertSizeEventually(7); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(1, 10, 20)); + assertSizeEventually(4); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(1, 10, 20)); + assertSizeEventually(2); + assertEquals(policyResizes, MutableList.of(7, 4, 2)); + } + + @Test + public void testHasId() throws Exception { + resizable.removePolicy(policy); + policy = AutoScalerPolicy.builder() + .minPoolSize(2) + .build(); + resizable.addPolicy(policy); + Assert.assertTrue(policy.getId()!=null); + } + + @Test + public void testNeverShrinkBelowMinimum() throws Exception { + resizable.removePolicy(policy); + policy = AutoScalerPolicy.builder() + .minPoolSize(2) + .build(); + resizable.addPolicy(policy); + + resizable.resize(4); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(4, 0L, 4*10L, 4*20L)); + + // expect pool to shrink only to the minimum + Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, 2)); + } + + @Test + public void testNeverGrowAboveMaximmum() throws Exception { + resizable.removePolicy(policy); + policy = AutoScalerPolicy.builder() + .maxPoolSize(5) + .build(); + resizable.addPolicy(policy); + + resizable.resize(4); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(4, 1000000L, 4*10L, 4*20L)); + 
+ // expect pool to grow only to the maximum + Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, 5)); + } + + @Test + public void testNeverGrowColdPool() throws Exception { + resizable.resize(2); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(2, 1000L, 2*10L, 2*20L)); + + Thread.sleep(SHORT_WAIT_MS); + assertEquals(resizable.getCurrentSize(), (Integer)2); + } + + @Test + public void testNeverShrinkHotPool() throws Exception { + resizable.resizeSleepTime = 0; + resizable.resize(2); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(2, 0L, 2*10L, 2*20L)); + + // if had been a POOL_COLD, would have shrunk to 3 + Thread.sleep(SHORT_WAIT_MS); + assertEquals(resizable.getCurrentSize(), (Integer)2); + } + + @Test(groups="Integration") + public void testConcurrentShrinkShrink() throws Exception { + resizable.resizeSleepTime = 250; + resizable.resize(4); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(4, 30L, 4*10L, 4*20L)); + // would cause pool to shrink to 3 + + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(4, 1L, 4*10L, 4*20L)); + // now expect pool to shrink to 1 + + Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, 1)); + } + + @Test(groups="Integration") + public void testConcurrentGrowGrow() throws Exception { + resizable.resizeSleepTime = 250; + resizable.resize(2); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(2, 41L, 2*10L, 2*20L)); + // would cause pool to grow to 3 + + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(2, 81L, 2*10L, 2*20L)); + // now expect pool to grow to 5 + + Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, 5)); + } + + @Test(groups="Integration") + public void testConcurrentGrowShrink() throws Exception { + resizable.resizeSleepTime = 250; + resizable.resize(2); + 
resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(2, 81L, 2*10L, 2*20L)); + // would cause pool to grow to 5 + + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(2, 1L, 2*10L, 2*20L)); + // now expect pool to shrink to 1 + + Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, 1)); + } + + @Test(groups="Integration") + public void testConcurrentShrinkGrow() throws Exception { + resizable.resizeSleepTime = 250; + resizable.resize(4); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(4, 1L, 4*10L, 4*20L)); + // would cause pool to shrink to 1 + + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(4, 81L, 4*10L, 4*20L)); + // now expect pool to grow to 5 + + Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, 5)); + } + + // FIXME failed in jenkins (e.g. #1035); with "lists don't have the same size expected:<3> but was:<2>" + // Is it just too time sensitive? But I'd have expected > 3 rather than less + @Test(groups="WIP") + public void testRepeatedQueuedResizeTakesLatestValueRatherThanIntermediateValues() throws Exception { + // TODO is this too time sensitive? 
the resize takes only 250ms so if it finishes before the next emit we'd also see size=2 + resizable.resizeSleepTime = 500; + resizable.resize(4); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(4, 30L, 4*10L, 4*20L)); // shrink to 3 + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(4, 20L, 4*10L, 4*20L)); // shrink to 2 + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(4, 10L, 4*10L, 4*20L)); // shrink to 1 + + Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, 1)); + assertEquals(resizable.sizes, ImmutableList.of(4, 3, 1)); + } + + + @Test + public void testUsesResizeOperatorOverride() throws Exception { + resizable.removePolicy(policy); + + final AtomicInteger counter = new AtomicInteger(); + policy = AutoScalerPolicy.builder() + .resizeOperator(new ResizeOperator() { + @Override public Integer resize(Entity entity, Integer desiredSize) { + counter.incrementAndGet(); + return desiredSize; + }}) + .build(); + resizable.addPolicy(policy); + + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(1, 21L, 1*10L, 1*20L)); // grow to 2 + + Asserts.succeedsEventually(MutableMap.of("timeout",TIMEOUT_MS), new Runnable() { + public void run() { + assertTrue(counter.get() >= 1, "cccounter="+counter); + }}); + } + + @Test + public void testUsesCustomSensorOverride() throws Exception { + resizable.removePolicy(policy); + + @SuppressWarnings("rawtypes") + BasicNotificationSensor<Map> customPoolHotSensor = new BasicNotificationSensor<Map>(Map.class, "custom.hot", ""); + @SuppressWarnings("rawtypes") + BasicNotificationSensor<Map> customPoolColdSensor = new BasicNotificationSensor<Map>(Map.class, "custom.cold", ""); + @SuppressWarnings("rawtypes") + BasicNotificationSensor<Map> customPoolOkSensor = new BasicNotificationSensor<Map>(Map.class, "custom.ok", ""); + policy = AutoScalerPolicy.builder() + .poolHotSensor(customPoolHotSensor) + 
.poolColdSensor(customPoolColdSensor) + .poolOkSensor(customPoolOkSensor) + .build(); + resizable.addPolicy(policy); + + resizable.emit(customPoolHotSensor, message(1, 21L, 1*10L, 1*20L)); // grow to 2 + Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, 2)); + + resizable.emit(customPoolColdSensor, message(2, 1L, 1*10L, 1*20L)); // shrink to 1 + Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, 1)); + } + + @Test(groups="Integration") + public void testResizeUpStabilizationDelayIgnoresBlip() throws Exception { + long resizeUpStabilizationDelay = 1000L; + Duration minPeriodBetweenExecs = Duration.ZERO; + resizable.removePolicy(policy); + + policy = AutoScalerPolicy.builder() + .resizeUpStabilizationDelay(Duration.of(resizeUpStabilizationDelay, TimeUnit.MILLISECONDS)) + .minPeriodBetweenExecs(minPeriodBetweenExecs) + .build(); + resizable.addPolicy(policy); + resizable.resize(1); + + // Ignores temporary blip + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(1, 61L, 1*10L, 1*20L)); // would grow to 4 + Thread.sleep(resizeUpStabilizationDelay-OVERHEAD_DURATION_MS); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_OK_SENSOR, message(1, 11L, 4*10L, 4*20L)); // but 1 is still adequate + + assertEquals(resizable.getCurrentSize(), (Integer)1); + Asserts.succeedsContinually(MutableMap.of("duration", 2000L), new Runnable() { + @Override public void run() { + assertEquals(resizable.sizes, ImmutableList.of(1)); + }}); + } + + // FIXME failing in jenkins occassionally - have put it in the "Acceptance" group for now + // + // Error was things like it taking a couple of seconds too long to scale-up. This is *not* + // just caused by a slow GC (running with -verbose:gc shows during a failure several + // incremental GCs that usually don't amount to more than 0.2 of a second at most, often less). 
+ // Doing a thread-dump etc immediately after the too-long delay shows no strange thread usage, + // and shows releng3 system load averages of numbers like 1.73, 2.87 and 1.22. + // + // Is healthy on normal machines. + @Test(groups={"Integration", "Acceptance"}, invocationCount=MANY_TIMES_INVOCATION_COUNT) + public void testRepeatedResizeUpStabilizationDelayTakesMaxSustainedDesired() throws Throwable { + try { + testResizeUpStabilizationDelayTakesMaxSustainedDesired(); + } catch (Throwable t) { + dumpThreadsEtc(); + throw t; + } + } + + @Test(groups="Integration") + public void testResizeUpStabilizationDelayTakesMaxSustainedDesired() throws Exception { + long resizeUpStabilizationDelay = 1100L; + Duration minPeriodBetweenExecs = Duration.ZERO; + resizable.removePolicy(policy); + + policy = AutoScalerPolicy.builder() + .resizeUpStabilizationDelay(Duration.of(resizeUpStabilizationDelay, TimeUnit.MILLISECONDS)) + .minPeriodBetweenExecs(minPeriodBetweenExecs) + .build(); + resizable.addPolicy(policy); + resizable.resize(1); + + // Will grow to only the max sustained in this time window + // (i.e. to 2 within the first $resizeUpStabilizationDelay milliseconds) + Stopwatch stopwatch = Stopwatch.createStarted(); + + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(1, 61L, 1*10L, 1*20L)); // would grow to 4 + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(1, 21L, 1*10L, 1*20L)); // would grow to 2 + Thread.sleep(resizeUpStabilizationDelay-OVERHEAD_DURATION_MS); + + long postSleepTime = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(1, 61L, 1*10L, 1*20L)); // would grow to 4 + + // Wait for it to reach size 2, and confirm take expected time + // TODO This is time sensitive, and sometimes fails in CI with size=4 if we wait for currentSize==2 (presumably GC kicking in?) + // Therefore do strong assertion of currentSize==2 later, so can write out times if it goes wrong. 
+ Asserts.succeedsEventually(MutableMap.of("period", 1, "timeout", TIMEOUT_MS), new Runnable() { + public void run() { + assertTrue(resizable.getCurrentSize() >= 2, "currentSize="+resizable.getCurrentSize()); + }}); + assertEquals(resizable.getCurrentSize(), (Integer)2, + stopwatch.elapsed(TimeUnit.MILLISECONDS)+"ms after first emission; "+(stopwatch.elapsed(TimeUnit.MILLISECONDS)-postSleepTime)+"ms after last"); + + long timeToResizeTo2 = stopwatch.elapsed(TimeUnit.MILLISECONDS); + assertTrue(timeToResizeTo2 >= resizeUpStabilizationDelay-EARLY_RETURN_MS && + timeToResizeTo2 <= resizeUpStabilizationDelay+OVERHEAD_DURATION_MS, + "Resizing to 2: time="+timeToResizeTo2+"; resizeUpStabilizationDelay="+resizeUpStabilizationDelay); + + // Will then grow to 4 $resizeUpStabilizationDelay milliseconds after that emission + Asserts.succeedsEventually(MutableMap.of("period", 1, "timeout", TIMEOUT_MS), + currentSizeAsserter(resizable, 4)); + long timeToResizeTo4 = stopwatch.elapsed(TimeUnit.MILLISECONDS) - postSleepTime; + + assertTrue(timeToResizeTo4 >= resizeUpStabilizationDelay-EARLY_RETURN_MS && + timeToResizeTo4 <= resizeUpStabilizationDelay+OVERHEAD_DURATION_MS, + "Resizing to 4: timeToResizeTo4="+timeToResizeTo4+"; timeToResizeTo2="+timeToResizeTo2+"; resizeUpStabilizationDelay="+resizeUpStabilizationDelay); + } + + @Test(groups="Integration") + public void testResizeUpStabilizationDelayResizesAfterDelay() { + final long resizeUpStabilizationDelay = 1000L; + Duration minPeriodBetweenExecs = Duration.ZERO; + resizable.removePolicy(policy); + + policy = resizable.addPolicy(AutoScalerPolicy.builder() + .resizeUpStabilizationDelay(Duration.of(resizeUpStabilizationDelay, TimeUnit.MILLISECONDS)) + .minPeriodBetweenExecs(minPeriodBetweenExecs) + .buildSpec()); + resizable.resize(1); + + // After suitable delay, grows to desired + final long emitTime = System.currentTimeMillis(); + final Map<String, Object> need4 = message(1, 61L, 1*10L, 1*20L); + 
resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, need4); // would grow to 4 + final AtomicInteger emitCount = new AtomicInteger(0); + + Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() { + public void run() { + if (System.currentTimeMillis() - emitTime > (2+emitCount.get())*resizeUpStabilizationDelay) { + //first one may not have been received, in a registration race + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, need4); + emitCount.incrementAndGet(); + } + assertEquals(resizable.getCurrentSize(), (Integer)4); + }}); + + long resizeDelay = System.currentTimeMillis() - emitTime; + assertTrue(resizeDelay >= (resizeUpStabilizationDelay-EARLY_RETURN_MS), "resizeDelay="+resizeDelay); + } + + @Test(groups="Integration") + public void testResizeDownStabilizationDelayIgnoresBlip() throws Exception { + long resizeStabilizationDelay = 1000L; + Duration minPeriodBetweenExecs = Duration.ZERO; + resizable.removePolicy(policy); + + policy = AutoScalerPolicy.builder() + .resizeDownStabilizationDelay(Duration.of(resizeStabilizationDelay, TimeUnit.MILLISECONDS)) + .minPeriodBetweenExecs(minPeriodBetweenExecs) + .build(); + resizable.addPolicy(policy); + resizable.resize(2); + + // Ignores temporary blip + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(2, 1L, 2*10L, 2*20L)); // would shrink to 1 + Thread.sleep(resizeStabilizationDelay-OVERHEAD_DURATION_MS); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_OK_SENSOR, message(2, 20L, 1*10L, 1*20L)); // but 2 is still adequate + + assertEquals(resizable.getCurrentSize(), (Integer)2); + Asserts.succeedsContinually(MutableMap.of("duration", 2000L), new Runnable() { + public void run() { + assertEquals(resizable.sizes, ImmutableList.of(2)); + }}); + } + + // FIXME Acceptance -- see comment against testRepeatedResizeUpStabilizationDelayTakesMaxSustainedDesired + @Test(groups={"Integration", "Acceptance"}, invocationCount=MANY_TIMES_INVOCATION_COUNT) + public void 
testRepeatedResizeDownStabilizationDelayTakesMinSustainedDesired() throws Throwable { + try { + testResizeDownStabilizationDelayTakesMinSustainedDesired(); + } catch (Throwable t) { + dumpThreadsEtc(); + throw t; + } + } + + @Test(groups="Integration") + public void testResizeDownStabilizationDelayTakesMinSustainedDesired() throws Exception { + long resizeDownStabilizationDelay = 1100L; + Duration minPeriodBetweenExecs = Duration.ZERO; + policy.suspend(); + resizable.removePolicy(policy); + + policy = AutoScalerPolicy.builder() + .resizeDownStabilizationDelay(Duration.of(resizeDownStabilizationDelay, TimeUnit.MILLISECONDS)) + .minPeriodBetweenExecs(minPeriodBetweenExecs) + .build(); + resizable.addPolicy(policy); + resizable.resize(3); + + // Will shrink to only the min sustained in this time window + // (i.e. to 2 within the first $resizeUpStabilizationDelay milliseconds) + Stopwatch stopwatch = Stopwatch.createStarted(); + + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(3, 1L, 3*10L, 3*20L)); // would shrink to 1 + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(3, 20L, 3*10L, 3*20L)); // would shrink to 2 + Thread.sleep(resizeDownStabilizationDelay-OVERHEAD_DURATION_MS); + + long postSleepTime = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(3, 1L, 3*10L, 3*20L)); // would shrink to 1 + + // Wait for it to reach size 2, and confirm take expected time + // TODO This is time sensitive, and sometimes fails in CI with size=1 if we wait for currentSize==2 (presumably GC kicking in?) + // Therefore do strong assertion of currentSize==2 later, so can write out times if it goes wrong. 
+ Asserts.succeedsEventually(MutableMap.of("period", 1, "timeout", TIMEOUT_MS), new Runnable() { + public void run() { + assertTrue(resizable.getCurrentSize() <= 2, "currentSize="+resizable.getCurrentSize()); + }}); + assertEquals(resizable.getCurrentSize(), (Integer)2, + stopwatch.elapsed(TimeUnit.MILLISECONDS)+"ms after first emission; "+(stopwatch.elapsed(TimeUnit.MILLISECONDS)-postSleepTime)+"ms after last"); + + long timeToResizeTo2 = stopwatch.elapsed(TimeUnit.MILLISECONDS); + assertTrue(timeToResizeTo2 >= resizeDownStabilizationDelay-EARLY_RETURN_MS && + timeToResizeTo2 <= resizeDownStabilizationDelay+OVERHEAD_DURATION_MS, + "Resizing to 2: time="+timeToResizeTo2+"; resizeDownStabilizationDelay="+resizeDownStabilizationDelay); + + // Will then shrink to 1 $resizeUpStabilizationDelay milliseconds after that emission + Asserts.succeedsEventually(MutableMap.of("period", 1, "timeout", TIMEOUT_MS), + currentSizeAsserter(resizable, 1)); + long timeToResizeTo1 = stopwatch.elapsed(TimeUnit.MILLISECONDS) - postSleepTime; + + assertTrue(timeToResizeTo1 >= resizeDownStabilizationDelay-EARLY_RETURN_MS && + timeToResizeTo1 <= resizeDownStabilizationDelay+OVERHEAD_DURATION_MS, + "Resizing to 1: timeToResizeTo1="+timeToResizeTo1+"; timeToResizeTo2="+timeToResizeTo2+"; resizeDownStabilizationDelay="+resizeDownStabilizationDelay); + } + + @Test(groups="Integration") + public void testResizeDownStabilizationDelayResizesAfterDelay() throws Exception { + final long resizeDownStabilizationDelay = 1000L; + Duration minPeriodBetweenExecs = Duration.ZERO; + resizable.removePolicy(policy); + + policy = AutoScalerPolicy.builder() + .resizeDownStabilizationDelay(Duration.of(resizeDownStabilizationDelay, TimeUnit.MILLISECONDS)) + .minPeriodBetweenExecs(minPeriodBetweenExecs) + .build(); + resizable.addPolicy(policy); + resizable.resize(2); + + // After suitable delay, grows to desired + final long emitTime = System.currentTimeMillis(); + final Map<String, Object> needJust1 = message(2, 
1L, 2*10L, 2*20L); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, needJust1); // would shrink to 1 + final AtomicInteger emitCount = new AtomicInteger(0); + + Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() { + public void run() { + if (System.currentTimeMillis() - emitTime > (2+emitCount.get())*resizeDownStabilizationDelay) { + //first one may not have been received, in a registration race + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, needJust1); // would shrink to 1 + emitCount.incrementAndGet(); + } + assertEquals(resizable.getCurrentSize(), (Integer)1); + }}); + + long resizeDelay = System.currentTimeMillis() - emitTime; + assertTrue(resizeDelay >= (resizeDownStabilizationDelay-EARLY_RETURN_MS), "resizeDelay="+resizeDelay); + } + + Map<String, Object> message(double currentWorkrate, double lowThreshold, double highThreshold) { + return message(resizable.getCurrentSize(), currentWorkrate, lowThreshold, highThreshold); + } + static Map<String, Object> message(int currentSize, double currentWorkrate, double lowThreshold, double highThreshold) { + return ImmutableMap.<String,Object>of( + AutoScalerPolicy.POOL_CURRENT_SIZE_KEY, currentSize, + AutoScalerPolicy.POOL_CURRENT_WORKRATE_KEY, currentWorkrate, + AutoScalerPolicy.POOL_LOW_THRESHOLD_KEY, lowThreshold, + AutoScalerPolicy.POOL_HIGH_THRESHOLD_KEY, highThreshold); + } + + public static Runnable currentSizeAsserter(final Resizable resizable, final Integer desired) { + return new Runnable() { + public void run() { + assertEquals(resizable.getCurrentSize(), desired); + } + }; + } + + public static void dumpThreadsEtc() { + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + ThreadInfo[] threads = threadMXBean.dumpAllThreads(true, true); + for (ThreadInfo thread : threads) { + System.out.println(thread.getThreadName()+" ("+thread.getThreadState()+")"); + for (StackTraceElement stackTraceElement : thread.getStackTrace()) { + 
System.out.println("\t"+stackTraceElement); + } + } + + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage heapMemoryUsage = memoryMXBean.getHeapMemoryUsage(); + MemoryUsage nonHeapMemoryUsage = memoryMXBean.getNonHeapMemoryUsage(); + System.out.println("Memory:"); + System.out.println("\tHeap: used="+heapMemoryUsage.getUsed()+"; max="+heapMemoryUsage.getMax()+"; init="+heapMemoryUsage.getInit()+"; committed="+heapMemoryUsage.getCommitted()); + System.out.println("\tNon-heap: used="+nonHeapMemoryUsage.getUsed()+"; max="+nonHeapMemoryUsage.getMax()+"; init="+nonHeapMemoryUsage.getInit()+"; committed="+nonHeapMemoryUsage.getCommitted()); + + OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean(); + System.out.println("OS:"); + System.out.println("\tsysLoadAvg="+operatingSystemMXBean.getSystemLoadAverage()+"; availableProcessors="+operatingSystemMXBean.getAvailableProcessors()+"; arch="+operatingSystemMXBean.getArch()); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/LocallyResizableEntity.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/LocallyResizableEntity.java b/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/LocallyResizableEntity.java new file mode 100644 index 0000000..0b3ef9c --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/LocallyResizableEntity.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.policy.autoscaling; + +import java.util.List; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.test.entity.TestCluster; + +import brooklyn.entity.basic.AbstractEntity; +import brooklyn.entity.trait.Resizable; +import brooklyn.entity.trait.Startable; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; + +/** + * Test class for providing a Resizable LocallyManagedEntity for policy testing + * It is hooked up to a TestCluster that can be used to make assertions against + */ +public class LocallyResizableEntity extends AbstractEntity implements Resizable { + List<Integer> sizes = Lists.newArrayList(); + TestCluster cluster; + long resizeSleepTime = 0; + + public LocallyResizableEntity (TestCluster tc) { + this(null, tc); + } + @SuppressWarnings("deprecation") + public LocallyResizableEntity (Entity parent, TestCluster tc) { + super(parent); + this.cluster = tc; + setAttribute(Startable.SERVICE_UP, true); + } + + @Override + public Integer resize(Integer newSize) { + try { + Thread.sleep(resizeSleepTime); + sizes.add(newSize); + return cluster.resize(newSize); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + } + + @Override + public Integer getCurrentSize() { + return cluster.getCurrentSize(); + } + + @Override + public String toString() { + return getDisplayName(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/followthesun/AbstractFollowTheSunPolicyTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/followthesun/AbstractFollowTheSunPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/followthesun/AbstractFollowTheSunPolicyTest.java new file mode 100644 index 0000000..1928e3c --- /dev/null +++ 
b/policy/src/test/java/org/apache/brooklyn/policy/followthesun/AbstractFollowTheSunPolicyTest.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.followthesun; + +import static org.testng.Assert.assertEquals; + +import java.util.Collection; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.Group; +import org.apache.brooklyn.api.entity.basic.EntityLocal; +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.api.location.LocationSpec; +import org.apache.brooklyn.api.management.ManagementContext; +import org.apache.brooklyn.test.entity.LocalManagementContextForTests; +import org.apache.brooklyn.test.entity.TestApplication; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; + +import brooklyn.entity.basic.ApplicationBuilder; +import brooklyn.entity.basic.DynamicGroup; +import brooklyn.entity.basic.Entities; + +import org.apache.brooklyn.location.basic.SimulatedLocation; + 
+import org.apache.brooklyn.policy.loadbalancing.BalanceableContainer; +import org.apache.brooklyn.policy.loadbalancing.MockContainerEntity; +import org.apache.brooklyn.policy.loadbalancing.MockItemEntity; +import org.apache.brooklyn.policy.loadbalancing.MockItemEntityImpl; +import org.apache.brooklyn.policy.loadbalancing.Movable; +import brooklyn.test.Asserts; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.time.Time; + +import com.google.common.base.Function; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; + +public class AbstractFollowTheSunPolicyTest { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractFollowTheSunPolicyTest.class); + + protected static final long TIMEOUT_MS = 10*1000; + protected static final long SHORT_WAIT_MS = 250; + + protected static final long CONTAINER_STARTUP_DELAY_MS = 100; + + protected TestApplication app; + protected ManagementContext managementContext; + protected SimulatedLocation loc1; + protected SimulatedLocation loc2; + protected FollowTheSunPool pool; + protected DefaultFollowTheSunModel<Entity, Movable> model; + protected FollowTheSunPolicy policy; + protected Group containerGroup; + protected Group itemGroup; + protected Random random = new Random(); + + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + LOG.debug("In AbstractFollowTheSunPolicyTest.setUp()"); + + MockItemEntityImpl.totalMoveCount.set(0); + MockItemEntityImpl.lastMoveTime.set(0); + + managementContext = LocalManagementContextForTests.newInstance(); + app = ApplicationBuilder.newManagedApp(TestApplication.class, managementContext); + + loc1 = managementContext.getLocationManager().createLocation(LocationSpec.create(SimulatedLocation.class).configure("name", "loc1")); + loc2 = 
managementContext.getLocationManager().createLocation(LocationSpec.create(SimulatedLocation.class).configure("name", "loc2")); + + containerGroup = app.createAndManageChild(EntitySpec.create(DynamicGroup.class) + .displayName("containerGroup") + .configure(DynamicGroup.ENTITY_FILTER, Predicates.instanceOf(MockContainerEntity.class))); + + itemGroup = app.createAndManageChild(EntitySpec.create(DynamicGroup.class) + .displayName("itemGroup") + .configure(DynamicGroup.ENTITY_FILTER, Predicates.instanceOf(MockItemEntity.class))); + model = new DefaultFollowTheSunModel<Entity, Movable>("pool-model"); + pool = app.createAndManageChild(EntitySpec.create(FollowTheSunPool.class)); + pool.setContents(containerGroup, itemGroup); + policy = new FollowTheSunPolicy(MockItemEntity.ITEM_USAGE_METRIC, model, FollowTheSunParameters.newDefault()); + pool.addPolicy(policy); + app.start(ImmutableList.of(loc1, loc2)); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() { + if (pool != null && policy != null) pool.removePolicy(policy); + if (app != null) Entities.destroyAll(app.getManagementContext()); + MockItemEntityImpl.totalMoveCount.set(0); + MockItemEntityImpl.lastMoveTime.set(0); + } + + /** + * Asserts (via {@code Asserts.succeedsEventually}, with a timeout of {@code TIMEOUT_MS}) that every + * given container holds exactly the expected items, compared as sets by querying the containers directly. + * If the assertion still fails, a {@code RuntimeException} is thrown whose message includes a verbose + * dump of the current state, with the original {@code AssertionError} as its cause. + * + * @param expected the expected items for each container to be checked + */ + protected void assertItemDistributionEventually(final Map<MockContainerEntity, ? 
extends Collection<MockItemEntity>> expected) { + try { + Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() { + public void run() { + assertItemDistribution(expected); + }}); + } catch (AssertionError e) { + String errMsg = e.getMessage()+"; "+verboseDumpToString(); + throw new RuntimeException(errMsg, e); + } + } + + /** + * Asserts (via {@code Asserts.succeedsContinually}, with a timeout of {@code SHORT_WAIT_MS}) that every + * given container holds exactly the expected items. Accepts the same wildcard map type as + * {@code assertItemDistributionEventually}, so maps of lists or sets can be passed directly. + * On failure a {@code RuntimeException} is thrown whose message includes a verbose state dump. + * + * @param expected the expected items for each container to be checked + */ + protected void assertItemDistributionContinually(final Map<MockContainerEntity, ? extends Collection<MockItemEntity>> expected) { + try { + Asserts.succeedsContinually(MutableMap.of("timeout", SHORT_WAIT_MS), new Runnable() { + public void run() { + assertItemDistribution(expected); + }}); + } catch (AssertionError e) { + String errMsg = e.getMessage()+"; "+verboseDumpToString(); + throw new RuntimeException(errMsg, e); + } + } + + /** + * Asserts, once and immediately, that each container's balanceable items equal the expected items. + * Both sides are copied into sets, so ordering and duplicates are ignored. The failure message is + * the verbose state dump taken before the checks start. + * + * @param expected the expected items for each container to be checked + */ + protected void assertItemDistribution(Map<MockContainerEntity, ? extends Collection<MockItemEntity>> expected) { + String errMsg = verboseDumpToString(); + for (Map.Entry<MockContainerEntity, ? extends Collection<MockItemEntity>> entry : expected.entrySet()) { + MockContainerEntity container = entry.getKey(); + Collection<MockItemEntity> expectedItems = entry.getValue(); + + assertEquals(ImmutableSet.copyOf(container.getBalanceableItems()), ImmutableSet.copyOf(expectedItems), errMsg); + } + } + + protected String verboseDumpToString() { + Iterable<MockContainerEntity> containers = Iterables.filter(app.getManagementContext().getEntityManager().getEntities(), MockContainerEntity.class); + Iterable<MockItemEntity> items = Iterables.filter(app.getManagementContext().getEntityManager().getEntities(), MockItemEntity.class); + + Iterable<Double> containerRates = Iterables.transform(containers, new Function<MockContainerEntity, Double>() { + @Override public Double apply(MockContainerEntity input) { + return (double) input.getWorkrate(); + }}); + + Iterable<Map<Entity, Double>> containerItemUsages = Iterables.transform(containers, new Function<MockContainerEntity, Map<Entity, Double>>() { + @Override public 
Map<Entity, Double> apply(MockContainerEntity input) { + return input.getItemUsage(); + }}); + + Map<MockContainerEntity, Set<Movable>> itemDistributionByContainer = Maps.newLinkedHashMap(); + for (MockContainerEntity container : containers) { + itemDistributionByContainer.put(container, container.getBalanceableItems()); + } + + Map<Movable, BalanceableContainer<?>> itemDistributionByItem = Maps.newLinkedHashMap(); + for (Movable item : items) { + itemDistributionByItem.put(item, item.getAttribute(Movable.CONTAINER)); + } + + String modelItemDistribution = model.itemDistributionToString(); + + return "containers="+containers+"; containerRates="+containerRates + +"; containerItemUsages="+containerItemUsages + +"; itemDistributionByContainer="+itemDistributionByContainer + +"; itemDistributionByItem="+itemDistributionByItem + +"; model="+modelItemDistribution + +"; totalMoves="+MockItemEntityImpl.totalMoveCount + +"; lastMoveTime="+Time.makeDateString(MockItemEntityImpl.lastMoveTime.get()); + } + + protected MockContainerEntity newContainer(TestApplication app, Location loc, String name) { + return newAsyncContainer(app, loc, name, 0); + } + + /** + * Creates a new container that will take "delay" millis to complete its start-up. + */ + protected MockContainerEntity newAsyncContainer(TestApplication app, Location loc, String name, long delay) { + // FIXME Is this comment true? + // Annoyingly, can't set parent until after the threshold config has been defined. 
+ MockContainerEntity container = app.createAndManageChild(EntitySpec.create(MockContainerEntity.class) + .displayName(name) + .configure(MockContainerEntity.DELAY, delay)); + LOG.debug("Managed new container {}", container); + container.start(ImmutableList.of(loc)); + return container; + } + + /** + * Creates and manages a new item configured with {@code MockItemEntity.IMMOVABLE} set to true and, + * when {@code container} is non-null, moves it into that container. + * + * @param app the application that becomes the item's parent + * @param container the container to move the item into; may be null to leave it unplaced + * @param name the display name for the item + * @return the newly created item + */ + protected static MockItemEntity newLockedItem(TestApplication app, MockContainerEntity container, String name) { + MockItemEntity item = app.createAndManageChild(EntitySpec.create(MockItemEntity.class) + .displayName(name) + .configure(MockItemEntity.IMMOVABLE, true)); + LOG.debug("Managed new locked item {} at {}", item, container); + if (container != null) { + item.move(container); + } + return item; + } + + /** + * Creates and manages a new item and, when {@code container} is non-null, moves it into that container. + * + * @param app the application that becomes the item's parent + * @param container the container to move the item into; may be null to leave it unplaced + * @param name the display name for the item + * @return the newly created item + */ + protected static MockItemEntity newItem(TestApplication app, MockContainerEntity container, String name) { + MockItemEntity item = app.createAndManageChild(EntitySpec.create(MockItemEntity.class) + .displayName(name)); + LOG.debug("Managed new item {} at {}", item, container); + if (container != null) { + item.move(container); + } + return item; + } + + protected static MockItemEntity newItem(TestApplication app, MockContainerEntity container, String name, Map<? 
extends Entity, Double> workpattern) { + MockItemEntity item = newItem(app, container, name); + if (workpattern != null) { + ((EntityLocal)item).setAttribute(MockItemEntity.ITEM_USAGE_METRIC, (Map) workpattern); + } + return item; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/followthesun/FollowTheSunModelTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/followthesun/FollowTheSunModelTest.java b/policy/src/test/java/org/apache/brooklyn/policy/followthesun/FollowTheSunModelTest.java new file mode 100644 index 0000000..0edf888 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/followthesun/FollowTheSunModelTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.policy.followthesun; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.location.basic.SimulatedLocation; + +import org.apache.brooklyn.policy.loadbalancing.MockContainerEntity; +import org.apache.brooklyn.policy.loadbalancing.MockContainerEntityImpl; +import org.apache.brooklyn.policy.loadbalancing.MockItemEntity; +import org.apache.brooklyn.policy.loadbalancing.MockItemEntityImpl; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +/** + * Tests {@code DefaultFollowTheSunModel} in isolation: the containers, items and locations used here + * are constructed directly, and a fresh model is created before every test method. + */ +public class FollowTheSunModelTest { + + private Location loc1 = new SimulatedLocation(DefaultFollowTheSunModel.newHashMap("name","loc1")); + private Location loc2 = new SimulatedLocation(DefaultFollowTheSunModel.newHashMap("name","loc2")); + private MockContainerEntity container1 = new MockContainerEntityImpl(); + private MockContainerEntity container2 = new MockContainerEntityImpl(); + private MockItemEntity item1 = new MockItemEntityImpl(); + private MockItemEntity item2 = new MockItemEntityImpl(); + private MockItemEntity item3 = new MockItemEntityImpl(); + + private DefaultFollowTheSunModel<MockContainerEntity, MockItemEntity> model; + + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + model = new DefaultFollowTheSunModel<MockContainerEntity, MockItemEntity>("myname"); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + // nothing to tear down; no management context created + } + + @Test + public void testSimpleAddAndRemove() throws Exception { + model.onContainerAdded(container1, loc1); + model.onContainerAdded(container2, loc2); + model.onItemAdded(item1, container1, true); + model.onItemAdded(item2, 
container2, true); + + assertEquals(model.getContainerLocation(container1), loc1); + assertEquals(model.getContainerLocation(container2), loc2); + assertEquals(model.getItems(), ImmutableSet.of(item1, item2)); + assertEquals(model.getItemLocation(item1), loc1); + assertEquals(model.getItemLocation(item2), loc2); + assertEquals(model.getItemContainer(item1), container1); + assertEquals(model.getItemContainer(item2), container2); + + model.onContainerRemoved(container2); + model.onItemRemoved(item2); + + assertEquals(model.getContainerLocation(container1), loc1); + assertEquals(model.getContainerLocation(container2), null); + assertEquals(model.getItems(), ImmutableSet.of(item1)); + assertEquals(model.getItemLocation(item1), loc1); + assertEquals(model.getItemLocation(item2), null); + assertEquals(model.getItemContainer(item1), container1); + assertEquals(model.getItemContainer(item2), null); + } + + @Test + public void testItemUsageMetrics() throws Exception { + model.onContainerAdded(container1, loc1); + model.onContainerAdded(container2, loc2); + model.onItemAdded(item1, container1, true); + model.onItemAdded(item2, container2, true); + + model.onItemUsageUpdated(item1, ImmutableMap.of(item2, 12d)); + model.onItemUsageUpdated(item2, ImmutableMap.of(item1, 11d)); + + assertEquals(model.getDirectSendsToItemByLocation(), + ImmutableMap.of(item1, ImmutableMap.of(loc2, 12d), item2, ImmutableMap.of(loc1, 11d))); + } + + @Test + public void testItemUsageReportedIfLocationSetAfterUsageUpdate() throws Exception { + model.onContainerAdded(container1, null); + model.onContainerAdded(container2, null); + model.onItemAdded(item1, container1, true); + model.onItemAdded(item2, container2, true); + model.onItemUsageUpdated(item1, ImmutableMap.of(item2, 12d)); + model.onContainerLocationUpdated(container1, loc1); + model.onContainerLocationUpdated(container2, loc2); + + assertEquals(model.getDirectSendsToItemByLocation(), + ImmutableMap.of(item1, ImmutableMap.of(loc2, 12d))); + } 
+ + @Test + public void testItemUsageMetricsSummedForActorsInSameLocation() throws Exception { + model.onContainerAdded(container1, loc1); + model.onContainerAdded(container2, loc2); + model.onItemAdded(item1, container1, true); + model.onItemAdded(item2, container2, true); + model.onItemAdded(item3, container2, true); + + model.onItemUsageUpdated(item1, ImmutableMap.of(item2, 12d, item3, 13d)); + + assertEquals(model.getDirectSendsToItemByLocation(), + ImmutableMap.of(item1, ImmutableMap.of(loc2, 12d+13d))); + } + + @Test + public void testItemMovedWillUpdateLocationUsage() throws Exception { + model.onContainerAdded(container1, loc1); + model.onContainerAdded(container2, loc2); + model.onItemAdded(item1, container1, false); + model.onItemAdded(item2, container2, false); + model.onItemUsageUpdated(item2, ImmutableMap.of(item1, 12d)); + + model.onItemMoved(item1, container2); + + assertEquals(model.getDirectSendsToItemByLocation(), + ImmutableMap.of(item2, ImmutableMap.of(loc2, 12d))); + assertEquals(model.getItemContainer(item1), container2); + assertEquals(model.getItemLocation(item1), loc2); + } + + @Test + public void testItemAddedWithNoContainer() throws Exception { + model.onItemAdded(item1, null, true); + + assertEquals(model.getItems(), ImmutableSet.of(item1)); + assertEquals(model.getItemContainer(item1), null); + assertEquals(model.getItemLocation(item1), null); + } + + @Test + public void testItemAddedBeforeContainer() throws Exception { + model.onItemAdded(item1, container1, true); + model.onContainerAdded(container1, loc1); + + assertEquals(model.getItems(), ImmutableSet.of(item1)); + assertEquals(model.getItemContainer(item1), container1); + assertEquals(model.getItemLocation(item1), loc1); + } + + @Test + public void testItemMovedBeforeContainerAdded() throws Exception { + model.onContainerAdded(container1, loc1); + model.onItemAdded(item1, container1, true); + model.onItemMoved(item1, container2); + model.onContainerAdded(container2, loc2); + + 
assertEquals(model.getItems(), ImmutableSet.of(item1)); + assertEquals(model.getItemContainer(item1), container2); + assertEquals(model.getItemLocation(item1), loc2); + } + + @Test + public void testItemAddedAnswersMovability() throws Exception { + model.onItemAdded(item1, container1, false); + model.onItemAdded(item2, container1, true); + assertTrue(model.isItemMoveable(item1)); + assertFalse(model.isItemMoveable(item2)); + } + + @Test + public void testWorkrateUpdateAfterItemRemovalIsNotRecorded() throws Exception { + model.onContainerAdded(container1, loc1); + model.onItemAdded(item1, container1, true); + model.onItemAdded(item2, container1, true); + model.onItemRemoved(item1); + model.onItemUsageUpdated(item1, ImmutableMap.of(item2, 123d)); + + assertFalse(model.getDirectSendsToItemByLocation().containsKey(item1)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPolicySoakTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPolicySoakTest.java b/policy/src/test/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPolicySoakTest.java new file mode 100644 index 0000000..178a9ae --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPolicySoakTest.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.followthesun; + +import static org.testng.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.basic.EntityLocal; +import org.apache.brooklyn.api.location.Location; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.Entities; + +import org.apache.brooklyn.location.basic.SimulatedLocation; + +import org.apache.brooklyn.policy.loadbalancing.BalanceableContainer; +import org.apache.brooklyn.policy.loadbalancing.MockContainerEntity; +import org.apache.brooklyn.policy.loadbalancing.MockItemEntity; +import org.apache.brooklyn.policy.loadbalancing.MockItemEntityImpl; +import org.apache.brooklyn.policy.loadbalancing.Movable; +import brooklyn.test.Asserts; +import brooklyn.util.collections.MutableMap; + +import com.google.common.base.Function; +import com.google.common.base.Predicates; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; + +public class FollowTheSunPolicySoakTest extends AbstractFollowTheSunPolicyTest { + + private static final Logger LOG = LoggerFactory.getLogger(FollowTheSunPolicySoakTest.class); + + 
private static final long TIMEOUT_MS = 10*1000; + + @Test + public void testFollowTheSunQuickTest() { + RunConfig config = new RunConfig(); + config.numCycles = 1; + config.numLocations=3; + config.numContainersPerLocation = 5; + config.numLockedItemsPerLocation = 2; + config.numMovableItems = 10; + + runFollowTheSunSoakTest(config); + } + + @Test + public void testLoadBalancingManyItemsQuickTest() { + RunConfig config = new RunConfig(); + config.numCycles = 1; + config.numLocations=2; + config.numContainersPerLocation = 3; + config.numLockedItemsPerLocation = 2; + config.numMovableItems = 10; + config.numContainerStopsPerCycle = 1; + config.numItemStopsPerCycle = 1; + + runFollowTheSunSoakTest(config); + } + + @Test(groups={"Integration"}) // takes ~2s + public void testLoadBalancingManyItemsNotTooLongTest() { + RunConfig config = new RunConfig(); + config.numCycles = 1; + config.numLocations=3; + config.numContainersPerLocation = 5; + config.numLockedItemsPerLocation = 2; + config.numMovableItems = 500; + config.numContainerStopsPerCycle = 1; + config.numItemStopsPerCycle = 1; + + runFollowTheSunSoakTest(config); + } + + @Test(groups={"Integration","Acceptance"}) // integration group, because it's slow to run many cycles + public void testLoadBalancingSoakTest() { + RunConfig config = new RunConfig(); + config.numCycles = 100; + config.numLocations=3; + config.numContainersPerLocation = 5; + config.numLockedItemsPerLocation = 2; + config.numMovableItems = 10; + + runFollowTheSunSoakTest(config); + } + + @Test(groups={"Integration","Acceptance"}) // integration group, because it's slow to run many cycles + public void testLoadBalancingManyItemsSoakTest() { + RunConfig config = new RunConfig(); + config.numCycles = 100; + config.numLocations=3; + config.numContainersPerLocation = 5; + config.numLockedItemsPerLocation = 2; + config.numMovableItems = 100; + config.numContainerStopsPerCycle = 3; + config.numItemStopsPerCycle = 10; + + runFollowTheSunSoakTest(config); 
+ } + + @Test(groups={"Integration","Acceptance"}) // integration group, because it's slow to run many cycles + public void testLoadBalancingManyManyItemsTest() { + RunConfig config = new RunConfig(); + config.numCycles = 1; + config.numLocations=10; + config.numContainersPerLocation = 5; + config.numLockedItemsPerLocation = 100; + config.numMovableItems = 1000; + config.numContainerStopsPerCycle = 0; + config.numItemStopsPerCycle = 0; + config.timeout_ms = 30*1000; + config.verbose = false; + + runFollowTheSunSoakTest(config); + } + + private void runFollowTheSunSoakTest(RunConfig config) { + int numCycles = config.numCycles; + int numLocations = config.numLocations; + int numContainersPerLocation = config.numContainersPerLocation; + int numLockedItemsPerLocation = config.numLockedItemsPerLocation; + int numMovableItems = config.numMovableItems; + + int numContainerStopsPerCycle = config.numContainerStopsPerCycle; + int numItemStopsPerCycle = config.numItemStopsPerCycle; + long timeout_ms = config.timeout_ms; + final boolean verbose = config.verbose; + + MockItemEntityImpl.totalMoveCount.set(0); + + List<Location> locations = new ArrayList<Location>(); + Multimap<Location,MockContainerEntity> containers = HashMultimap.<Location,MockContainerEntity>create(); + Multimap<Location,MockItemEntity> lockedItems = HashMultimap.<Location,MockItemEntity>create(); + final List<MockItemEntity> movableItems = new ArrayList<MockItemEntity>(); + + for (int i = 1; i <= numLocations; i++) { + String locName = "loc"+i; + Location loc = new SimulatedLocation(MutableMap.of("name",locName)); + locations.add(loc); + + for (int j = 1; j <= numContainersPerLocation; j++) { + MockContainerEntity container = newContainer(app, loc, "container-"+locName+"-"+j); + containers.put(loc, container); + } + for (int j = 1; j <= numLockedItemsPerLocation; j++) { + MockContainerEntity container = Iterables.get(containers.get(loc), j%numContainersPerLocation); + MockItemEntity item = 
newLockedItem(app, container, "item-locked-"+locName+"-"+j); + lockedItems.put(loc, item); + } + } + + for (int i = 1; i <= numMovableItems; i++) { + MockContainerEntity container = Iterables.get(containers.values(), i%containers.size()); + MockItemEntity item = newItem(app, container, "item-movable"+i); + movableItems.add(item); + } + + for (int i = 1; i <= numCycles; i++) { + LOG.info("{}: cycle {}", FollowTheSunPolicySoakTest.class.getSimpleName(), i); + + // Stop movable items, and start others + for (int j = 1; j <= numItemStopsPerCycle; j++) { + int itemIndex = random.nextInt(numMovableItems); + MockItemEntity itemToStop = movableItems.get(itemIndex); + itemToStop.stop(); + LOG.debug("Unmanaging item {}", itemToStop); + Entities.unmanage(itemToStop); + movableItems.set(itemIndex, newItem(app, Iterables.get(containers.values(), 0), "item-movable"+itemIndex)); + } + + // Choose a location to be busiest + int locIndex = random.nextInt(numLocations); + final Location busiestLocation = locations.get(locIndex); + + // Repartition the load across the items + for (int j = 0; j < numMovableItems; j++) { + MockItemEntity item = movableItems.get(j); + Map<Entity, Double> workrates = Maps.newLinkedHashMap(); + + for (Map.Entry<Location,MockItemEntity> entry : lockedItems.entries()) { + Location location = entry.getKey(); + MockItemEntity source = entry.getValue(); + double baseWorkrate = (location == busiestLocation ? 1000 : 100); + double jitter = 10; + double jitteredWorkrate = Math.max(0, baseWorkrate + (random.nextDouble()*jitter*2 - jitter)); + workrates.put(source, jitteredWorkrate); + } + ((EntityLocal)item).setAttribute(MockItemEntity.ITEM_USAGE_METRIC, workrates); + } + + // Stop containers, and start others + // This offloads the "immovable" items to other containers in the same location! 
+ for (int j = 1; j <= numContainerStopsPerCycle; j++) { + int containerIndex = random.nextInt(containers.size()); + MockContainerEntity containerToStop = Iterables.get(containers.values(), containerIndex); + Location location = Iterables.get(containerToStop.getLocations(), 0); + MockContainerEntity otherContainerInLocation = Iterables.find(containers.get(location), Predicates.not(Predicates.equalTo(containerToStop)), null); + containerToStop.offloadAndStop(otherContainerInLocation); + LOG.debug("Unmanaging container {}", containerToStop); + Entities.unmanage(containerToStop); + containers.remove(location, containerToStop); + + MockContainerEntity containerToAdd = newContainer(app, location, "container-"+location.getDisplayName()+"-new."+i+"."+j); + containers.put(location, containerToAdd); + } + + // Assert that the items all end up in the location with maximum load-generation + Asserts.succeedsEventually(MutableMap.of("timeout", timeout_ms), new Runnable() { + public void run() { + Iterable<Location> itemLocs = Iterables.transform(movableItems, new Function<MockItemEntity, Location>() { + public Location apply(MockItemEntity input) { + BalanceableContainer<?> container = input.getAttribute(Movable.CONTAINER); + Collection<Location> locs = (container != null) ? container.getLocations(): null; + return (locs != null && locs.size() > 0) ? Iterables.get(locs, 0) : null; + }}); + + Iterable<String> itemLocNames = Iterables.transform(itemLocs, new Function<Location, String>() { + public String apply(Location input) { + return (input != null) ? 
input.getDisplayName() : null; + }}); + String errMsg; + if (verbose) { + errMsg = verboseDumpToString()+"; itemLocs="+itemLocNames; + } else { + Set<String> locNamesInUse = Sets.newLinkedHashSet(itemLocNames); + errMsg = "locsInUse="+locNamesInUse+"; totalMoves="+MockItemEntityImpl.totalMoveCount; + } + + assertEquals(ImmutableList.copyOf(itemLocs), Collections.nCopies(movableItems.size(), busiestLocation), errMsg); + }}); + } + } + + static class RunConfig { + int numCycles = 1; + int numLocations = 3; + int numContainersPerLocation = 5; + int numLockedItemsPerLocation = 5; + int numMovableItems = 5; + int numContainerStopsPerCycle = 0; + int numItemStopsPerCycle = 0; + long timeout_ms = TIMEOUT_MS; + boolean verbose = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPolicyTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPolicyTest.java new file mode 100644 index 0000000..872efab --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPolicyTest.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.followthesun; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.basic.EntityLocal; +import org.apache.brooklyn.api.event.SensorEvent; +import org.apache.brooklyn.api.event.SensorEventListener; +import org.apache.brooklyn.api.location.Location; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.Entities; + +import org.apache.brooklyn.location.basic.SimulatedLocation; + +import org.apache.brooklyn.policy.loadbalancing.MockContainerEntity; +import org.apache.brooklyn.policy.loadbalancing.MockItemEntity; +import org.apache.brooklyn.policy.loadbalancing.Movable; +import brooklyn.test.Asserts; +import brooklyn.util.collections.MutableMap; + +import com.google.common.base.Function; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + +public class FollowTheSunPolicyTest extends AbstractFollowTheSunPolicyTest { + + private static final Logger LOG = LoggerFactory.getLogger(FollowTheSunPolicyTest.class); + + @Test + public void testPolicyUpdatesModel() { + final 
MockContainerEntity containerA = newContainer(app, loc1, "A"); + final MockItemEntity item1 = newItem(app, containerA, "1"); + final MockItemEntity item2 = newItem(app, containerA, "2"); + ((EntityLocal)item2).setAttribute(MockItemEntity.ITEM_USAGE_METRIC, ImmutableMap.<Entity,Double>of(item1, 11d)); + + Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() { + @Override public void run() { + /* TestNG's assertEquals takes (actual, expected); keep that order so failure messages read correctly. */ + assertEquals(model.getItems(), ImmutableSet.of(item1, item2)); + assertEquals(model.getItemContainer(item1), containerA); + assertEquals(model.getItemLocation(item1), loc1); + assertEquals(model.getContainerLocation(containerA), loc1); + assertEquals(model.getDirectSendsToItemByLocation(), ImmutableMap.of(item2, ImmutableMap.of(loc1, 11d))); + }}); + } + + /** + * Replaces the default policy with one configured with a custom {@code locationFinder} function and + * checks that the model eventually reports the location produced by that function for a new container. + */ + @Test + public void testPolicyAcceptsLocationFinder() { + pool.removePolicy(policy); + + Function<Entity, Location> customLocationFinder = new Function<Entity, Location>() { + @Override public Location apply(Entity input) { + return new SimulatedLocation(MutableMap.of("name", "custom location for "+input)); + }}; + + FollowTheSunPolicy customPolicy = new FollowTheSunPolicy( + MutableMap.of("minPeriodBetweenExecs", 0, "locationFinder", customLocationFinder), + MockItemEntity.ITEM_USAGE_METRIC, + model, + FollowTheSunParameters.newDefault()); + + pool.addPolicy(customPolicy); + + final MockContainerEntity containerA = newContainer(app, loc1, "A"); + + Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() { + @Override public void run() { + assertEquals(model.getContainerLocation(containerA).getDisplayName(), "custom location for "+containerA); + }}); + } + + @Test + public void testNoopBalancing() throws Exception { + // Set-up containers and items. 
+ MockContainerEntity containerA = newContainer(app, loc1, "A"); + MockContainerEntity containerB = newContainer(app, loc2, "B"); + MockItemEntity item1 = newItem(app, containerA, "1", Collections.<Entity, Double>emptyMap()); + MockItemEntity item2 = newItem(app, containerB, "2", Collections.<Entity, Double>emptyMap()); + + Thread.sleep(SHORT_WAIT_MS); + assertItemDistributionEventually(ImmutableMap.of(containerA, ImmutableList.of(item1), containerB, ImmutableList.of(item2))); + } + + @Test + public void testMovesItemToFollowDemand() throws Exception { + // Set-up containers and items. + MockContainerEntity containerA = newContainer(app, loc1, "A"); + MockContainerEntity containerB = newContainer(app, loc2, "B"); + MockItemEntity item1 = newItem(app, containerA, "1"); + MockItemEntity item2 = newItem(app, containerB, "2"); + + ((EntityLocal)item1).setAttribute(MockItemEntity.ITEM_USAGE_METRIC, ImmutableMap.<Entity,Double>of(item2, 100d)); + + assertItemDistributionEventually(ImmutableMap.of(containerA, ImmutableList.<MockItemEntity>of(), containerB, ImmutableList.of(item1, item2))); + } + + @Test + public void testNoopIfDemandIsTiny() throws Exception { + // Set-up containers and items. + MockContainerEntity containerA = newContainer(app, loc1, "A"); + MockContainerEntity containerB = newContainer(app, loc2, "B"); + MockItemEntity item1 = newItem(app, containerA, "1"); + MockItemEntity item2 = newItem(app, containerB, "2"); + + ((EntityLocal)item1).setAttribute(MockItemEntity.ITEM_USAGE_METRIC, ImmutableMap.<Entity,Double>of(item2, 0.1d)); + + Thread.sleep(SHORT_WAIT_MS); + assertItemDistributionEventually(ImmutableMap.of(containerA, ImmutableList.of(item1), containerB, ImmutableList.of(item2))); + } + + @Test + public void testNoopIfDemandIsSimilarToCurrentLocation() throws Exception { + // Set-up containers and items. 
+ MockContainerEntity containerA = newContainer(app, loc1, "A"); + MockContainerEntity containerB = newContainer(app, loc2, "B"); + MockItemEntity item1 = newItem(app, containerA, "1"); + MockItemEntity item2 = newItem(app, containerA, "2"); + MockItemEntity item3 = newItem(app, containerB, "3"); + + ((EntityLocal)item1).setAttribute(MockItemEntity.ITEM_USAGE_METRIC, ImmutableMap.<Entity,Double>of(item2, 100d, item3, 100.1d)); + + Thread.sleep(SHORT_WAIT_MS); + assertItemDistributionEventually(ImmutableMap.of(containerA, ImmutableList.of(item1, item2), containerB, ImmutableList.of(item3))); + } + + @Test + public void testMoveDecisionIgnoresDemandFromItself() throws Exception { + // Set-up containers and items. + MockContainerEntity containerA = newContainer(app, loc1, "A"); + MockContainerEntity containerB = newContainer(app, loc2, "B"); + MockItemEntity item1 = newItem(app, containerA, "1"); + MockItemEntity item2 = newItem(app, containerB, "2"); + + ((EntityLocal)item1).setAttribute(MockItemEntity.ITEM_USAGE_METRIC, ImmutableMap.<Entity,Double>of(item1, 100d)); + ((EntityLocal)item2).setAttribute(MockItemEntity.ITEM_USAGE_METRIC, ImmutableMap.<Entity,Double>of(item2, 100d)); + + Thread.sleep(SHORT_WAIT_MS); + assertItemDistributionEventually(ImmutableMap.of(containerA, ImmutableList.of(item1), containerB, ImmutableList.of(item2))); + } + + @Test + public void testItemRemovedCausesRecalculationOfOptimalLocation() { + // Set-up containers and items. 
+ MockContainerEntity containerA = newContainer(app, loc1, "A"); + MockContainerEntity containerB = newContainer(app, loc2, "B"); + MockItemEntity item1 = newItem(app, containerA, "1"); + MockItemEntity item2 = newItem(app, containerA, "2"); + MockItemEntity item3 = newItem(app, containerB, "3"); + + ((EntityLocal)item1).setAttribute(MockItemEntity.ITEM_USAGE_METRIC, ImmutableMap.<Entity,Double>of(item2, 100d, item3, 1000d)); + + assertItemDistributionEventually(ImmutableMap.of(containerA, ImmutableList.of(item2), containerB, ImmutableList.of(item1, item3))); + + item3.stop(); + Entities.unmanage(item3); + + assertItemDistributionEventually(ImmutableMap.of(containerA, ImmutableList.of(item1, item2), containerB, ImmutableList.<MockItemEntity>of())); + } + + @Test + public void testItemMovedCausesRecalculationOfOptimalLocationForOtherItems() { + // Set-up containers and items. + MockContainerEntity containerA = newContainer(app, loc1, "A"); + MockContainerEntity containerB = newContainer(app, loc2, "B"); + MockItemEntity item1 = newItem(app, containerA, "1"); + MockItemEntity item2 = newItem(app, containerA, "2"); + MockItemEntity item3 = newItem(app, containerB, "3"); + + ((EntityLocal)item1).setAttribute(MockItemEntity.ITEM_USAGE_METRIC, ImmutableMap.<Entity,Double>of(item2, 100d, item3, 1000d)); + + assertItemDistributionEventually(ImmutableMap.of(containerA, ImmutableList.of(item2), containerB, ImmutableList.of(item1, item3))); + + item3.move(containerA); + + assertItemDistributionEventually(ImmutableMap.of(containerA, ImmutableList.of(item1, item2, item3), containerB, ImmutableList.<MockItemEntity>of())); + } + + @Test + public void testImmovableItemIsNotMoved() { + MockContainerEntity containerA = newContainer(app, loc1, "A"); + MockContainerEntity containerB = newContainer(app, loc2, "B"); + MockItemEntity item1 = newLockedItem(app, containerA, "1"); + MockItemEntity item2 = newItem(app, containerB, "2"); + + 
((EntityLocal)item1).setAttribute(MockItemEntity.ITEM_USAGE_METRIC, ImmutableMap.<Entity,Double>of(item2, 100d)); + + assertItemDistributionEventually(ImmutableMap.of(containerA, ImmutableList.of(item1), containerB, ImmutableList.of(item2))); + } + + @Test + public void testImmovableItemContributesTowardsLoad() { + MockContainerEntity containerA = newContainer(app, loc1, "A"); + MockContainerEntity containerB = newContainer(app, loc2, "B"); + MockItemEntity item1 = newLockedItem(app, containerA, "1"); + MockItemEntity item2 = newItem(app, containerA, "2"); + + ((EntityLocal)item1).setAttribute(MockItemEntity.ITEM_USAGE_METRIC, ImmutableMap.<Entity,Double>of(item1, 100d)); + + assertItemDistributionEventually(ImmutableMap.of(containerA, ImmutableList.of(item1, item2), containerB, ImmutableList.<MockItemEntity>of())); + } + + // Marked as "Acceptance" due to time-sensitive nature :-( + @Test(groups={"Integration", "Acceptance"}, invocationCount=20) + public void testRepeatedRespectsMinPeriodBetweenExecs() throws Exception { + testRespectsMinPeriodBetweenExecs(); + } + + @Test(groups="Integration") + public void testRespectsMinPeriodBetweenExecs() throws Exception { + // Failed in jenkins several times, e.g. with event times [2647, 2655] and [1387, 2001]. + // Aled's guess is that there was a delay notifying the listener the first time + // (which happens async), causing the listener to be notified in rapid + // succession. The policy execs probably did happen with a 1000ms separation. + // + // Therefore try up to three times to see if we get the desired separation. If the + // minPeriodBetweenExecs wasn't being respected, we'd expect the default 100ms; this + // test would therefore hardly ever pass. 
+ final int MAX_ATTEMPTS = 3; + + final long minPeriodBetweenExecs = 1000; + final long timePrecision = 250; + + pool.removePolicy(policy); + + MockContainerEntity containerA = newContainer(app, loc1, "A"); + MockContainerEntity containerB = newContainer(app, loc2, "B"); + MockItemEntity item1 = newItem(app, containerA, "1"); + MockItemEntity item2 = newItem(app, containerB, "2"); + MockItemEntity item3 = newItem(app, containerA, "3"); + + FollowTheSunPolicy customPolicy = new FollowTheSunPolicy( + MutableMap.of("minPeriodBetweenExecs", minPeriodBetweenExecs), + MockItemEntity.ITEM_USAGE_METRIC, + model, + FollowTheSunParameters.newDefault()); + + pool.addPolicy(customPolicy); + + // Record times that things are moved, by lisening to the container sensor being set + final Stopwatch stopwatch = Stopwatch.createStarted(); + + final List<Long> eventTimes = Lists.newCopyOnWriteArrayList(); + final Semaphore semaphore = new Semaphore(0); + + app.subscribe(item1, Movable.CONTAINER, new SensorEventListener<Entity>() { + @Override public void onEvent(SensorEvent<Entity> event) { + long eventTime = stopwatch.elapsed(TimeUnit.MILLISECONDS); + LOG.info("Received {} at {}", event, eventTime); + eventTimes.add(eventTime); + semaphore.release(); + }}); + + String errmsg = ""; + for (int i = 0; i < MAX_ATTEMPTS; i++) { + // Set the workrate, causing the policy to move item1 to item2's location, and wait for it to happen + ((EntityLocal)item1).setAttribute(MockItemEntity.ITEM_USAGE_METRIC, ImmutableMap.<Entity,Double>of(item2, 100d)); + assertTrue(semaphore.tryAcquire(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + assertEquals(item1.getAttribute(Movable.CONTAINER), containerB); + + // now cause item1 to be moved to item3's location, and wait for it to happen + ((EntityLocal)item1).setAttribute(MockItemEntity.ITEM_USAGE_METRIC, ImmutableMap.<Entity,Double>of(item3, 100d)); + assertTrue(semaphore.tryAcquire(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + 
assertEquals(item1.getAttribute(Movable.CONTAINER), containerA); + + LOG.info("testRepeatedRespectsMinPeriodBetweenExecs event times: "+eventTimes); + assertEquals(eventTimes.size(), 2); + if (eventTimes.get(1) - eventTimes.get(0) > (minPeriodBetweenExecs-timePrecision)) { + return; // success + } else { + errmsg += eventTimes; + eventTimes.clear(); + } + } + + fail("Event times never had sufficient gap: "+errmsg); + } +}
