http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java new file mode 100644 index 0000000..98d814f --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java @@ -0,0 +1,1092 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.autoscaling; + +import static brooklyn.util.JavaGroovyEquivalents.groovyTruth; +import static com.google.common.base.Preconditions.checkNotNull; +import groovy.lang.Closure; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.brooklyn.api.catalog.Catalog; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.basic.EntityLocal; +import org.apache.brooklyn.api.event.AttributeSensor; +import org.apache.brooklyn.api.event.Sensor; +import org.apache.brooklyn.api.event.SensorEvent; +import org.apache.brooklyn.api.event.SensorEventListener; +import org.apache.brooklyn.api.policy.PolicySpec; +import org.apache.brooklyn.core.policy.basic.AbstractPolicy; +import org.apache.brooklyn.core.util.flags.SetFromFlag; +import org.apache.brooklyn.core.util.flags.TypeCoercions; +import org.apache.brooklyn.core.util.task.Tasks; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.BrooklynTaskTags; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.trait.Resizable; +import brooklyn.entity.trait.Startable; +import brooklyn.event.basic.BasicConfigKey; +import brooklyn.event.basic.BasicNotificationSensor; + +import org.apache.brooklyn.policy.autoscaling.SizeHistory.WindowSummary; +import org.apache.brooklyn.policy.loadbalancing.LoadBalancingPolicy; + +import brooklyn.util.collections.MutableMap; +import brooklyn.util.time.Duration; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.reflect.TypeToken; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + + +/** + * Policy that is attached to a {@link Resizable} entity and dynamically adjusts its size in response to + * emitted {@code POOL_COLD} and {@code POOL_HOT} events. Alternatively, the policy can be configured to + * keep a given metric within a required range. + * <p> + * TThis policy does not itself determine whether the pool is hot or cold, but instead relies on these + * events being emitted by the monitored entity itself, or by another policy that is attached to it; see, + * for example, {@link LoadBalancingPolicy}.) + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +@Catalog(name="Auto-scaler", description="Policy that is attached to a Resizable entity and dynamically " + + "adjusts its size in response to either keep a metric within a given range, or in response to " + + "POOL_COLD and POOL_HOT events") +public class AutoScalerPolicy extends AbstractPolicy { + + private static final Logger LOG = LoggerFactory.getLogger(AutoScalerPolicy.class); + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private String id; + private String name; + private AttributeSensor<? extends Number> metric; + private Entity entityWithMetric; + private Number metricUpperBound; + private Number metricLowerBound; + private int minPoolSize = 1; + private int maxPoolSize = Integer.MAX_VALUE; + private Integer resizeDownIterationIncrement; + private Integer resizeDownIterationMax; + private Integer resizeUpIterationIncrement; + private Integer resizeUpIterationMax; + private Duration minPeriodBetweenExecs; + private Duration resizeUpStabilizationDelay; + private Duration resizeDownStabilizationDelay; + private ResizeOperator resizeOperator; + private Function<Entity,Integer> currentSizeOperator; + private BasicNotificationSensor<?> poolHotSensor; + private BasicNotificationSensor<?> poolColdSensor; + private BasicNotificationSensor<?> poolOkSensor; + private BasicNotificationSensor<? super MaxPoolSizeReachedEvent> maxSizeReachedSensor; + private Duration maxReachedNotificationDelay; + + public Builder id(String val) { + this.id = val; return this; + } + public Builder name(String val) { + this.name = val; return this; + } + public Builder metric(AttributeSensor<? extends Number> val) { + this.metric = val; return this; + } + public Builder entityWithMetric(Entity val) { + this.entityWithMetric = val; return this; + } + public Builder metricLowerBound(Number val) { + this.metricLowerBound = val; return this; + } + public Builder metricUpperBound(Number val) { + this.metricUpperBound = val; return this; + } + public Builder metricRange(Number min, Number max) { + metricLowerBound = checkNotNull(min); + metricUpperBound = checkNotNull(max); + return this; + } + public Builder minPoolSize(int val) { + this.minPoolSize = val; return this; + } + public Builder maxPoolSize(int val) { + this.maxPoolSize = val; return this; + } + public Builder sizeRange(int min, int max) { + minPoolSize = min; + maxPoolSize = max; + return this; + } + + public Builder resizeUpIterationIncrement(Integer val) { + this.resizeUpIterationIncrement = val; return this; + } + public Builder resizeUpIterationMax(Integer val) { + this.resizeUpIterationMax = val; return this; + } + public Builder resizeDownIterationIncrement(Integer val) { + this.resizeUpIterationIncrement = val; return this; + } + public Builder resizeDownIterationMax(Integer val) { + this.resizeUpIterationMax = val; return this; + } + + public Builder minPeriodBetweenExecs(Duration val) { + this.minPeriodBetweenExecs = val; return this; + } + public Builder resizeUpStabilizationDelay(Duration val) { + this.resizeUpStabilizationDelay = val; return this; + } + public Builder resizeDownStabilizationDelay(Duration val) { + this.resizeDownStabilizationDelay = val; return this; + } + public Builder resizeOperator(ResizeOperator val) { + this.resizeOperator = val; return this; + } + public Builder currentSizeOperator(Function<Entity, Integer> val) { + this.currentSizeOperator = val; return this; + } + public Builder poolHotSensor(BasicNotificationSensor<?> val) { + this.poolHotSensor = val; return this; + } + public Builder poolColdSensor(BasicNotificationSensor<?> val) { + this.poolColdSensor = val; return this; + } + public Builder poolOkSensor(BasicNotificationSensor<?> val) { + this.poolOkSensor = val; return this; + } + public Builder maxSizeReachedSensor(BasicNotificationSensor<? super MaxPoolSizeReachedEvent> val) { + this.maxSizeReachedSensor = val; return this; + } + public Builder maxReachedNotificationDelay(Duration val) { + this.maxReachedNotificationDelay = val; return this; + } + public AutoScalerPolicy build() { + return new AutoScalerPolicy(toFlags()); + } + public PolicySpec<AutoScalerPolicy> buildSpec() { + return PolicySpec.create(AutoScalerPolicy.class) + .configure(toFlags()); + } + private Map<String,?> toFlags() { + return MutableMap.<String,Object>builder() + .putIfNotNull("id", id) + .putIfNotNull("name", name) + .putIfNotNull("metric", metric) + .putIfNotNull("entityWithMetric", entityWithMetric) + .putIfNotNull("metricUpperBound", metricUpperBound) + .putIfNotNull("metricLowerBound", metricLowerBound) + .putIfNotNull("minPoolSize", minPoolSize) + .putIfNotNull("maxPoolSize", maxPoolSize) + .putIfNotNull("resizeUpIterationMax", resizeUpIterationMax) + .putIfNotNull("resizeUpIterationIncrement", resizeUpIterationIncrement) + .putIfNotNull("resizeDownIterationMax", resizeDownIterationMax) + .putIfNotNull("resizeDownIterationIncrement", resizeDownIterationIncrement) + .putIfNotNull("minPeriodBetweenExecs", minPeriodBetweenExecs) + .putIfNotNull("resizeUpStabilizationDelay", resizeUpStabilizationDelay) + .putIfNotNull("resizeDownStabilizationDelay", resizeDownStabilizationDelay) + .putIfNotNull("resizeOperator", resizeOperator) + .putIfNotNull("currentSizeOperator", currentSizeOperator) + .putIfNotNull("poolHotSensor", poolHotSensor) + .putIfNotNull("poolColdSensor", poolColdSensor) + .putIfNotNull("poolOkSensor", poolOkSensor) + .putIfNotNull("maxSizeReachedSensor", maxSizeReachedSensor) + .putIfNotNull("maxReachedNotificationDelay", maxReachedNotificationDelay) + .build(); + } + } + + // TODO Is there a nicer pattern for registering such type-coercions? + // Can't put it in the ResizeOperator interface, nor in core TypeCoercions class because interface is defined in policy/. + static { + TypeCoercions.registerAdapter(Closure.class, ResizeOperator.class, new Function<Closure,ResizeOperator>() { + @Override + public ResizeOperator apply(final Closure closure) { + return new ResizeOperator() { + @Override public Integer resize(Entity entity, Integer input) { + return (Integer) closure.call(entity, input); + } + }; + } + }); + } + + // Pool workrate notifications. + public static BasicNotificationSensor<Map> DEFAULT_POOL_HOT_SENSOR = new BasicNotificationSensor<Map>( + Map.class, "resizablepool.hot", "Pool is over-utilized; it has insufficient resource for current workload"); + public static BasicNotificationSensor<Map> DEFAULT_POOL_COLD_SENSOR = new BasicNotificationSensor<Map>( + Map.class, "resizablepool.cold", "Pool is under-utilized; it has too much resource for current workload"); + public static BasicNotificationSensor<Map> DEFAULT_POOL_OK_SENSOR = new BasicNotificationSensor<Map>( + Map.class, "resizablepool.cold", "Pool utilization is ok; the available resources are fine for the current workload"); + + /** + * A convenience for policies that want to register a {@code builder.maxSizeReachedSensor(sensor)}. + * Note that this "default" is not set automatically; the default is for no sensor to be used (so + * no events emitted). + */ + public static BasicNotificationSensor<MaxPoolSizeReachedEvent> DEFAULT_MAX_SIZE_REACHED_SENSOR = new BasicNotificationSensor<MaxPoolSizeReachedEvent>( + MaxPoolSizeReachedEvent.class, "resizablepool.maxSizeReached", "Consistently wanted to resize the pool above the max allowed size"); + + public static final String POOL_CURRENT_SIZE_KEY = "pool.current.size"; + public static final String POOL_HIGH_THRESHOLD_KEY = "pool.high.threshold"; + public static final String POOL_LOW_THRESHOLD_KEY = "pool.low.threshold"; + public static final String POOL_CURRENT_WORKRATE_KEY = "pool.current.workrate"; + + @SuppressWarnings("serial") + @SetFromFlag("metric") + public static final ConfigKey<AttributeSensor<? extends Number>> METRIC = BasicConfigKey.builder(new TypeToken<AttributeSensor<? extends Number>>() {}) + .name("autoscaler.metric") + .build(); + + @SetFromFlag("entityWithMetric") + public static final ConfigKey<Entity> ENTITY_WITH_METRIC = BasicConfigKey.builder(Entity.class) + .name("autoscaler.entityWithMetric") + .build(); + + @SetFromFlag("metricLowerBound") + public static final ConfigKey<Number> METRIC_LOWER_BOUND = BasicConfigKey.builder(Number.class) + .name("autoscaler.metricLowerBound") + .reconfigurable(true) + .build(); + + @SetFromFlag("metricUpperBound") + public static final ConfigKey<Number> METRIC_UPPER_BOUND = BasicConfigKey.builder(Number.class) + .name("autoscaler.metricUpperBound") + .reconfigurable(true) + .build(); + + @SetFromFlag("resizeUpIterationIncrement") + public static final ConfigKey<Integer> RESIZE_UP_ITERATION_INCREMENT = BasicConfigKey.builder(Integer.class) + .name("autoscaler.resizeUpIterationIncrement") + .description("Batch size for resizing up; the size will be increased by a multiple of this value") + .defaultValue(1) + .reconfigurable(true) + .build(); + @SetFromFlag("resizeUpIterationMax") + public static final ConfigKey<Integer> RESIZE_UP_ITERATION_MAX = BasicConfigKey.builder(Integer.class) + .name("autoscaler.resizeUpIterationMax") + .defaultValue(Integer.MAX_VALUE) + .description("Maximum change to the size on a single iteration when scaling up") + .reconfigurable(true) + .build(); + @SetFromFlag("resizeDownIterationIncrement") + public static final ConfigKey<Integer> RESIZE_DOWN_ITERATION_INCREMENT = BasicConfigKey.builder(Integer.class) + .name("autoscaler.resizeDownIterationIncrement") + .description("Batch size for resizing down; the size will be decreased by a multiple of this value") + .defaultValue(1) + .reconfigurable(true) + .build(); + @SetFromFlag("resizeDownIterationMax") + public static final ConfigKey<Integer> RESIZE_DOWN_ITERATION_MAX = BasicConfigKey.builder(Integer.class) + .name("autoscaler.resizeDownIterationMax") + .defaultValue(Integer.MAX_VALUE) + .description("Maximum change to the size on a single iteration when scaling down") + .reconfigurable(true) + .build(); + + @SetFromFlag("minPeriodBetweenExecs") + public static final ConfigKey<Duration> MIN_PERIOD_BETWEEN_EXECS = BasicConfigKey.builder(Duration.class) + .name("autoscaler.minPeriodBetweenExecs") + .defaultValue(Duration.millis(100)) + .build(); + + @SetFromFlag("resizeUpStabilizationDelay") + public static final ConfigKey<Duration> RESIZE_UP_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class) + .name("autoscaler.resizeUpStabilizationDelay") + .defaultValue(Duration.ZERO) + .reconfigurable(true) + .build(); + + @SetFromFlag("resizeDownStabilizationDelay") + public static final ConfigKey<Duration> RESIZE_DOWN_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class) + .name("autoscaler.resizeDownStabilizationDelay") + .defaultValue(Duration.ZERO) + .reconfigurable(true) + .build(); + + @SetFromFlag("minPoolSize") + public static final ConfigKey<Integer> MIN_POOL_SIZE = BasicConfigKey.builder(Integer.class) + .name("autoscaler.minPoolSize") + .defaultValue(1) + .reconfigurable(true) + .build(); + + @SetFromFlag("maxPoolSize") + public static final ConfigKey<Integer> MAX_POOL_SIZE = BasicConfigKey.builder(Integer.class) + .name("autoscaler.maxPoolSize") + .defaultValue(Integer.MAX_VALUE) + .reconfigurable(true) + .build(); + + @SetFromFlag("resizeOperator") + public static final ConfigKey<ResizeOperator> RESIZE_OPERATOR = BasicConfigKey.builder(ResizeOperator.class) + .name("autoscaler.resizeOperator") + .defaultValue(new ResizeOperator() { + public Integer resize(Entity entity, Integer desiredSize) { + return ((Resizable)entity).resize(desiredSize); + }}) + .build(); + + @SuppressWarnings("serial") + @SetFromFlag("currentSizeOperator") + public static final ConfigKey<Function<Entity,Integer>> CURRENT_SIZE_OPERATOR = BasicConfigKey.builder(new TypeToken<Function<Entity,Integer>>() {}) + .name("autoscaler.currentSizeOperator") + .defaultValue(new Function<Entity,Integer>() { + public Integer apply(Entity entity) { + return ((Resizable)entity).getCurrentSize(); + }}) + .build(); + + @SuppressWarnings("serial") + @SetFromFlag("poolHotSensor") + public static final ConfigKey<BasicNotificationSensor<? extends Map>> POOL_HOT_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? extends Map>>() {}) + .name("autoscaler.poolHotSensor") + .defaultValue(DEFAULT_POOL_HOT_SENSOR) + .build(); + + @SuppressWarnings("serial") + @SetFromFlag("poolColdSensor") + public static final ConfigKey<BasicNotificationSensor<? extends Map>> POOL_COLD_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? extends Map>>() {}) + .name("autoscaler.poolColdSensor") + .defaultValue(DEFAULT_POOL_COLD_SENSOR) + .build(); + + @SuppressWarnings("serial") + @SetFromFlag("poolOkSensor") + public static final ConfigKey<BasicNotificationSensor<? extends Map>> POOL_OK_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? extends Map>>() {}) + .name("autoscaler.poolOkSensor") + .defaultValue(DEFAULT_POOL_OK_SENSOR) + .build(); + + @SuppressWarnings("serial") + @SetFromFlag("maxSizeReachedSensor") + public static final ConfigKey<BasicNotificationSensor<? super MaxPoolSizeReachedEvent>> MAX_SIZE_REACHED_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? super MaxPoolSizeReachedEvent>>() {}) + .name("autoscaler.maxSizeReachedSensor") + .description("Sensor for which a notification will be emitted (on the associated entity) when " + + "we consistently wanted to resize the pool above the max allowed size, for " + + "maxReachedNotificationDelay milliseconds") + .build(); + + @SetFromFlag("maxReachedNotificationDelay") + public static final ConfigKey<Duration> MAX_REACHED_NOTIFICATION_DELAY = BasicConfigKey.builder(Duration.class) + .name("autoscaler.maxReachedNotificationDelay") + .description("Time that we consistently wanted to go above the maxPoolSize for, after which the " + + "maxSizeReachedSensor (if any) will be emitted") + .defaultValue(Duration.ZERO) + .build(); + + private Entity poolEntity; + + private final AtomicBoolean executorQueued = new AtomicBoolean(false); + private volatile long executorTime = 0; + private volatile ScheduledExecutorService executor; + + private SizeHistory recentUnboundedResizes; + + private SizeHistory recentDesiredResizes; + + private long maxReachedLastNotifiedTime; + + private final SensorEventListener<Map> utilizationEventHandler = new SensorEventListener<Map>() { + public void onEvent(SensorEvent<Map> event) { + Map<String, ?> properties = (Map<String, ?>) event.getValue(); + Sensor<?> sensor = event.getSensor(); + + if (sensor.equals(getPoolColdSensor())) { + onPoolCold(properties); + } else if (sensor.equals(getPoolHotSensor())) { + onPoolHot(properties); + } else if (sensor.equals(getPoolOkSensor())) { + onPoolOk(properties); + } else { + throw new IllegalStateException("Unexpected sensor type: "+sensor+"; event="+event); + } + } + }; + + private final SensorEventListener<Number> metricEventHandler = new SensorEventListener<Number>() { + public void onEvent(SensorEvent<Number> event) { + assert event.getSensor().equals(getMetric()); + onMetricChanged(event.getValue()); + } + }; + + public AutoScalerPolicy() { + this(MutableMap.<String,Object>of()); + } + + public AutoScalerPolicy(Map<String,?> props) { + super(props); + } + + @Override + public void init() { + doInit(); + } + + @Override + public void rebind() { + doInit(); + } + + protected void doInit() { + long maxReachedNotificationDelay = getMaxReachedNotificationDelay().toMilliseconds(); + recentUnboundedResizes = new SizeHistory(maxReachedNotificationDelay); + + long maxResizeStabilizationDelay = Math.max(getResizeUpStabilizationDelay().toMilliseconds(), getResizeDownStabilizationDelay().toMilliseconds()); + recentDesiredResizes = new SizeHistory(maxResizeStabilizationDelay); + + // TODO Should re-use the execution manager's thread pool, somehow + executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory()); + } + + public void setMetricLowerBound(Number val) { + if (LOG.isInfoEnabled()) LOG.info("{} changing metricLowerBound from {} to {}", new Object[] {this, getMetricLowerBound(), val}); + config().set(METRIC_LOWER_BOUND, checkNotNull(val)); + } + + public void setMetricUpperBound(Number val) { + if (LOG.isInfoEnabled()) LOG.info("{} changing metricUpperBound from {} to {}", new Object[] {this, getMetricUpperBound(), val}); + config().set(METRIC_UPPER_BOUND, checkNotNull(val)); + } + + private <T> void setOrDefault(ConfigKey<T> key, T val) { + if (val==null) val = key.getDefaultValue(); + config().set(key, val); + } + public int getResizeUpIterationIncrement() { return getConfig(RESIZE_UP_ITERATION_INCREMENT); } + public void setResizeUpIterationIncrement(Integer val) { + if (LOG.isInfoEnabled()) LOG.info("{} changing resizeUpIterationIncrement from {} to {}", new Object[] {this, getResizeUpIterationIncrement(), val}); + setOrDefault(RESIZE_UP_ITERATION_INCREMENT, val); + } + public int getResizeDownIterationIncrement() { return getConfig(RESIZE_DOWN_ITERATION_INCREMENT); } + public void setResizeDownIterationIncrement(Integer val) { + if (LOG.isInfoEnabled()) LOG.info("{} changing resizeDownIterationIncrement from {} to {}", new Object[] {this, getResizeDownIterationIncrement(), val}); + setOrDefault(RESIZE_DOWN_ITERATION_INCREMENT, val); + } + public int getResizeUpIterationMax() { return getConfig(RESIZE_UP_ITERATION_MAX); } + public void setResizeUpIterationMax(Integer val) { + if (LOG.isInfoEnabled()) LOG.info("{} changing resizeUpIterationMax from {} to {}", new Object[] {this, getResizeUpIterationMax(), val}); + setOrDefault(RESIZE_UP_ITERATION_MAX, val); + } + public int getResizeDownIterationMax() { return getConfig(RESIZE_DOWN_ITERATION_MAX); } + public void setResizeDownIterationMax(Integer val) { + if (LOG.isInfoEnabled()) LOG.info("{} changing resizeDownIterationMax from {} to {}", new Object[] {this, getResizeDownIterationMax(), val}); + setOrDefault(RESIZE_DOWN_ITERATION_MAX, val); + } + + public void setMinPeriodBetweenExecs(Duration val) { + if (LOG.isInfoEnabled()) LOG.info("{} changing minPeriodBetweenExecs from {} to {}", new Object[] {this, getMinPeriodBetweenExecs(), val}); + config().set(MIN_PERIOD_BETWEEN_EXECS, val); + } + + public void setResizeUpStabilizationDelay(Duration val) { + if (LOG.isInfoEnabled()) LOG.info("{} changing resizeUpStabilizationDelay from {} to {}", new Object[] {this, getResizeUpStabilizationDelay(), val}); + config().set(RESIZE_UP_STABILIZATION_DELAY, val); + } + + public void setResizeDownStabilizationDelay(Duration val) { + if (LOG.isInfoEnabled()) LOG.info("{} changing resizeDownStabilizationDelay from {} to {}", new Object[] {this, getResizeDownStabilizationDelay(), val}); + config().set(RESIZE_DOWN_STABILIZATION_DELAY, val); + } + + public void setMinPoolSize(int val) { + if (LOG.isInfoEnabled()) LOG.info("{} changing minPoolSize from {} to {}", new Object[] {this, getMinPoolSize(), val}); + config().set(MIN_POOL_SIZE, val); + } + + public void setMaxPoolSize(int val) { + if (LOG.isInfoEnabled()) LOG.info("{} changing maxPoolSize from {} to {}", new Object[] {this, getMaxPoolSize(), val}); + config().set(MAX_POOL_SIZE, val); + } + + private AttributeSensor<? extends Number> getMetric() { + return getConfig(METRIC); + } + + private Entity getEntityWithMetric() { + return getConfig(ENTITY_WITH_METRIC); + } + + private Number getMetricLowerBound() { + return getConfig(METRIC_LOWER_BOUND); + } + + private Number getMetricUpperBound() { + return getConfig(METRIC_UPPER_BOUND); + } + + private Duration getMinPeriodBetweenExecs() { + return getConfig(MIN_PERIOD_BETWEEN_EXECS); + } + + private Duration getResizeUpStabilizationDelay() { + return getConfig(RESIZE_UP_STABILIZATION_DELAY); + } + + private Duration getResizeDownStabilizationDelay() { + return getConfig(RESIZE_DOWN_STABILIZATION_DELAY); + } + + private int getMinPoolSize() { + return getConfig(MIN_POOL_SIZE); + } + + private int getMaxPoolSize() { + return getConfig(MAX_POOL_SIZE); + } + + private ResizeOperator getResizeOperator() { + return getConfig(RESIZE_OPERATOR); + } + + private Function<Entity,Integer> getCurrentSizeOperator() { + return getConfig(CURRENT_SIZE_OPERATOR); + } + + private BasicNotificationSensor<? extends Map> getPoolHotSensor() { + return getConfig(POOL_HOT_SENSOR); + } + + private BasicNotificationSensor<? extends Map> getPoolColdSensor() { + return getConfig(POOL_COLD_SENSOR); + } + + private BasicNotificationSensor<? extends Map> getPoolOkSensor() { + return getConfig(POOL_OK_SENSOR); + } + + private BasicNotificationSensor<? super MaxPoolSizeReachedEvent> getMaxSizeReachedSensor() { + return getConfig(MAX_SIZE_REACHED_SENSOR); + } + + private Duration getMaxReachedNotificationDelay() { + return getConfig(MAX_REACHED_NOTIFICATION_DELAY); + } + + @Override + protected <T> void doReconfigureConfig(ConfigKey<T> key, T val) { + if (key.equals(RESIZE_UP_STABILIZATION_DELAY)) { + Duration maxResizeStabilizationDelay = Duration.max((Duration)val, getResizeDownStabilizationDelay()); + recentDesiredResizes.setWindowSize(maxResizeStabilizationDelay); + } else if (key.equals(RESIZE_DOWN_STABILIZATION_DELAY)) { + Duration maxResizeStabilizationDelay = Duration.max((Duration)val, getResizeUpStabilizationDelay()); + recentDesiredResizes.setWindowSize(maxResizeStabilizationDelay); + } else if (key.equals(METRIC_LOWER_BOUND)) { + // TODO If recorded what last metric value was then we could recalculate immediately + // Rely on next metric-change to trigger recalculation; + // and same for those below... + } else if (key.equals(METRIC_UPPER_BOUND)) { + // see above + } else if (key.equals(RESIZE_UP_ITERATION_INCREMENT) || key.equals(RESIZE_UP_ITERATION_MAX) || key.equals(RESIZE_DOWN_ITERATION_INCREMENT) || key.equals(RESIZE_DOWN_ITERATION_MAX)) { + // no special actions needed + } else if (key.equals(MIN_POOL_SIZE)) { + int newMin = (Integer) val; + if (newMin > getConfig(MAX_POOL_SIZE)) { + throw new IllegalArgumentException("Min pool size "+val+" must not be greater than max pool size "+getConfig(MAX_POOL_SIZE)); + } + onPoolSizeLimitsChanged(newMin, getConfig(MAX_POOL_SIZE)); + } else if (key.equals(MAX_POOL_SIZE)) { + int newMax = (Integer) val; + if (newMax < getConfig(MIN_POOL_SIZE)) { + throw new IllegalArgumentException("Min pool size "+val+" must not be greater than max pool size "+getConfig(MAX_POOL_SIZE)); + } + onPoolSizeLimitsChanged(getConfig(MIN_POOL_SIZE), newMax); + } else { + throw new UnsupportedOperationException("reconfiguring "+key+" unsupported for "+this); + } + } + + @Override + public void suspend() { + super.suspend(); + // TODO unsubscribe from everything? And resubscribe on resume? + if (executor != null) executor.shutdownNow(); + } + + @Override + public void resume() { + super.resume(); + executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory()); + } + + @Override + public void setEntity(EntityLocal entity) { + if (!config().getRaw(RESIZE_OPERATOR).isPresentAndNonNull()) { + Preconditions.checkArgument(entity instanceof Resizable, "Provided entity "+entity+" must be an instance of Resizable, because no custom-resizer operator supplied"); + } + super.setEntity(entity); + this.poolEntity = entity; + + if (getMetric() != null) { + Entity entityToSubscribeTo = (getEntityWithMetric() != null) ? getEntityWithMetric() : entity; + subscribe(entityToSubscribeTo, getMetric(), metricEventHandler); + } + subscribe(poolEntity, getPoolColdSensor(), utilizationEventHandler); + subscribe(poolEntity, getPoolHotSensor(), utilizationEventHandler); + subscribe(poolEntity, getPoolOkSensor(), utilizationEventHandler); + } + + private ThreadFactory newThreadFactory() { + return new ThreadFactoryBuilder() + .setNameFormat("brooklyn-autoscalerpolicy-%d") + .build(); + } + + /** + * Forces an immediate resize (without waiting for stabilization etc) if the current size is + * not within the min and max limits. We schedule this so that all resize operations are done + * by the same thread. + */ + private void onPoolSizeLimitsChanged(final int min, final int max) { + if (LOG.isTraceEnabled()) LOG.trace("{} checking pool size on limits changed for {} (between {} and {})", new Object[] {this, poolEntity, min, max}); + + if (isRunning() && isEntityUp()) { + executor.submit(new Runnable() { + @Override public void run() { + try { + int currentSize = getCurrentSizeOperator().apply(entity); + int desiredSize = Math.min(max, Math.max(min, currentSize)); + + if (currentSize != desiredSize) { + if (LOG.isInfoEnabled()) LOG.info("{} resizing pool {} immediateley from {} to {} (due to new pool size limits)", new Object[] {this, poolEntity, currentSize, desiredSize}); + getResizeOperator().resize(poolEntity, desiredSize); + } + + } catch (Exception e) { + if (isRunning()) { + LOG.error("Error resizing: "+e, e); + } else { + if (LOG.isDebugEnabled()) LOG.debug("Error resizing, but no longer running: "+e, e); + } + } catch (Throwable t) { + LOG.error("Error resizing: "+t, t); + throw Throwables.propagate(t); + } + }}); + } + } + + private enum ScalingType { HOT, COLD } + private static class ScalingData { + ScalingType scalingMode; + int currentSize; + double currentMetricValue; + Double metricUpperBound; + Double metricLowerBound; + + public double getCurrentTotalActivity() { + return currentMetricValue * currentSize; + } + + public boolean isHot() { + return ((scalingMode==null || scalingMode==ScalingType.HOT) && isValid(metricUpperBound) && currentMetricValue > metricUpperBound); + } + public boolean isCold() { + return ((scalingMode==null || scalingMode==ScalingType.COLD) && isValid(metricLowerBound) && currentMetricValue < metricLowerBound); + } + private boolean isValid(Double bound) { + return (bound!=null && bound>0); + } + } + + private void onMetricChanged(Number val) { + if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-metric for {}: {}", new Object[] {this, poolEntity, val}); + + if (val==null) { + // occurs e.g. if using an aggregating enricher who returns null when empty, the sensor has gone away + if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {}, inbound metric is null", new Object[] {this, poolEntity}); + return; + } + + ScalingData data = new ScalingData(); + data.currentMetricValue = val.doubleValue(); + data.currentSize = getCurrentSizeOperator().apply(entity); + data.metricUpperBound = getMetricUpperBound().doubleValue(); + data.metricLowerBound = getMetricLowerBound().doubleValue(); + + analyze(data, "pool"); + } + + private void onPoolCold(Map<String, ?> properties) { + if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-cold for {}: {}", new Object[] {this, poolEntity, properties}); + analyzeOnHotOrColdSensor(ScalingType.COLD, "cold pool", properties); + } + + private void onPoolHot(Map<String, ?> properties) { + if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-hot for {}: {}", new Object[] {this, poolEntity, properties}); + analyzeOnHotOrColdSensor(ScalingType.HOT, "hot pool", properties); + } + + private void analyzeOnHotOrColdSensor(ScalingType scalingMode, String description, Map<String, ?> properties) { + ScalingData data = new ScalingData(); + data.scalingMode = scalingMode; + data.currentMetricValue = (Double) properties.get(POOL_CURRENT_WORKRATE_KEY); + data.currentSize = (Integer) properties.get(POOL_CURRENT_SIZE_KEY); + data.metricUpperBound = (Double) properties.get(POOL_HIGH_THRESHOLD_KEY); + data.metricLowerBound = (Double) properties.get(POOL_LOW_THRESHOLD_KEY); + + analyze(data, description); + } + + private void analyze(ScalingData data, String description) { + int desiredSizeUnconstrained; + + /* We always scale out (modulo stabilization delay) if: + * currentTotalActivity > currentSize*metricUpperBound + * With newDesiredSize the smallest n such that  n*metricUpperBound >= currentTotalActivity + * ie  n >= currentTotalActiviy/metricUpperBound, thus n := Math.ceil(currentTotalActivity/metricUpperBound) + * + * Else consider scale back if: + *  currentTotalActivity < currentSize*metricLowerBound + * With newDesiredSize normally the largest n such that:  + *  n*metricLowerBound <= currentTotalActivity + * BUT with an absolute requirement which trumps the above computation + * that the newDesiredSize doesn't cause immediate scale out: + *  n*metricUpperBound >= currentTotalActivity + * thus n := Math.max ( floor(currentTotalActiviy/metricLowerBound), ceil(currentTotal/metricUpperBound) ) + */ + if (data.isHot()) { + // scale out + desiredSizeUnconstrained = (int)Math.ceil(data.getCurrentTotalActivity() / data.metricUpperBound); + data.scalingMode = ScalingType.HOT; + + } else if (data.isCold()) { + // scale back + desiredSizeUnconstrained = (int)Math.floor(data.getCurrentTotalActivity() / data.metricLowerBound); + data.scalingMode = ScalingType.COLD; + + } else { + if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} ({} within range {}..{})", new Object[] {this, poolEntity, data.currentSize, data.currentMetricValue, data.metricLowerBound, data.metricUpperBound}); + abortResize(data.currentSize); + return; // within the healthy range; no-op + } + + if (LOG.isTraceEnabled()) LOG.debug("{} detected unconstrained desired size {}", new Object[] {this, desiredSizeUnconstrained}); + int desiredSize = applyMinMaxConstraints(desiredSizeUnconstrained); + + if ((data.scalingMode==ScalingType.COLD) && (desiredSize < data.currentSize)) { + + int delta = data.currentSize - desiredSize; + int scaleIncrement = getResizeDownIterationIncrement(); + int scaleMax = getResizeDownIterationMax(); + if (delta>scaleMax) { + delta=scaleMax; + } else if (delta % scaleIncrement != 0) { + // keep scaling to the increment + delta += scaleIncrement - (delta % scaleIncrement); + } + desiredSize = data.currentSize - delta; + + if (data.metricUpperBound!=null) { + // if upper bound supplied, check that this desired scale-back size + // is not going to cause scale-out on next run; i.e. anti-thrashing + while (desiredSize < data.currentSize && data.getCurrentTotalActivity() > data.metricUpperBound * desiredSize) { + if (LOG.isTraceEnabled()) LOG.trace("{} when resizing back pool {} from {}, tweaking from {} to prevent thrashing", new Object[] {this, poolEntity, data.currentSize, desiredSize }); + desiredSize += scaleIncrement; + } + } + desiredSize = applyMinMaxConstraints(desiredSize); + if (desiredSize >= data.currentSize) data.scalingMode = null; + + } else if ((data.scalingMode==ScalingType.HOT) && (desiredSize > data.currentSize)) { + + int delta = desiredSize - data.currentSize; + int scaleIncrement = getResizeUpIterationIncrement(); + int scaleMax = getResizeUpIterationMax(); + if (delta>scaleMax) { + delta=scaleMax; + } else if (delta % scaleIncrement != 0) { + // keep scaling to the increment + delta += scaleIncrement - (delta % scaleIncrement); + } + desiredSize = data.currentSize + delta; + desiredSize = applyMinMaxConstraints(desiredSize); + if (desiredSize <= data.currentSize) data.scalingMode = null; + + } else { + data.scalingMode = null; + } + + if (data.scalingMode!=null) { + if (LOG.isDebugEnabled()) LOG.debug("{} provisionally resizing {} {} from {} to {} ({} < {}; ideal size {})", new Object[] {this, description, poolEntity, data.currentSize, desiredSize, data.currentMetricValue, data.metricLowerBound, desiredSizeUnconstrained}); + scheduleResize(desiredSize); + } else { + if (LOG.isTraceEnabled()) LOG.trace("{} not resizing {} {} from {} to {}, {} out of healthy range {}..{} but unconstrained size {} blocked by bounds/check", new Object[] {this, description, poolEntity, data.currentSize, desiredSize, data.currentMetricValue, data.metricLowerBound, data.metricUpperBound, desiredSizeUnconstrained}); + abortResize(data.currentSize); + // but add to the unbounded record for future consideration + } + + onNewUnboundedPoolSize(desiredSizeUnconstrained); + } + + private int applyMinMaxConstraints(int desiredSize) { + desiredSize = Math.max(getMinPoolSize(), desiredSize); + desiredSize = Math.min(getMaxPoolSize(), desiredSize); + return desiredSize; + } + + private void onPoolOk(Map<String, ?> properties) { + if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-ok for {}: {}", new Object[] {this, poolEntity, properties}); + + int poolCurrentSize = (Integer) properties.get(POOL_CURRENT_SIZE_KEY); + + if (LOG.isTraceEnabled()) LOG.trace("{} not resizing ok pool {} from {}", new Object[] {this, poolEntity, poolCurrentSize}); + abortResize(poolCurrentSize); + } + + /** + * Schedules a resize, if there is not already a resize operation queued up. When that resize + * executes, it will resize to whatever the latest value is to be (rather than what it was told + * to do at the point the job was queued). + */ + private void scheduleResize(final int newSize) { + recentDesiredResizes.add(newSize); + + scheduleResize(); + } + + /** + * If a listener is registered to be notified of the max-pool-size cap being reached, then record + * what our unbounded size would be and schedule a check to see if this unbounded size is sustained. + * + * Piggy-backs off the existing scheduleResize execution, which now also checks if the listener + * needs to be called. + */ + private void onNewUnboundedPoolSize(final int val) { + if (getMaxSizeReachedSensor() != null) { + recentUnboundedResizes.add(val); + scheduleResize(); + } + } + + private void abortResize(final int currentSize) { + recentDesiredResizes.add(currentSize); + recentUnboundedResizes.add(currentSize); + } + + private boolean isEntityUp() { + if (entity == null) { + return false; + } else if (entity.getEntityType().getSensors().contains(Startable.SERVICE_UP)) { + return Boolean.TRUE.equals(entity.getAttribute(Startable.SERVICE_UP)); + } else { + return true; + } + } + + private void scheduleResize() { + // TODO Make scale-out calls concurrent, rather than waiting for first resize to entirely + // finish. On ec2 for example, this can cause us to grow very slowly if first request is for + // just one new VM to be provisioned. + + if (isRunning() && isEntityUp() && executorQueued.compareAndSet(false, true)) { + long now = System.currentTimeMillis(); + long delay = Math.max(0, (executorTime + getMinPeriodBetweenExecs().toMilliseconds()) - now); + if (LOG.isTraceEnabled()) LOG.trace("{} scheduling resize in {}ms", this, delay); + + executor.schedule(new Runnable() { + @Override public void run() { + try { + executorTime = System.currentTimeMillis(); + executorQueued.set(false); + + resizeNow(); + notifyMaxReachedIfRequiredNow(); + + } catch (Exception e) { + if (isRunning()) { + LOG.error("Error resizing: "+e, e); + } else { + if (LOG.isDebugEnabled()) LOG.debug("Error resizing, but no longer running: "+e, e); + } + } catch (Throwable t) { + LOG.error("Error resizing: "+t, t); + throw Throwables.propagate(t); + } + }}, + delay, + TimeUnit.MILLISECONDS); + } + } + + /** + * Looks at the values for "unbounded pool size" (i.e. if we ignore caps of minSize and maxSize) to report what + * those values have been within a time window. The time window used is the "maxReachedNotificationDelay", + * which determines how many milliseconds after being consistently above the max-size will it take before + * we emit the sensor event (if any). + */ + private void notifyMaxReachedIfRequiredNow() { + BasicNotificationSensor<? super MaxPoolSizeReachedEvent> maxSizeReachedSensor = getMaxSizeReachedSensor(); + if (maxSizeReachedSensor == null) { + return; + } + + WindowSummary valsSummary = recentUnboundedResizes.summarizeWindow(getMaxReachedNotificationDelay()); + long timeWindowSize = getMaxReachedNotificationDelay().toMilliseconds(); + long currentPoolSize = getCurrentSizeOperator().apply(poolEntity); + int maxAllowedPoolSize = getMaxPoolSize(); + long unboundedSustainedMaxPoolSize = valsSummary.min; // The sustained maximum (i.e. the smallest it's dropped down to) + long unboundedCurrentPoolSize = valsSummary.latest; + + if (maxReachedLastNotifiedTime > 0) { + // already notified the listener; don't do it again + // TODO Could have max period for notifications, or a step increment to warn when exceeded by ever bigger amounts + + } else if (unboundedSustainedMaxPoolSize > maxAllowedPoolSize) { + // We have consistently wanted to be bigger than the max allowed; tell the listener + if (LOG.isDebugEnabled()) LOG.debug("{} notifying listener of max pool size reached; current {}, max {}, unbounded current {}, unbounded max {}", + new Object[] {this, currentPoolSize, maxAllowedPoolSize, unboundedCurrentPoolSize, unboundedSustainedMaxPoolSize}); + + maxReachedLastNotifiedTime = System.currentTimeMillis(); + MaxPoolSizeReachedEvent event = MaxPoolSizeReachedEvent.builder() + .currentPoolSize(currentPoolSize) + .maxAllowed(maxAllowedPoolSize) + .currentUnbounded(unboundedCurrentPoolSize) + .maxUnbounded(unboundedSustainedMaxPoolSize) + .timeWindow(timeWindowSize) + .build(); + entity.emit(maxSizeReachedSensor, event); + + } else if (valsSummary.max > maxAllowedPoolSize) { + // We temporarily wanted to be bigger than the max allowed; check back later to see if consistent + // TODO Could check if there has been anything bigger than "min" since min happened (would be more efficient) + if (LOG.isTraceEnabled()) LOG.trace("{} re-scheduling max-reached check for {}, as unbounded size not stable (min {}, max {}, latest {})", + new Object[] {this, poolEntity, valsSummary.min, valsSummary.max, valsSummary.latest}); + scheduleResize(); + + } else { + // nothing to write home about; continually below maxAllowed + } + } + + private void resizeNow() { + long currentPoolSize = getCurrentSizeOperator().apply(poolEntity); + CalculatedDesiredPoolSize calculatedDesiredPoolSize = calculateDesiredPoolSize(currentPoolSize); + final long desiredPoolSize = calculatedDesiredPoolSize.size; + boolean stable = calculatedDesiredPoolSize.stable; + + if (!stable) { + // the desired size fluctuations are not stable; ensure we check again later (due to time-window) + // even if no additional events have been received + // (note we continue now with as "good" a resize as we can given the instability) + if (LOG.isTraceEnabled()) LOG.trace("{} re-scheduling resize check for {}, as desired size not stable (current {}, desired {}); continuing with resize...", + new Object[] {this, poolEntity, currentPoolSize, desiredPoolSize}); + scheduleResize(); + } + if (currentPoolSize == desiredPoolSize) { + if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} to {}", + new Object[] {this, poolEntity, currentPoolSize, desiredPoolSize}); + return; + } + + if (LOG.isDebugEnabled()) LOG.debug("{} requesting resize to {}; current {}, min {}, max {}", + new Object[] {this, desiredPoolSize, currentPoolSize, getMinPoolSize(), getMaxPoolSize()}); + + Entities.submit(entity, Tasks.<Void>builder().name("Auto-scaler") + .description("Auto-scaler recommending resize from "+currentPoolSize+" to "+desiredPoolSize) + .tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG) + .body(new Callable<Void>() { + @Override + public Void call() throws Exception { + // TODO Should we use int throughout, rather than casting here? + getResizeOperator().resize(poolEntity, (int) desiredPoolSize); + return null; + } + }).build()) + .blockUntilEnded(); + } + + /** + * Complicated logic for stabilization-delay... + * Only grow if we have consistently been asked to grow for the resizeUpStabilizationDelay period; + * Only shrink if we have consistently been asked to shrink for the resizeDownStabilizationDelay period. + * + * @return tuple of desired pool size, and whether this is "stable" (i.e. if we receive no more events + * will this continue to be the desired pool size) + */ + private CalculatedDesiredPoolSize calculateDesiredPoolSize(long currentPoolSize) { + long now = System.currentTimeMillis(); + WindowSummary downsizeSummary = recentDesiredResizes.summarizeWindow(getResizeDownStabilizationDelay()); + WindowSummary upsizeSummary = recentDesiredResizes.summarizeWindow(getResizeUpStabilizationDelay()); + + // this is the _sustained_ growth value; the smallest size that has been requested in the "stable-for-growing" period + long maxDesiredPoolSize = upsizeSummary.min; + boolean stableForGrowing = upsizeSummary.stableForGrowth; + + // this is the _sustained_ shrink value; largest size that has been requested in the "stable-for-shrinking" period: + long minDesiredPoolSize = downsizeSummary.max; + boolean stableForShrinking = downsizeSummary.stableForShrinking; + + // (it is a logical consequence of the above that minDesired >= maxDesired -- this is correct, if confusing: + // think of minDesired as the minimum size we are allowed to resize to, and similarly for maxDesired; + // if min > max we can scale to max if current < max, or scale to min if current > min) + + long desiredPoolSize; + + boolean stable; + + if (currentPoolSize < maxDesiredPoolSize) { + // we have valid request to grow + // (we'll never have a valid request to grow and a valid to shrink simultaneously, btw) + desiredPoolSize = maxDesiredPoolSize; + stable = stableForGrowing; + } else if (currentPoolSize > minDesiredPoolSize) { + // we have valid request to shrink + desiredPoolSize = minDesiredPoolSize; + stable = stableForShrinking; + } else { + desiredPoolSize = currentPoolSize; + stable = stableForGrowing && stableForShrinking; + } + + if (LOG.isTraceEnabled()) LOG.trace("{} calculated desired pool size: from {} to {}; minDesired {}, maxDesired {}; " + + "stable {}; now {}; downsizeHistory {}; upsizeHistory {}", + new Object[] {this, currentPoolSize, desiredPoolSize, minDesiredPoolSize, maxDesiredPoolSize, stable, now, downsizeSummary, upsizeSummary}); + + return new CalculatedDesiredPoolSize(desiredPoolSize, stable); + } + + private static class CalculatedDesiredPoolSize { + final long size; + final boolean stable; + + CalculatedDesiredPoolSize(long size, boolean stable) { + this.size = size; + this.stable = stable; + } + } + + @Override + public String toString() { + return getClass().getSimpleName() + (groovyTruth(name) ? "("+name+")" : ""); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/MaxPoolSizeReachedEvent.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/MaxPoolSizeReachedEvent.java b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/MaxPoolSizeReachedEvent.java new file mode 100644 index 0000000..6e97771 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/MaxPoolSizeReachedEvent.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.autoscaling; + +import java.io.Serializable; + +import com.google.common.base.Objects; + +public class MaxPoolSizeReachedEvent implements Serializable { + private static final long serialVersionUID = 1602627701360505190L; + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + protected long maxAllowed; + protected long currentPoolSize; + protected long currentUnbounded; + protected long maxUnbounded; + protected long timeWindow; + + public Builder maxAllowed(long val) { + this.maxAllowed = val; return this; + } + + public Builder currentPoolSize(long val) { + this.currentPoolSize = val; return this; + } + + public Builder currentUnbounded(long val) { + this.currentUnbounded = val; return this; + } + + public Builder maxUnbounded(long val) { + this.maxUnbounded = val; return this; + } + + public Builder timeWindow(long val) { + this.timeWindow = val; return this; + } + public MaxPoolSizeReachedEvent build() { + return new MaxPoolSizeReachedEvent(this); + } + } + + private final long maxAllowed; + private final long currentPoolSize; + private final long currentUnbounded; + private final long maxUnbounded; + private final long timeWindow; + + protected MaxPoolSizeReachedEvent(Builder builder) { + maxAllowed = builder.maxAllowed; + currentPoolSize = builder.currentPoolSize; + currentUnbounded = builder.currentUnbounded; + maxUnbounded = builder.maxUnbounded; + timeWindow = builder.timeWindow; + } + + public long getMaxAllowed() { + return maxAllowed; + } + + public long getCurrentPoolSize() { + return currentPoolSize; + } + + public long getCurrentUnbounded() { + return currentUnbounded; + } + + public long getMaxUnbounded() { + return maxUnbounded; + } + + public long getTimeWindow() { + return timeWindow; + } + + @Override + public String toString() { + return Objects.toStringHelper(this).add("maxAllowed", maxAllowed).add("currentPoolSize", currentPoolSize) + .add("currentUnbounded", currentUnbounded).add("maxUnbounded", maxUnbounded) + .add("timeWindow", timeWindow).toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/ResizeOperator.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/ResizeOperator.java b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/ResizeOperator.java new file mode 100644 index 0000000..4f4fbb0 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/ResizeOperator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.autoscaling; + +import org.apache.brooklyn.api.entity.Entity; + +public interface ResizeOperator { + + /** + * Resizes the given entity to the desired size, if possible. + * + * @return the new size of the entity + */ + public Integer resize(Entity entity, Integer desiredSize); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/SizeHistory.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/SizeHistory.java b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/SizeHistory.java new file mode 100644 index 0000000..0aa8801 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/SizeHistory.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.autoscaling; + +import java.util.List; + +import brooklyn.util.collections.MutableMap; +import brooklyn.util.collections.TimeWindowedList; +import brooklyn.util.collections.TimestampedValue; +import brooklyn.util.time.Duration; + +import com.google.common.base.Objects; + +/** + * Using a {@link TimeWindowedList}, tracks the recent history of values to allow a summary of + * those values to be obtained. + * + * @author aled + */ +public class SizeHistory { + + public static class WindowSummary { + /** The most recent value (or -1 if there has been no value) */ + public final long latest; + + /** The minimum vaule within the given time period */ + public final long min; + + /** The maximum vaule within the given time period */ + public final long max; + + /** true if, since that max value, there have not been any higher values */ + public final boolean stableForGrowth; + + /** true if, since that low value, there have not been any lower values */ + public final boolean stableForShrinking; + + public WindowSummary(long latest, long min, long max, boolean stableForGrowth, boolean stableForShrinking) { + this.latest = latest; + this.min = min; + this.max = max; + this.stableForGrowth = stableForGrowth; + this.stableForShrinking = stableForShrinking; + } + + @Override + public String toString() { + return Objects.toStringHelper(this).add("latest", latest).add("min", min).add("max", max) + .add("stableForGrowth", stableForGrowth).add("stableForShrinking", stableForShrinking).toString(); + } + } + + private final TimeWindowedList<Number> recentDesiredResizes; + + public SizeHistory(long windowSize) { + recentDesiredResizes = new TimeWindowedList<Number>(MutableMap.of("timePeriod", windowSize, "minExpiredVals", 1)); + } + + public void add(final int val) { + recentDesiredResizes.add(val); + } + + public void setWindowSize(Duration newWindowSize) { + recentDesiredResizes.setTimePeriod(newWindowSize); + } + + /** + * Summarises the history of values in this time window, with a few special things: + * <ul> + * <li>If entire time-window is not covered by the given values, then min is Integer.MIN_VALUE and max is Integer.MAX_VALUE + * <li>If no values, then latest is -1 + * <li>If no recent values, then keeps last-seen value (no matter how old), to use that + * <li>"stable for growth" means that since that max value, there have not been any higher values + * <li>"stable for shrinking" means that since that low value, there have not been any lower values + * </ul> + */ + public WindowSummary summarizeWindow(Duration windowSize) { + long now = System.currentTimeMillis(); + List<TimestampedValue<Number>> windowVals = recentDesiredResizes.getValuesInWindow(now, windowSize); + + Number latestObj = latestInWindow(windowVals); + long latest = (latestObj == null) ? -1: latestObj.longValue(); + long max = maxInWindow(windowVals, windowSize).longValue(); + long min = minInWindow(windowVals, windowSize).longValue(); + + // TODO Could do more sophisticated "stable" check; this is the easiest code - correct but not most efficient + // in terms of the caller having to schedule additional stability checks. + boolean stable = (min == max); + + return new WindowSummary(latest, min, max, stable, stable); + } + + /** + * If the entire time-window is not covered by the given values, then returns Integer.MAX_VALUE. + */ + private <T extends Number> T maxInWindow(List<TimestampedValue<T>> vals, Duration timeWindow) { + // TODO bad casting from Integer default result to T + long now = System.currentTimeMillis(); + long epoch = now - timeWindow.toMilliseconds(); + T result = null; + double resultAsDouble = Integer.MAX_VALUE; + for (TimestampedValue<T> val : vals) { + T valAsNum = val.getValue(); + double valAsDouble = (valAsNum != null) ? valAsNum.doubleValue() : 0; + if (result == null && val.getTimestamp() > epoch) { + result = withDefault(null, Integer.MAX_VALUE); + resultAsDouble = result.doubleValue(); + } + if (result == null || (valAsNum != null && valAsDouble > resultAsDouble)) { + result = valAsNum; + resultAsDouble = valAsDouble; + } + } + return withDefault(result, Integer.MAX_VALUE); + } + + /** + * If the entire time-window is not covered by the given values, then returns Integer.MIN_VALUE + */ + private <T extends Number> T minInWindow(List<TimestampedValue<T>> vals, Duration timeWindow) { + long now = System.currentTimeMillis(); + long epoch = now - timeWindow.toMilliseconds(); + T result = null; + double resultAsDouble = Integer.MIN_VALUE; + for (TimestampedValue<T> val : vals) { + T valAsNum = val.getValue(); + double valAsDouble = (valAsNum != null) ? valAsNum.doubleValue() : 0; + if (result == null && val.getTimestamp() > epoch) { + result = withDefault(null, Integer.MIN_VALUE); + resultAsDouble = result.doubleValue(); + } + if (result == null || (val.getValue() != null && valAsDouble < resultAsDouble)) { + result = valAsNum; + resultAsDouble = valAsDouble; + } + } + return withDefault(result, Integer.MIN_VALUE); + } + + @SuppressWarnings("unchecked") + private <T> T withDefault(T result, Integer defaultValue) { + return result!=null ? result : (T) defaultValue; + } + /** + * @return null if empty, or the most recent value + */ + private <T extends Number> T latestInWindow(List<TimestampedValue<T>> vals) { + return vals.isEmpty() ? null : vals.get(vals.size()-1).getValue(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java new file mode 100644 index 0000000..fef3d7f --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.followthesun; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.location.basic.AbstractLocation; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + +public class DefaultFollowTheSunModel<ContainerType, ItemType> implements FollowTheSunModel<ContainerType, ItemType> { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultFollowTheSunModel.class); + + // Concurrent maps cannot have null value; use this to represent when no container is supplied for an item + private static final String NULL = "null-val"; + private static final Location NULL_LOCATION = new AbstractLocation(newHashMap("name","null-location")) {}; + + private final String name; + private final Set<ContainerType> containers = Collections.newSetFromMap(new ConcurrentHashMap<ContainerType,Boolean>()); + private final Map<ItemType, ContainerType> itemToContainer = new ConcurrentHashMap<ItemType, ContainerType>(); + private final Map<ContainerType, Location> containerToLocation = new ConcurrentHashMap<ContainerType, Location>(); + private final Map<ItemType, Location> itemToLocation = new ConcurrentHashMap<ItemType, Location>(); + private final Map<ItemType, Map<? extends ItemType, Double>> itemUsage = new ConcurrentHashMap<ItemType, Map<? extends ItemType,Double>>(); + private final Set<ItemType> immovableItems = Collections.newSetFromMap(new ConcurrentHashMap<ItemType, Boolean>()); + + public DefaultFollowTheSunModel(String name) { + this.name = name; + } + + @Override + public Set<ItemType> getItems() { + return itemToContainer.keySet(); + } + + @Override + public ContainerType getItemContainer(ItemType item) { + ContainerType result = itemToContainer.get(item); + return (isNull(result) ? null : result); + } + + @Override + public Location getItemLocation(ItemType item) { + Location result = itemToLocation.get(item); + return (isNull(result) ? null : result); + } + + @Override + public Location getContainerLocation(ContainerType container) { + Location result = containerToLocation.get(container); + return (isNull(result) ? null : result); + } + + // Provider methods. + + @Override public String getName() { + return name; + } + + // TODO: delete? + @Override public String getName(ItemType item) { + return item.toString(); + } + + @Override public boolean isItemMoveable(ItemType item) { + // If don't know about item, then assume not movable; otherwise has this item been explicitly flagged as immovable? + return hasItem(item) && !immovableItems.contains(item); + } + + @Override public boolean isItemAllowedIn(ItemType item, Location location) { + return true; // TODO? + } + + @Override public boolean hasActiveMigration(ItemType item) { + return false; // TODO? + } + + @Override + // FIXME Too expensive to compute; store in a different data structure? + public Map<ItemType, Map<Location, Double>> getDirectSendsToItemByLocation() { + Map<ItemType, Map<Location, Double>> result = new LinkedHashMap<ItemType, Map<Location,Double>>(getNumItems()); + + for (Map.Entry<ItemType, Map<? extends ItemType, Double>> entry : itemUsage.entrySet()) { + ItemType targetItem = entry.getKey(); + Map<? extends ItemType, Double> sources = entry.getValue(); + if (sources.isEmpty()) continue; // no-one talking to us + + Map<Location, Double> targetUsageByLocation = new LinkedHashMap<Location, Double>(); + result.put(targetItem, targetUsageByLocation); + + for (Map.Entry<? extends ItemType, Double> entry2 : sources.entrySet()) { + ItemType sourceItem = entry2.getKey(); + Location sourceLocation = getItemLocation(sourceItem); + double usageVal = (entry.getValue() != null) ? entry2.getValue() : 0d; + if (sourceLocation == null) continue; // don't know where to attribute this load; e.g. item may have just terminated + if (sourceItem.equals(targetItem)) continue; // ignore msgs to self + + Double usageValTotal = targetUsageByLocation.get(sourceLocation); + double newUsageValTotal = (usageValTotal != null ? usageValTotal : 0d) + usageVal; + targetUsageByLocation.put(sourceLocation, newUsageValTotal); + } + } + + return result; + } + + @Override + public Set<ContainerType> getAvailableContainersFor(ItemType item, Location location) { + checkNotNull(location); + return getContainersInLocation(location); + } + + + // Mutators. + + @Override + public void onItemMoved(ItemType item, ContainerType newContainer) { + // idempotent, as may be called multiple times + Location newLocation = (newContainer != null) ? containerToLocation.get(newContainer) : null; + ContainerType newContainerNonNull = toNonNullContainer(newContainer); + Location newLocationNonNull = toNonNullLocation(newLocation); + ContainerType oldContainer = itemToContainer.put(item, newContainerNonNull); + Location oldLocation = itemToLocation.put(item, newLocationNonNull); + } + + @Override + public void onContainerAdded(ContainerType container, Location location) { + Location locationNonNull = toNonNullLocation(location); + containers.add(container); + containerToLocation.put(container, locationNonNull); + for (ItemType item : getItemsOnContainer(container)) { + itemToLocation.put(item, locationNonNull); + } + } + + @Override + public void onContainerRemoved(ContainerType container) { + containers.remove(container); + containerToLocation.remove(container); + } + + public void onContainerLocationUpdated(ContainerType container, Location location) { + if (!containers.contains(container)) { + // unknown container; probably just stopped? + // If this overtook onContainerAdded, then assume we'll lookup the location and get it right in onContainerAdded + if (LOG.isDebugEnabled()) LOG.debug("Ignoring setting of location for unknown container {}, to {}", container, location); + return; + } + Location locationNonNull = toNonNullLocation(location); + containerToLocation.put(container, locationNonNull); + for (ItemType item : getItemsOnContainer(container)) { + itemToLocation.put(item, locationNonNull); + } + } + + @Override + public void onItemAdded(ItemType item, ContainerType container, boolean immovable) { + // idempotent, as may be called multiple times + + if (immovable) { + immovableItems.add(item); + } + Location location = (container != null) ? containerToLocation.get(container) : null; + ContainerType containerNonNull = toNonNullContainer(container); + Location locationNonNull = toNonNullLocation(location); + ContainerType oldContainer = itemToContainer.put(item, containerNonNull); + Location oldLocation = itemToLocation.put(item, locationNonNull); + } + + @Override + public void onItemRemoved(ItemType item) { + itemToContainer.remove(item); + itemToLocation.remove(item); + itemUsage.remove(item); + immovableItems.remove(item); + } + + @Override + public void onItemUsageUpdated(ItemType item, Map<? extends ItemType, Double> newValue) { + if (hasItem(item)) { + itemUsage.put(item, newValue); + } else { + // Can happen when item removed - get notification of removal and workrate from group and item + // respectively, so can overtake each other + if (LOG.isDebugEnabled()) LOG.debug("Ignoring setting of usage for unknown item {}, to {}", item, newValue); + } + } + + + // Additional methods for tests. + + /** + * Warning: this can be an expensive (time and memory) operation if there are a lot of items/containers. + */ + @VisibleForTesting + public String itemDistributionToString() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + dumpItemDistribution(new PrintStream(baos)); + return new String(baos.toByteArray()); + } + + @VisibleForTesting + public void dumpItemDistribution() { + dumpItemDistribution(System.out); + } + + @VisibleForTesting + public void dumpItemDistribution(PrintStream out) { + Map<ItemType, Map<Location, Double>> directSendsToItemByLocation = getDirectSendsToItemByLocation(); + + out.println("Follow-The-Sun dump: "); + for (Location location: getLocations()) { + out.println("\t"+"Location "+location); + for (ContainerType container : getContainersInLocation(location)) { + out.println("\t\t"+"Container "+container); + for (ItemType item : getItemsOnContainer(container)) { + Map<Location, Double> inboundUsage = directSendsToItemByLocation.get(item); + Map<? extends ItemType, Double> outboundUsage = itemUsage.get(item); + double totalInboundByLocation = (inboundUsage != null) ? sum(inboundUsage.values()) : 0d; + double totalInboundByActor = (outboundUsage != null) ? sum(outboundUsage.values()) : 0d; + out.println("\t\t\t"+"Item "+item); + out.println("\t\t\t\t"+"Inbound-by-location: "+totalInboundByLocation+": "+inboundUsage); + out.println("\t\t\t\t"+"Inbound-by-actor: "+totalInboundByActor+": "+outboundUsage); + } + } + } + out.flush(); + } + + private boolean hasItem(ItemType item) { + return itemToContainer.containsKey(item); + } + + private Set<Location> getLocations() { + return ImmutableSet.copyOf(containerToLocation.values()); + } + + private Set<ContainerType> getContainersInLocation(Location location) { + Set<ContainerType> result = new LinkedHashSet<ContainerType>(); + for (Map.Entry<ContainerType, Location> entry : containerToLocation.entrySet()) { + if (location.equals(entry.getValue())) { + result.add(entry.getKey()); + } + } + return result; + } + + private Set<ItemType> getItemsOnContainer(ContainerType container) { + Set<ItemType> result = new LinkedHashSet<ItemType>(); + for (Map.Entry<ItemType, ContainerType> entry : itemToContainer.entrySet()) { + if (container.equals(entry.getValue())) { + result.add(entry.getKey()); + } + } + return result; + } + + private int getNumItems() { + return itemToContainer.size(); + } + + @SuppressWarnings("unchecked") + private ContainerType nullContainer() { + return (ContainerType) NULL; // relies on erasure + } + + private Location nullLocation() { + return NULL_LOCATION; + } + + private ContainerType toNonNullContainer(ContainerType val) { + return (val != null) ? val : nullContainer(); + } + + private Location toNonNullLocation(Location val) { + return (val != null) ? val : nullLocation(); + } + + private boolean isNull(Object val) { + return val == NULL || val == NULL_LOCATION; + } + + // TODO Move to utils; or stop AbstractLocation from removing things from the map! + public static <K,V> Map<K,V> newHashMap(K k, V v) { + Map<K,V> result = Maps.newLinkedHashMap(); + result.put(k, v); + return result; + } + + public static double sum(Collection<? extends Number> values) { + double total = 0; + for (Number d : values) { + total += d.doubleValue(); + } + return total; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunModel.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunModel.java b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunModel.java new file mode 100644 index 0000000..07c6ed0 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunModel.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.followthesun; + +import java.util.Map; +import java.util.Set; + +import org.apache.brooklyn.api.location.Location; + +/** + * Captures the state of items, containers and locations for the purpose of moving items around + * to minimise latency. For consumption by a {@link FollowTheSunStrategy}. + */ +public interface FollowTheSunModel<ContainerType, ItemType> { + + // Attributes of the pool. + public String getName(); + + // Attributes of containers and items. + public String getName(ItemType item); + public Set<ItemType> getItems(); + public Map<ItemType, Map<Location, Double>> getDirectSendsToItemByLocation(); + public Location getItemLocation(ItemType item); + public ContainerType getItemContainer(ItemType item); + public Location getContainerLocation(ContainerType container); + public boolean hasActiveMigration(ItemType item); + public Set<ContainerType> getAvailableContainersFor(ItemType item, Location location); + public boolean isItemMoveable(ItemType item); + public boolean isItemAllowedIn(ItemType item, Location location); + + // Mutators for keeping the model in-sync with the observed world + public void onContainerAdded(ContainerType container, Location location); + public void onContainerRemoved(ContainerType container); + public void onContainerLocationUpdated(ContainerType container, Location location); + + public void onItemAdded(ItemType item, ContainerType parentContainer, boolean immovable); + public void onItemRemoved(ItemType item); + public void onItemUsageUpdated(ItemType item, Map<? extends ItemType, Double> newValues); + public void onItemMoved(ItemType item, ContainerType newContainer); +}
