Repository: incubator-brooklyn Updated Branches: refs/heads/master f7b90474a -> 284b763d2
AutoScaler now takes config keys for iteration increment and iteration max e.g. to say increase in batches of 10, decrease in batches of 5 Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/c1a5df7d Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/c1a5df7d Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/c1a5df7d Branch: refs/heads/master Commit: c1a5df7dda1bd2215975b2717ad3579ada99bb77 Parents: 54af993 Author: Alex Heneveld <[email protected]> Authored: Tue May 26 06:26:04 2015 +0100 Committer: Alex Heneveld <[email protected]> Committed: Tue Jun 2 19:18:09 2015 +0200 ---------------------------------------------------------------------- .../policy/basic/AbstractEntityAdjunct.java | 6 +- .../policy/autoscaling/AutoScalerPolicy.java | 289 +++++++++++++------ .../policy/autoscaling/SizeHistory.java | 12 +- .../autoscaling/AutoScalerPolicyTest.java | 114 +++++++- .../autoscaling/LocallyResizableEntity.java | 1 + 5 files changed, 320 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c1a5df7d/core/src/main/java/brooklyn/policy/basic/AbstractEntityAdjunct.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/policy/basic/AbstractEntityAdjunct.java b/core/src/main/java/brooklyn/policy/basic/AbstractEntityAdjunct.java index 638818e..b14fd3e 100644 --- a/core/src/main/java/brooklyn/policy/basic/AbstractEntityAdjunct.java +++ b/core/src/main/java/brooklyn/policy/basic/AbstractEntityAdjunct.java @@ -19,7 +19,6 @@ package brooklyn.policy.basic; import static brooklyn.util.GroovyJavaMethods.truth; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import java.util.Collection; @@ -326,6 +325,11 
@@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple return configsInternal; } + /** + * Invoked whenever a config change is applied after management is started. + * Default implementation throws an exception to disallow the change. + * Can be overridden to return (allowing the change) or to make other changes + * (if necessary), and of course it can do this selectively and call the super to disallow any others. */ protected <T> void doReconfigureConfig(ConfigKey<T> key, T val) { throw new UnsupportedOperationException("reconfiguring "+key+" unsupported for "+this); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c1a5df7d/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java b/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java index 8727f54..bbf1249 100644 --- a/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java +++ b/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java @@ -90,6 +90,10 @@ public class AutoScalerPolicy extends AbstractPolicy { private Number metricLowerBound; private int minPoolSize = 0; private int maxPoolSize = Integer.MAX_VALUE; + private Integer resizeDownIterationIncrement; + private Integer resizeDownIterationMax; + private Integer resizeUpIterationIncrement; + private Integer resizeUpIterationMax; private Duration minPeriodBetweenExecs; private Duration resizeUpStabilizationDelay; private Duration resizeDownStabilizationDelay; @@ -135,6 +139,20 @@ public class AutoScalerPolicy extends AbstractPolicy { maxPoolSize = max; return this; } + + public Builder resizeUpIterationIncrement(Integer val) { + this.resizeUpIterationIncrement = val; return this; + } + public Builder resizeUpIterationMax(Integer val) { + this.resizeUpIterationMax = val; return this; + } + public 
Builder resizeDownIterationIncrement(Integer val) { + this.resizeDownIterationIncrement = val; return this; // scale-down batch size (not the scale-up field) + } + public Builder resizeDownIterationMax(Integer val) { + this.resizeDownIterationMax = val; return this; // scale-down per-iteration cap (not the scale-up field) + } + /** * @deprecated since 0.7; use {@link #minPeriodBetweenExecs(Duration)} */ @@ -210,6 +228,10 @@ public class AutoScalerPolicy extends AbstractPolicy { .putIfNotNull("metricLowerBound", metricLowerBound) .putIfNotNull("minPoolSize", minPoolSize) .putIfNotNull("maxPoolSize", maxPoolSize) + .putIfNotNull("resizeUpIterationMax", resizeUpIterationMax) + .putIfNotNull("resizeUpIterationIncrement", resizeUpIterationIncrement) + .putIfNotNull("resizeDownIterationMax", resizeDownIterationMax) + .putIfNotNull("resizeDownIterationIncrement", resizeDownIterationIncrement) .putIfNotNull("minPeriodBetweenExecs", minPeriodBetweenExecs) .putIfNotNull("resizeUpStabilizationDelay", resizeUpStabilizationDelay) .putIfNotNull("resizeDownStabilizationDelay", resizeDownStabilizationDelay) @@ -260,6 +282,7 @@ public class AutoScalerPolicy extends AbstractPolicy { public static final String POOL_LOW_THRESHOLD_KEY = "pool.low.threshold"; public static final String POOL_CURRENT_WORKRATE_KEY = "pool.current.workrate"; + @SuppressWarnings("serial") @SetFromFlag("metric") public static final ConfigKey<AttributeSensor<? extends Number>> METRIC = BasicConfigKey.builder(new TypeToken<AttributeSensor<? 
extends Number>>() {}) .name("autoscaler.metric") @@ -282,6 +305,35 @@ public class AutoScalerPolicy extends AbstractPolicy { .reconfigurable(true) .build(); + @SetFromFlag("resizeUpIterationIncrement") + public static final ConfigKey<Integer> RESIZE_UP_ITERATION_INCREMENT = BasicConfigKey.builder(Integer.class) + .name("autoscaler.resizeUpIterationIncrement") + .description("Batch size for resizing up; the size will be increased by a mulitiple of this value") + .defaultValue(1) + .reconfigurable(true) + .build(); + @SetFromFlag("resizeUpIterationMax") + public static final ConfigKey<Integer> RESIZE_UP_ITERATION_MAX = BasicConfigKey.builder(Integer.class) + .name("autoscaler.resizeUpIterationMax") + .defaultValue(Integer.MAX_VALUE) + .description("Maximum change to the size on a single iteration when scaling up") + .reconfigurable(true) + .build(); + @SetFromFlag("resizeDownIterationIncrement") + public static final ConfigKey<Integer> RESIZE_DOWN_ITERATION_INCREMENT = BasicConfigKey.builder(Integer.class) + .name("autoscaler.resizeDownIterationIncrement") + .description("Batch size for resizing down; the size will be decreased by a mulitiple of this value") + .defaultValue(1) + .reconfigurable(true) + .build(); + @SetFromFlag("resizeDownIterationMax") + public static final ConfigKey<Integer> RESIZE_DOWN_ITERATION_MAX = BasicConfigKey.builder(Integer.class) + .name("autoscaler.resizeDownIterationMax") + .defaultValue(Integer.MAX_VALUE) + .description("Maximum change to the size on a single iteration when scaling down") + .reconfigurable(true) + .build(); + @SetFromFlag("minPeriodBetweenExecs") public static final ConfigKey<Duration> MIN_PERIOD_BETWEEN_EXECS = BasicConfigKey.builder(Duration.class) .name("autoscaler.minPeriodBetweenExecs") @@ -325,6 +377,7 @@ public class AutoScalerPolicy extends AbstractPolicy { }}) .build(); + @SuppressWarnings("serial") @SetFromFlag("currentSizeOperator") public static final ConfigKey<Function<Entity,Integer>> 
CURRENT_SIZE_OPERATOR = BasicConfigKey.builder(new TypeToken<Function<Entity,Integer>>() {}) .name("autoscaler.currentSizeOperator") @@ -334,24 +387,28 @@ public class AutoScalerPolicy extends AbstractPolicy { }}) .build(); + @SuppressWarnings("serial") @SetFromFlag("poolHotSensor") public static final ConfigKey<BasicNotificationSensor<? extends Map>> POOL_HOT_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? extends Map>>() {}) .name("autoscaler.poolHotSensor") .defaultValue(DEFAULT_POOL_HOT_SENSOR) .build(); + @SuppressWarnings("serial") @SetFromFlag("poolColdSensor") public static final ConfigKey<BasicNotificationSensor<? extends Map>> POOL_COLD_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? extends Map>>() {}) .name("autoscaler.poolColdSensor") .defaultValue(DEFAULT_POOL_COLD_SENSOR) .build(); + @SuppressWarnings("serial") @SetFromFlag("poolOkSensor") public static final ConfigKey<BasicNotificationSensor<? extends Map>> POOL_OK_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? extends Map>>() {}) .name("autoscaler.poolOkSensor") .defaultValue(DEFAULT_POOL_OK_SENSOR) .build(); + @SuppressWarnings("serial") @SetFromFlag("maxSizeReachedSensor") public static final ConfigKey<BasicNotificationSensor<? super MaxPoolSizeReachedEvent>> MAX_SIZE_REACHED_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? 
super MaxPoolSizeReachedEvent>>() {}) .name("autoscaler.maxSizeReachedSensor") @@ -443,6 +500,31 @@ public class AutoScalerPolicy extends AbstractPolicy { config().set(METRIC_UPPER_BOUND, checkNotNull(val)); } + private <T> void setOrDefault(ConfigKey<T> key, T val) { + if (val==null) val = key.getDefaultValue(); + config().set(key, val); + } + public int getResizeUpIterationIncrement() { return getConfig(RESIZE_UP_ITERATION_INCREMENT); } + public void setResizeUpIterationIncrement(Integer val) { + if (LOG.isInfoEnabled()) LOG.info("{} changing resizeUpIterationIncrement from {} to {}", new Object[] {this, getResizeUpIterationIncrement(), val}); + setOrDefault(RESIZE_UP_ITERATION_INCREMENT, val); + } + public int getResizeDownIterationIncrement() { return getConfig(RESIZE_DOWN_ITERATION_INCREMENT); } + public void setResizeDownIterationIncrement(Integer val) { + if (LOG.isInfoEnabled()) LOG.info("{} changing resizeDownIterationIncrement from {} to {}", new Object[] {this, getResizeDownIterationIncrement(), val}); + setOrDefault(RESIZE_DOWN_ITERATION_INCREMENT, val); + } + public int getResizeUpIterationMax() { return getConfig(RESIZE_UP_ITERATION_MAX); } + public void setResizeUpIterationMax(Integer val) { + if (LOG.isInfoEnabled()) LOG.info("{} changing resizeUpIterationMax from {} to {}", new Object[] {this, getResizeUpIterationMax(), val}); + setOrDefault(RESIZE_UP_ITERATION_MAX, val); + } + public int getResizeDownIterationMax() { return getConfig(RESIZE_DOWN_ITERATION_MAX); } + public void setResizeDownIterationMax(Integer val) { + if (LOG.isInfoEnabled()) LOG.info("{} changing resizeDownIterationMax from {} to {}", new Object[] {this, getResizeDownIterationMax(), val}); + setOrDefault(RESIZE_DOWN_ITERATION_MAX, val); + } + /** * @deprecated since 0.7.0; use {@link #setMinPeriodBetweenExecs(Duration)} */ @@ -566,7 +648,9 @@ public class AutoScalerPolicy extends AbstractPolicy { // Rely on next metric-change to trigger recalculation; // and same for those 
below... } else if (key.equals(METRIC_UPPER_BOUND)) { - + // see above + } else if (key.equals(RESIZE_UP_ITERATION_INCREMENT) || key.equals(RESIZE_UP_ITERATION_MAX) || key.equals(RESIZE_DOWN_ITERATION_INCREMENT) || key.equals(RESIZE_DOWN_ITERATION_MAX)) { + // no special actions needed } else if (key.equals(MIN_POOL_SIZE)) { int newMin = (Integer) val; if (newMin > getConfig(MAX_POOL_SIZE)) { @@ -653,6 +737,29 @@ public class AutoScalerPolicy extends AbstractPolicy { }}); } } + + private enum ScalingType { HOT, COLD } + private static class ScalingData { + ScalingType scalingMode; + int currentSize; + double currentMetricValue; + Double metricUpperBound; + Double metricLowerBound; + + public double getCurrentTotalActivity() { + return currentMetricValue * currentSize; + } + + public boolean isHot() { + return ((scalingMode==null || scalingMode==ScalingType.HOT) && isValid(metricUpperBound) && currentMetricValue > metricUpperBound); + } + public boolean isCold() { + return ((scalingMode==null || scalingMode==ScalingType.COLD) && isValid(metricLowerBound) && currentMetricValue < metricLowerBound); + } + private boolean isValid(Double bound) { + return (bound!=null && bound>0); + } + } private void onMetricChanged(Number val) { if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-metric for {}: {}", new Object[] {this, poolEntity, val}); @@ -663,13 +770,38 @@ public class AutoScalerPolicy extends AbstractPolicy { return; } - double currentMetricD = val.doubleValue(); - double metricUpperBoundD = getMetricUpperBound().doubleValue(); - double metricLowerBoundD = getMetricLowerBound().doubleValue(); - int currentSize = getCurrentSizeOperator().apply(entity); - double currentTotalActivity = currentSize * currentMetricD; - int unboundedSize; - int desiredSize; + ScalingData data = new ScalingData(); + data.currentMetricValue = val.doubleValue(); + data.currentSize = getCurrentSizeOperator().apply(entity); + data.metricUpperBound = getMetricUpperBound().doubleValue(); + 
data.metricLowerBound = getMetricLowerBound().doubleValue(); + + analyze(data, "pool"); + } + + private void onPoolCold(Map<String, ?> properties) { + if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-cold for {}: {}", new Object[] {this, poolEntity, properties}); + analyzeOnHotOrColdSensor(ScalingType.COLD, "cold pool", properties); + } + + private void onPoolHot(Map<String, ?> properties) { + if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-hot for {}: {}", new Object[] {this, poolEntity, properties}); + analyzeOnHotOrColdSensor(ScalingType.HOT, "hot pool", properties); + } + + private void analyzeOnHotOrColdSensor(ScalingType scalingMode, String description, Map<String, ?> properties) { + ScalingData data = new ScalingData(); + data.scalingMode = scalingMode; + data.currentMetricValue = (Double) properties.get(POOL_CURRENT_WORKRATE_KEY); + data.currentSize = (Integer) properties.get(POOL_CURRENT_SIZE_KEY); + data.metricUpperBound = (Double) properties.get(POOL_HIGH_THRESHOLD_KEY); + data.metricLowerBound = (Double) properties.get(POOL_LOW_THRESHOLD_KEY); + + analyze(data, description); + } + + private void analyze(ScalingData data, String description) { + int desiredSizeUnconstrained; /* We always scale out (modulo stabilization delay) if: * currentTotalActivity > currentSize*metricUpperBound @@ -685,87 +817,86 @@ public class AutoScalerPolicy extends AbstractPolicy { *Â Â n*metricUpperBound >= currentTotalActivity * thus n := Math.max ( floor(currentTotalActiviy/metricLowerBound), ceil(currentTotal/metricUpperBound) ) */ - if (currentMetricD > metricUpperBoundD) { + if (data.isHot()) { // scale out - unboundedSize = (int)Math.ceil(currentTotalActivity/metricUpperBoundD); - desiredSize = toBoundedDesiredPoolSize(unboundedSize); - if (desiredSize > currentSize) { - if (LOG.isDebugEnabled()) LOG.debug("{} provisionally resizing out pool {} from {} to {} ({} > {})", new Object[] {this, poolEntity, currentSize, desiredSize, currentMetricD, 
metricUpperBoundD}); - scheduleResize(desiredSize); - } else { - if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} ({} > {} > {}, but scale-out blocked eg by bounds/check)", new Object[] {this, poolEntity, currentSize, currentMetricD, metricUpperBoundD, metricLowerBoundD}); - } - onNewUnboundedPoolSize(unboundedSize); + desiredSizeUnconstrained = (int)Math.ceil(data.getCurrentTotalActivity() / data.metricUpperBound); + data.scalingMode = ScalingType.HOT; - } else if (currentMetricD < metricLowerBoundD) { + } else if (data.isCold()) { // scale back - unboundedSize = (int)Math.floor(currentTotalActivity/metricLowerBoundD); - desiredSize = toBoundedDesiredPoolSize(unboundedSize); - if (desiredSize < currentTotalActivity/metricUpperBoundD) { - // this desired size would cause scale-out on next run, ie thrashing, so tweak - if (LOG.isTraceEnabled()) LOG.trace("{} resizing back pool {} from {}, tweaking from {} to prevent thrashing", new Object[] {this, poolEntity, currentSize, desiredSize }); - desiredSize = (int)Math.ceil(currentTotalActivity/metricUpperBoundD); - desiredSize = toBoundedDesiredPoolSize(desiredSize); + desiredSizeUnconstrained = (int)Math.floor(data.getCurrentTotalActivity() / data.metricLowerBound); + data.scalingMode = ScalingType.COLD; + + } else { + if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} ({} within range {}..{})", new Object[] {this, poolEntity, data.currentSize, data.currentMetricValue, data.metricLowerBound, data.metricUpperBound}); + abortResize(data.currentSize); + return; // within the healthy range; no-op + } + + if (LOG.isTraceEnabled()) LOG.trace("{} detected unconstrained desired size {}", new Object[] {this, desiredSizeUnconstrained}); + int desiredSize = applyMinMaxConstraints(desiredSizeUnconstrained); + + if ((data.scalingMode==ScalingType.COLD) && (desiredSize < data.currentSize)) { + + int delta = data.currentSize - desiredSize; + int scaleIncrement = getResizeDownIterationIncrement(); 
+ int scaleMax = getResizeDownIterationMax(); + if (delta>scaleMax) { + delta=scaleMax; + } else if (delta % scaleIncrement != 0) { + // keep scaling to the increment + delta += scaleIncrement - (delta % scaleIncrement); } - if (desiredSize < currentSize) { - if (LOG.isDebugEnabled()) LOG.debug("{} provisionally resizing back pool {} from {} to {} ({} < {})", new Object[] {this, poolEntity, currentSize, desiredSize, currentMetricD, metricLowerBoundD}); - scheduleResize(desiredSize); - } else { - if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} ({} < {} < {}, but scale-back blocked eg by bounds/check)", new Object[] {this, poolEntity, currentSize, currentMetricD, metricLowerBoundD, metricUpperBoundD}); + desiredSize = data.currentSize - delta; + + if (data.metricUpperBound!=null) { + // if upper bound supplied, check that this desired scale-back size + // is not going to cause scale-out on next run; i.e. anti-thrashing + while (desiredSize < data.currentSize && data.getCurrentTotalActivity() > data.metricUpperBound * desiredSize) { + if (LOG.isTraceEnabled()) LOG.trace("{} when resizing back pool {} from {}, tweaking from {} to prevent thrashing", new Object[] {this, poolEntity, data.currentSize, desiredSize }); + desiredSize += scaleIncrement; + } } - onNewUnboundedPoolSize(unboundedSize); + desiredSize = applyMinMaxConstraints(desiredSize); + if (desiredSize >= data.currentSize) data.scalingMode = null; + } else if ((data.scalingMode==ScalingType.HOT) && (desiredSize > data.currentSize)) { + + int delta = desiredSize - data.currentSize; + int scaleIncrement = getResizeUpIterationIncrement(); + int scaleMax = getResizeUpIterationMax(); + if (delta>scaleMax) { + delta=scaleMax; + } else if (delta % scaleIncrement != 0) { + // keep scaling to the increment + delta += scaleIncrement - (delta % scaleIncrement); + } + desiredSize = data.currentSize + delta; + desiredSize = applyMinMaxConstraints(desiredSize); + if (desiredSize <= data.currentSize) 
data.scalingMode = null; + } else { - if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} ({} within range {}..{})", new Object[] {this, poolEntity, currentSize, currentMetricD, metricLowerBoundD, metricUpperBoundD}); - abortResize(currentSize); - return; // within a health range; no-op + data.scalingMode = null; } - } - private void onPoolCold(Map<String, ?> properties) { - if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-cold for {}: {}", new Object[] {this, poolEntity, properties}); - - int poolCurrentSize = (Integer) properties.get(POOL_CURRENT_SIZE_KEY); - double poolCurrentWorkrate = (Double) properties.get(POOL_CURRENT_WORKRATE_KEY); - double poolLowThreshold = (Double) properties.get(POOL_LOW_THRESHOLD_KEY); - - // Shrink the pool to force its low threshold to fall below the current workrate. - // NOTE: assumes the pool is homogeneous for now. - int unboundedPoolSize = (int) Math.ceil(poolCurrentWorkrate / (poolLowThreshold/poolCurrentSize)); - int desiredPoolSize = toBoundedDesiredPoolSize(unboundedPoolSize); - - if (desiredPoolSize < poolCurrentSize) { - if (LOG.isTraceEnabled()) LOG.trace("{} resizing cold pool {} from {} to {}", new Object[] {this, poolEntity, poolCurrentSize, desiredPoolSize}); - scheduleResize(desiredPoolSize); + if (data.scalingMode!=null) { + if (LOG.isDebugEnabled()) LOG.debug("{} provisionally resizing {} {} from {} to {} ({} < {}; ideal size {})", new Object[] {this, description, poolEntity, data.currentSize, desiredSize, data.currentMetricValue, data.metricLowerBound, desiredSizeUnconstrained}); + scheduleResize(desiredSize); } else { - if (LOG.isTraceEnabled()) LOG.trace("{} not resizing cold pool {} from {} to {}", new Object[] {this, poolEntity, poolCurrentSize, desiredPoolSize}); - abortResize(poolCurrentSize); + if (LOG.isTraceEnabled()) LOG.trace("{} not resizing {} {} from {} to {}, {} out of healthy range {}..{} but unconstrained size {} blocked by bounds/check", new Object[] {this, description, 
poolEntity, data.currentSize, desiredSize, data.currentMetricValue, data.metricLowerBound, data.metricUpperBound, desiredSizeUnconstrained}); + abortResize(data.currentSize); + // but add to the unbounded record for future consideration } - onNewUnboundedPoolSize(unboundedPoolSize); + onNewUnboundedPoolSize(desiredSizeUnconstrained); } - - private void onPoolHot(Map<String, ?> properties) { - if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-hot for {}: {}", new Object[] {this, poolEntity, properties}); - - int poolCurrentSize = (Integer) properties.get(POOL_CURRENT_SIZE_KEY); - double poolCurrentWorkrate = (Double) properties.get(POOL_CURRENT_WORKRATE_KEY); - double poolHighThreshold = (Double) properties.get(POOL_HIGH_THRESHOLD_KEY); - - // Grow the pool to force its high threshold to rise above the current workrate. - // FIXME: assumes the pool is homogeneous for now. - int unboundedPoolSize = (int) Math.ceil(poolCurrentWorkrate / (poolHighThreshold/poolCurrentSize)); - int desiredPoolSize = toBoundedDesiredPoolSize(unboundedPoolSize); - if (desiredPoolSize > poolCurrentSize) { - if (LOG.isTraceEnabled()) LOG.trace("{} resizing hot pool {} from {} to {}", new Object[] {this, poolEntity, poolCurrentSize, desiredPoolSize}); - scheduleResize(desiredPoolSize); - } else { - if (LOG.isTraceEnabled()) LOG.trace("{} not resizing hot pool {} from {} to {}", new Object[] {this, poolEntity, poolCurrentSize, desiredPoolSize}); - abortResize(poolCurrentSize); - } - onNewUnboundedPoolSize(unboundedPoolSize); + + private int applyMinMaxConstraints(int desiredSize) { + desiredSize = Math.max(getMinPoolSize(), desiredSize); + desiredSize = Math.min(getMaxPoolSize(), desiredSize); + return desiredSize; } - + private void onPoolOk(Map<String, ?> properties) { if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-ok for {}: {}", new Object[] {this, poolEntity, properties}); @@ -774,12 +905,6 @@ public class AutoScalerPolicy extends AbstractPolicy { if (LOG.isTraceEnabled()) 
LOG.trace("{} not resizing ok pool {} from {}", new Object[] {this, poolEntity, poolCurrentSize}); abortResize(poolCurrentSize); } - - private int toBoundedDesiredPoolSize(int size) { - int result = Math.max(getMinPoolSize(), size); - result = Math.min(getMaxPoolSize(), result); - return result; - } /** * Schedules a resize, if there is not already a resize operation queued up. When that resize @@ -796,7 +921,7 @@ public class AutoScalerPolicy extends AbstractPolicy { * If a listener is registered to be notified of the max-pool-size cap being reached, then record * what our unbounded size would be and schedule a check to see if this unbounded size is sustained. * - * Piggie backs off the existing scheduleResize execution, which now also checks if the listener + * Piggy-backs off the existing scheduleResize execution, which now also checks if the listener * needs to be called. */ private void onNewUnboundedPoolSize(final int val) { http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c1a5df7d/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java b/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java index 5b633de..525ba20 100644 --- a/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java +++ b/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java @@ -119,7 +119,7 @@ public class SizeHistory { T valAsNum = val.getValue(); double valAsDouble = (valAsNum != null) ? 
valAsNum.doubleValue() : 0; if (result == null && val.getTimestamp() > epoch) { - result = (T) Integer.valueOf(Integer.MAX_VALUE); + result = withDefault(null, Integer.MAX_VALUE); resultAsDouble = result.doubleValue(); } if (result == null || (valAsNum != null && valAsDouble > resultAsDouble)) { @@ -127,7 +127,7 @@ public class SizeHistory { resultAsDouble = valAsDouble; } } - return (T) (result != null ? result : Integer.MAX_VALUE); + return withDefault(result, Integer.MAX_VALUE); } /** @@ -142,7 +142,7 @@ public class SizeHistory { T valAsNum = val.getValue(); double valAsDouble = (valAsNum != null) ? valAsNum.doubleValue() : 0; if (result == null && val.getTimestamp() > epoch) { - result = (T) Integer.valueOf(Integer.MIN_VALUE); + result = withDefault(null, Integer.MIN_VALUE); resultAsDouble = result.doubleValue(); } if (result == null || (val.getValue() != null && valAsDouble < resultAsDouble)) { @@ -150,9 +150,13 @@ public class SizeHistory { resultAsDouble = valAsDouble; } } - return (T) (result != null ? result : Integer.MIN_VALUE); + return withDefault(result, Integer.MIN_VALUE); } + @SuppressWarnings("unchecked") + private <T> T withDefault(T result, Integer defaultValue) { + return result!=null ? 
result : (T) defaultValue; + } /** * @return null if empty, or the most recent value */ http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c1a5df7d/policy/src/test/java/brooklyn/policy/autoscaling/AutoScalerPolicyTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/brooklyn/policy/autoscaling/AutoScalerPolicyTest.java b/policy/src/test/java/brooklyn/policy/autoscaling/AutoScalerPolicyTest.java index 14f2f6f..1885e9a 100644 --- a/policy/src/test/java/brooklyn/policy/autoscaling/AutoScalerPolicyTest.java +++ b/policy/src/test/java/brooklyn/policy/autoscaling/AutoScalerPolicyTest.java @@ -27,10 +27,13 @@ import java.lang.management.MemoryUsage; import java.lang.management.OperatingSystemMXBean; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -41,9 +44,11 @@ import brooklyn.entity.basic.Entities; import brooklyn.entity.proxying.EntitySpec; import brooklyn.entity.trait.Resizable; import brooklyn.event.basic.BasicNotificationSensor; +import brooklyn.policy.PolicySpec; import brooklyn.test.Asserts; import brooklyn.test.entity.TestApplication; import brooklyn.test.entity.TestCluster; +import brooklyn.util.collections.MutableList; import brooklyn.util.collections.MutableMap; import brooklyn.util.time.Duration; @@ -53,24 +58,38 @@ import com.google.common.collect.ImmutableMap; public class AutoScalerPolicyTest { + private static final Logger log = LoggerFactory.getLogger(AutoScalerPolicyTest.class); + private static long TIMEOUT_MS = 10*1000; private static long SHORT_WAIT_MS = 250; private static long OVERHEAD_DURATION_MS = 500; private static long EARLY_RETURN_MS 
= 10; + + private static final int MANY_TIMES_INVOCATION_COUNT = 10; AutoScalerPolicy policy; TestCluster cluster; LocallyResizableEntity resizable; TestApplication app; + List<Integer> policyResizes = MutableList.of(); @BeforeMethod(alwaysRun=true) public void setUp() throws Exception { + log.info("resetting "+getClass().getSimpleName()); app = TestApplication.Factory.newManagedInstanceForTests(); cluster = app.createAndManageChild(EntitySpec.create(TestCluster.class).configure(TestCluster.INITIAL_SIZE, 1)); resizable = new LocallyResizableEntity(cluster, cluster); Entities.manage(resizable); - policy = new AutoScalerPolicy(); - resizable.addPolicy(policy); + PolicySpec<AutoScalerPolicy> policySpec = PolicySpec.create(AutoScalerPolicy.class).configure(AutoScalerPolicy.RESIZE_OPERATOR, new ResizeOperator() { + @Override + public Integer resize(Entity entity, Integer desiredSize) { + log.info("resizing to "+desiredSize); + policyResizes.add(desiredSize); + return ((Resizable)entity).resize(desiredSize); + } + }); + policy = resizable.addPolicy(policySpec); + policyResizes.clear(); } @AfterMethod(alwaysRun=true) @@ -82,19 +101,35 @@ public class AutoScalerPolicyTest { policy = null; } + public void assertSizeEventually(Integer targetSize) { + Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, targetSize)); + assertEquals(policyResizes.get(policyResizes.size()-1), targetSize); + } + @Test public void testShrinkColdPool() throws Exception { resizable.resize(4); - resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(4, 30L, 4*10L, 4*20L)); + // all metrics as per-node here + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(30d/4, 10, 20)); + + // expect pool to shrink to 3 (i.e. 
maximum to have >= 10 per container) + assertSizeEventually(3); + } + + @Test + public void testShrinkColdPoolTotals() throws Exception { + resizable.resize(4); + // all metrics as totals here + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(30L, 4*10L, 4*20L)); - // expect pool to shrink to 3 (i.e. maximum to have >= 40 per container) - Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, 3)); + // expect pool to shrink to 3 (i.e. maximum to have >= 10 per container) + assertSizeEventually(3); } @Test public void testShrinkColdPoolRoundsUpDesiredNumberOfContainers() throws Exception { resizable.resize(4); - resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(4, 1L, 4*10L, 4*20L)); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(1L, 4*10L, 4*20L)); Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, 1)); } @@ -102,10 +137,53 @@ public class AutoScalerPolicyTest { @Test public void testGrowHotPool() throws Exception { resizable.resize(2); - resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(2, 41L, 2*10L, 2*20L)); + // all metrics as per-node here + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(21L, 10L, 20L)); + + // expect pool to grow to 3 (i.e. minimum to have <= 20 per container) + assertSizeEventually(3); + } + + @Test + public void testGrowHotPoolTotals() throws Exception { + resizable.resize(2); + // all metrics as totals here + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(41L, 2*10L, 2*20L)); - // expect pool to grow to 3 (i.e. minimum to have <= 80 per container) - Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, 3)); + // expect pool to grow to 3 (i.e. 
minimum to have <= 20 per container) + assertSizeEventually(3); + } + + @Test + public void testGrowShrinkRespectsResizeIterationIncrementAndResizeIterationMax() throws Exception { + resizable.resize(2); + policy.config().set(AutoScalerPolicy.RESIZE_UP_ITERATION_INCREMENT, 2); + policy.config().set(AutoScalerPolicy.RESIZE_UP_ITERATION_MAX, 4); + policy.config().set(AutoScalerPolicy.RESIZE_DOWN_ITERATION_INCREMENT, 3); + policy.config().set(AutoScalerPolicy.RESIZE_DOWN_ITERATION_MAX, 3); + + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(42/2, 10, 20)); + // expect pool to grow to 4 (i.e. to have <= 20 per container we need 3, but increment is 2) + assertSizeEventually(4); + + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(200/4, 10, 20)); + // a single hot message can only make it go to 8 + assertSizeEventually(8); + assertEquals(policyResizes, MutableList.of(4, 8)); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(200/8, 10, 20)); + assertSizeEventually(10); + assertEquals(policyResizes, MutableList.of(4, 8, 10)); + + // now shrink + policyResizes.clear(); + policy.config().set(AutoScalerPolicy.MIN_POOL_SIZE, 2); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(1, 10, 20)); + assertSizeEventually(7); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(1, 10, 20)); + assertSizeEventually(4); + resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(1, 10, 20)); + assertSizeEventually(2); + assertEquals(policyResizes, MutableList.of(7, 4, 2)); } @Test @@ -262,8 +340,11 @@ public class AutoScalerPolicyTest { public void testUsesCustomSensorOverride() throws Exception { resizable.removePolicy(policy); + @SuppressWarnings("rawtypes") BasicNotificationSensor<Map> customPoolHotSensor = new BasicNotificationSensor<Map>(Map.class, "custom.hot", ""); + @SuppressWarnings("rawtypes") BasicNotificationSensor<Map> customPoolColdSensor = new BasicNotificationSensor<Map>(Map.class, 
"custom.cold", ""); + @SuppressWarnings("rawtypes") BasicNotificationSensor<Map> customPoolOkSensor = new BasicNotificationSensor<Map>(Map.class, "custom.ok", ""); policy = AutoScalerPolicy.builder() .poolHotSensor(customPoolHotSensor) @@ -304,15 +385,16 @@ public class AutoScalerPolicyTest { }}); } - // FIXME decreased invocationCount from 100, because was failing in jenkins occassionally. + // FIXME failing in jenkins occasionally - have put it in the "Acceptance" group for now + // // Error was things like it taking a couple of seconds too long to scale-up. This is *not* // just caused by a slow GC (running with -verbose:gc shows during a failure several // incremental GCs that usually don't amount to more than 0.2 of a second at most, often less). // Doing a thread-dump etc immediately after the too-long delay shows no strange thread usage, // and shows releng3 system load averages of numbers like 1.73, 2.87 and 1.22. // - // Have put it in the "Acceptance" group for now. - @Test(groups={"Integration", "Acceptance"}, invocationCount=100) + // Is healthy on normal machines. + @Test(groups={"Integration", "Acceptance"}, invocationCount=MANY_TIMES_INVOCATION_COUNT) public void testRepeatedResizeUpStabilizationDelayTakesMaxSustainedDesired() throws Throwable { try { testResizeUpStabilizationDelayTakesMaxSustainedDesired(); @@ -429,9 +511,8 @@ public class AutoScalerPolicyTest { }}); } - // FIXME decreased invocationCount from 100; see comment against testRepeatedResizeUpStabilizationDelayTakesMaxSustainedDesired - // Have put it in the "Acceptance" group for now. 
- @Test(groups={"Integration", "Acceptance"}, invocationCount=100) + // FIXME Acceptance -- see comment against testRepeatedResizeUpStabilizationDelayTakesMaxSustainedDesired + @Test(groups={"Integration", "Acceptance"}, invocationCount=MANY_TIMES_INVOCATION_COUNT) public void testRepeatedResizeDownStabilizationDelayTakesMinSustainedDesired() throws Throwable { try { testResizeDownStabilizationDelayTakesMinSustainedDesired(); @@ -525,6 +606,9 @@ public class AutoScalerPolicyTest { assertTrue(resizeDelay >= (resizeDownStabilizationDelay-EARLY_RETURN_MS), "resizeDelay="+resizeDelay); } + Map<String, Object> message(double currentWorkrate, double lowThreshold, double highThreshold) { + return message(resizable.getCurrentSize(), currentWorkrate, lowThreshold, highThreshold); + } static Map<String, Object> message(int currentSize, double currentWorkrate, double lowThreshold, double highThreshold) { return ImmutableMap.<String,Object>of( AutoScalerPolicy.POOL_CURRENT_SIZE_KEY, currentSize, http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c1a5df7d/policy/src/test/java/brooklyn/policy/autoscaling/LocallyResizableEntity.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/brooklyn/policy/autoscaling/LocallyResizableEntity.java b/policy/src/test/java/brooklyn/policy/autoscaling/LocallyResizableEntity.java index 6dea88b..05c6e5b 100644 --- a/policy/src/test/java/brooklyn/policy/autoscaling/LocallyResizableEntity.java +++ b/policy/src/test/java/brooklyn/policy/autoscaling/LocallyResizableEntity.java @@ -41,6 +41,7 @@ public class LocallyResizableEntity extends AbstractEntity implements Resizable public LocallyResizableEntity (TestCluster tc) { this(null, tc); } + @SuppressWarnings("deprecation") public LocallyResizableEntity (Entity parent, TestCluster tc) { super(parent); this.cluster = tc;
