Repository: incubator-brooklyn Updated Branches: refs/heads/master f2de6ecfb -> 692eb0c87
Fix time-sensitive sensor derivation when rebinding to old sensors. Before this change, the autoscaler might scale out on rebind: the last recorded total of requests might be 100 while, if that data is very old, the current value might be 10000; because a recent timestamp was attached to the initial value, the enricher computed 9900 new requests in a 1s window. This change forces an invalid timestamp (-1) on the initial-value events passed to enrichers, and makes time-based sensors stricter (configurably) about how they see windows and exclude timestamps. This may change behaviour in some places, particularly where sensors were being fired based on incomplete or stale data. A better fix would be to store the timestamp on the sensors themselves. Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/87604e28 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/87604e28 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/87604e28 Branch: refs/heads/master Commit: 87604e28cb6f3bb860a051aef579361d4ee7b2ef Parents: 8c5dc9d Author: Alex Heneveld <[email protected]> Authored: Mon Feb 2 12:37:55 2015 +0000 Committer: Alex Heneveld <[email protected]> Committed: Mon Feb 2 17:31:50 2015 +0000 ---------------------------------------------------------------------- .../basic/AbstractTypeTransformingEnricher.java | 2 +- .../brooklyn/enricher/basic/AddingEnricher.java | 2 +- .../java/brooklyn/enricher/basic/Combiner.java | 2 +- .../brooklyn/enricher/basic/Transformer.java | 2 +- .../brooklyn/event/basic/BasicSensorEvent.java | 9 +-- .../enricher/RollingTimeWindowMeanEnricher.java | 83 ++++++++++++++++---- .../enricher/TimeWeightedDeltaEnricher.java | 8 +- .../policy/autoscaling/AutoScalerPolicy.java | 4 +- .../brooklyn/enricher/RebindEnricherTest.java | 3 + .../RollingTimeWindowMeanEnricherTest.groovy | 10 ++- 10 files changed, 94 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/core/src/main/java/brooklyn/enricher/basic/AbstractTypeTransformingEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/enricher/basic/AbstractTypeTransformingEnricher.java b/core/src/main/java/brooklyn/enricher/basic/AbstractTypeTransformingEnricher.java index 27eac93..ab31ebc 100644 --- a/core/src/main/java/brooklyn/enricher/basic/AbstractTypeTransformingEnricher.java +++ b/core/src/main/java/brooklyn/enricher/basic/AbstractTypeTransformingEnricher.java @@ -61,7 +61,7 @@ public abstract class AbstractTypeTransformingEnricher<T,U> extends AbstractEnri Object value = producer.getAttribute((AttributeSensor)source); // TODO Aled didn't you write a convenience to "subscribeAndRunIfSet" ? (-Alex) if (value!=null) - onEvent(new BasicSensorEvent(source, producer, value)); + onEvent(new BasicSensorEvent(source, producer, value, -1)); } } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/core/src/main/java/brooklyn/enricher/basic/AddingEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/enricher/basic/AddingEnricher.java b/core/src/main/java/brooklyn/enricher/basic/AddingEnricher.java index 11254bf..868240d 100644 --- a/core/src/main/java/brooklyn/enricher/basic/AddingEnricher.java +++ b/core/src/main/java/brooklyn/enricher/basic/AddingEnricher.java @@ -61,7 +61,7 @@ public class AddingEnricher extends AbstractEnricher implements SensorEventListe if (source instanceof AttributeSensor) { Object value = entity.getAttribute((AttributeSensor)source); if (value!=null) - onEvent(new BasicSensorEvent(source, entity, value)); + onEvent(new BasicSensorEvent(source, entity, value, -1)); } } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/core/src/main/java/brooklyn/enricher/basic/Combiner.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/enricher/basic/Combiner.java b/core/src/main/java/brooklyn/enricher/basic/Combiner.java index cd410a8..cc37d8c 100644 --- a/core/src/main/java/brooklyn/enricher/basic/Combiner.java +++ b/core/src/main/java/brooklyn/enricher/basic/Combiner.java @@ -103,7 +103,7 @@ public class Combiner<T,U> extends AbstractEnricher implements SensorEventListen // TODO Aled didn't you write a convenience to "subscribeAndRunIfSet" ? (-Alex) // Unfortunately not yet! if (value != null) { - onEvent(new BasicSensorEvent(sourceSensor, producer, value)); + onEvent(new BasicSensorEvent(sourceSensor, producer, value, -1)); } } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/core/src/main/java/brooklyn/enricher/basic/Transformer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/enricher/basic/Transformer.java b/core/src/main/java/brooklyn/enricher/basic/Transformer.java index d8244c3..d126307 100644 --- a/core/src/main/java/brooklyn/enricher/basic/Transformer.java +++ b/core/src/main/java/brooklyn/enricher/basic/Transformer.java @@ -98,7 +98,7 @@ public class Transformer<T,U> extends AbstractEnricher implements SensorEventLis Object value = producer.getAttribute((AttributeSensor<?>)sourceSensor); // TODO would be useful to have a convenience to "subscribeAndThenIfItIsAlreadySetRunItOnce" if (value!=null) { - onEvent(new BasicSensorEvent(sourceSensor, producer, value)); + onEvent(new BasicSensorEvent(sourceSensor, producer, value, -1)); } } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/core/src/main/java/brooklyn/event/basic/BasicSensorEvent.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/event/basic/BasicSensorEvent.java b/core/src/main/java/brooklyn/event/basic/BasicSensorEvent.java index 
da912a8..b571eb7 100644 --- a/core/src/main/java/brooklyn/event/basic/BasicSensorEvent.java +++ b/core/src/main/java/brooklyn/event/basic/BasicSensorEvent.java @@ -51,19 +51,14 @@ public class BasicSensorEvent<T> implements SensorEvent<T> { /** arguments should not be null (except in certain limited testing situations) */ public BasicSensorEvent(Sensor<T> sensor, Entity source, T value) { - this(sensor, source, value, 0); + this(sensor, source, value, System.currentTimeMillis()); } public BasicSensorEvent(Sensor<T> sensor, Entity source, T value, long timestamp) { this.sensor = sensor; this.source = source; this.value = value; - - if (timestamp > 0) { - this.timestamp = timestamp; - } else { - this.timestamp = System.currentTimeMillis(); - } + this.timestamp = timestamp; } public static <T> SensorEvent<T> of(Sensor<T> sensor, Entity source, T value, long timestamp) { http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java b/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java index ea70445..3c9247b 100644 --- a/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java +++ b/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java @@ -21,11 +21,10 @@ package brooklyn.enricher; import java.util.Iterator; import java.util.LinkedList; -import com.google.common.base.Preconditions; - -import brooklyn.catalog.Catalog; +import brooklyn.config.ConfigKey; import brooklyn.enricher.basic.AbstractTypeTransformingEnricher; import brooklyn.entity.Entity; +import brooklyn.entity.basic.ConfigKeys; import brooklyn.event.AttributeSensor; import brooklyn.event.Sensor; import brooklyn.event.SensorEvent; @@ -33,6 +32,8 @@ import brooklyn.util.flags.SetFromFlag; import 
brooklyn.util.javalang.JavaClassNames; import brooklyn.util.time.Duration; +import com.google.common.base.Preconditions; + /** * Transforms {@link Sensor} data into a rolling average based on a time window. * @@ -59,6 +60,15 @@ import brooklyn.util.time.Duration; //@Catalog(name="Rolling Mean in Time Window", description="Transforms a sensor's data into a rolling average " // + "based on a time window.") public class RollingTimeWindowMeanEnricher<T extends Number> extends AbstractTypeTransformingEnricher<T,Double> { + + public static ConfigKey<Double> CONFIDENCE_REQUIRED_TO_PUBLISH = ConfigKeys.newDoubleConfigKey("confidenceRequired", + "Minimum confidence level (ie period covered) required to publish a rolling average", 0.8d); + + // without this, we will refuse to publish if the server time differs from the publisher time (in a distributed setup); + // also we won't publish if a lot of time is spent actually doing the computation + public static ConfigKey<Duration> TIMESTAMP_GRACE_TIME = ConfigKeys.newConfigKey(Duration.class, "timestampGraceTime", + "When computing windowed average, allow this much slippage time between published metrics and local clock", Duration.millis(500)); + public static class ConfidenceQualifiedNumber { final Double value; final double confidence; @@ -67,6 +77,12 @@ public class RollingTimeWindowMeanEnricher<T extends Number> extends AbstractTyp this.value = value; this.confidence = confidence; } + + @Override + public String toString() { + return ""+value+" ("+(int)(confidence*100)+"%)"; + } + } private final LinkedList<T> values = new LinkedList<T>(); @@ -103,31 +119,69 @@ public class RollingTimeWindowMeanEnricher<T extends Number> extends AbstractTyp public void onEvent(SensorEvent<T> event, long eventTime) { values.addLast(event.getValue()); timestamps.addLast(eventTime); - pruneValues(eventTime); - entity.setAttribute((AttributeSensor<Double>)target, getAverage(eventTime).value); //TODO this can potentially go stale... 
maybe we need to timestamp as well? + if (eventTime>0) { + ConfidenceQualifiedNumber average = getAverage(eventTime, 0); + + if (average.confidence > getConfig(CONFIDENCE_REQUIRED_TO_PUBLISH)) { + // without confidence, we might publish wildly varying estimates, + // causing spurious resizes, so allow it to be configured, and + // by default require a high value + + // TODO would be nice to include timestamp, etc + entity.setAttribute((AttributeSensor<Double>)target, average.value); + } + } } public ConfidenceQualifiedNumber getAverage() { - return getAverage(System.currentTimeMillis()); + return getAverage(System.currentTimeMillis(), getConfig(TIMESTAMP_GRACE_TIME).toMilliseconds()); } - public ConfidenceQualifiedNumber getAverage(long now) { - pruneValues(now); + public ConfidenceQualifiedNumber getAverage(long fromTimeExact) { + return getAverage(fromTimeExact, 0); + } + + public ConfidenceQualifiedNumber getAverage(long fromTime, long graceAllowed) { if (timestamps.isEmpty()) { return lastAverage = new ConfidenceQualifiedNumber(lastAverage.value, 0.0d); } + + // (previously there was an old comment here, pre-Jul-2014, + // saying "grkvlt - see email to development list"; + // but i can't find that email) + // some of the more recent confidence and bogus-timestamp + exclusion logic might fix this though + + long firstTimestamp = -1; + Iterator<Long> ti = timestamps.iterator(); + while (ti.hasNext()) { + firstTimestamp = ti.next(); + if (firstTimestamp>0) break; + } + if (firstTimestamp<=0) { + // no values with reasonable timestamps + return lastAverage = new ConfidenceQualifiedNumber(values.get(values.size()-1).doubleValue(), 0.0d); + } - // XXX grkvlt - see email to development list + long lastTimestamp = timestamps.get(timestamps.size()-1); + long now = fromTime; + if (lastTimestamp > fromTime - graceAllowed) { + // without this, if the computation takes place X seconds after the publish, + // we treat X seconds as time for which we have no confidence in the 
data + now = lastTimestamp; + } + pruneValues(now); - long lastTimestamp = timestamps.get(timestamps.size()-1); - Double confidence = ((double)(timePeriod.toMilliseconds() - (now - lastTimestamp))) / timePeriod.toMilliseconds(); - if (confidence <= 0.0d) { + long windowStart = Math.max(now-timePeriod.toMilliseconds(), firstTimestamp); + long windowEnd = Math.max(now-timePeriod.toMilliseconds(), lastTimestamp); + Double confidence = ((double)(windowEnd - windowStart)) / timePeriod.toMilliseconds(); + if (confidence <= 0.0000001d) { + // not enough timestamps in window double lastValue = values.get(values.size()-1).doubleValue(); return lastAverage = new ConfidenceQualifiedNumber(lastValue, 0.0d); } - long start = (now - timePeriod.toMilliseconds()); + long start = windowStart; long end; double weightedAverage = 0.0d; @@ -151,7 +205,8 @@ public class RollingTimeWindowMeanEnricher<T extends Number> extends AbstractTyp * Discards out-of-date values, but keeps at least one value. */ private void pruneValues(long now) { - while(timestamps.size() > 1 && timestamps.get(0) < (now - timePeriod.toMilliseconds())) { + // keep one value from before the period, so that we can tell the window's start time + while(timestamps.size() > 1 && timestamps.get(1) < (now - timePeriod.toMilliseconds())) { timestamps.removeFirst(); values.removeFirst(); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java b/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java index 0d4ff84..b746edd 100644 --- a/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java +++ b/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java @@ -23,7 +23,6 @@ import groovy.lang.Closure; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; -import brooklyn.catalog.Catalog; import brooklyn.enricher.basic.AbstractTypeTransformingEnricher; import brooklyn.entity.Entity; import brooklyn.event.AttributeSensor; @@ -102,8 +101,8 @@ public class TimeWeightedDeltaEnricher<T extends Number> extends AbstractTypeTra return; } - if (eventTime > lastTime) { - if (lastValue == null) { + if (eventTime > 0 && eventTime > lastTime) { + if (lastValue == null || lastTime <= 0) { // cannot calculate time-based delta with a single value if (LOG.isTraceEnabled()) LOG.trace("{} received event but no last value so will not emit, null -> {} at {}", new Object[] {this, current, eventTime}); } else { @@ -116,6 +115,9 @@ public class TimeWeightedDeltaEnricher<T extends Number> extends AbstractTypeTra } lastValue = current; lastTime = eventTime; + } else if (lastTime<0) { + lastValue = current; + lastTime = -1; } } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java b/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java index a733fb6..a5f0ded 100644 --- a/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java +++ b/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java @@ -690,7 +690,7 @@ public class AutoScalerPolicy extends AbstractPolicy { unboundedSize = (int)Math.ceil(currentTotalActivity/metricUpperBoundD); desiredSize = toBoundedDesiredPoolSize(unboundedSize); if (desiredSize > currentSize) { - if (LOG.isTraceEnabled()) LOG.trace("{} resizing out pool {} from {} to {} ({} > {})", new Object[] {this, poolEntity, currentSize, desiredSize, currentMetricD, metricUpperBoundD}); + if (LOG.isDebugEnabled()) LOG.debug("{} provisionally resizing out pool {} from {} to {} ({} > {})", new Object[] {this, poolEntity, 
currentSize, desiredSize, currentMetricD, metricUpperBoundD}); scheduleResize(desiredSize); } else { if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} ({} > {} > {}, but scale-out blocked eg by bounds/check)", new Object[] {this, poolEntity, currentSize, currentMetricD, metricUpperBoundD, metricLowerBoundD}); @@ -708,7 +708,7 @@ public class AutoScalerPolicy extends AbstractPolicy { desiredSize = toBoundedDesiredPoolSize(desiredSize); } if (desiredSize < currentSize) { - if (LOG.isTraceEnabled()) LOG.trace("{} resizing back pool {} from {} to {} ({} < {})", new Object[] {this, poolEntity, currentSize, desiredSize, currentMetricD, metricLowerBoundD}); + if (LOG.isDebugEnabled()) LOG.debug("{} provisionally resizing back pool {} from {} to {} ({} < {})", new Object[] {this, poolEntity, currentSize, desiredSize, currentMetricD, metricLowerBoundD}); scheduleResize(desiredSize); } else { if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} ({} < {} < {}, but scale-back blocked eg by bounds/check)", new Object[] {this, poolEntity, currentSize, currentMetricD, metricLowerBoundD, metricUpperBoundD}); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/policy/src/test/java/brooklyn/enricher/RebindEnricherTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/brooklyn/enricher/RebindEnricherTest.java b/policy/src/test/java/brooklyn/enricher/RebindEnricherTest.java index bcfc651..dc08e19 100644 --- a/policy/src/test/java/brooklyn/enricher/RebindEnricherTest.java +++ b/policy/src/test/java/brooklyn/enricher/RebindEnricherTest.java @@ -35,6 +35,7 @@ import brooklyn.test.EntityTestUtils; import brooklyn.test.entity.TestApplication; import brooklyn.util.http.BetterMockWebServer; import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; import com.google.mockwebserver.MockResponse; @@ -108,6 +109,8 @@ public class RebindEnricherTest extends 
RebindTestFixtureWithApp { TestApplication newApp = rebind(); newApp.setAttribute(INT_METRIC, 10); + Time.sleep(Duration.millis(10)); + newApp.setAttribute(INT_METRIC, 10); EntityTestUtils.assertAttributeEqualsEventually(newApp, DOUBLE_METRIC, 10d); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/policy/src/test/java/brooklyn/enricher/RollingTimeWindowMeanEnricherTest.groovy ---------------------------------------------------------------------- diff --git a/policy/src/test/java/brooklyn/enricher/RollingTimeWindowMeanEnricherTest.groovy b/policy/src/test/java/brooklyn/enricher/RollingTimeWindowMeanEnricherTest.groovy index 1a54302..ae45081 100644 --- a/policy/src/test/java/brooklyn/enricher/RollingTimeWindowMeanEnricherTest.groovy +++ b/policy/src/test/java/brooklyn/enricher/RollingTimeWindowMeanEnricherTest.groovy @@ -93,9 +93,17 @@ class RollingTimeWindowMeanEnricherTest { } @Test - public void testSingleValueAverage() { + public void testSingleValueTimeAverage() { averager.onEvent(intSensor.newEvent(producer, 10), 1000) average = averager.getAverage(1000) + assertEquals(average.confidence, 0d) + } + + @Test + public void testTwoValueAverageForPeriod() { + averager.onEvent(intSensor.newEvent(producer, 10), 1000) + averager.onEvent(intSensor.newEvent(producer, 10), 2000) + average = averager.getAverage(2000) assertEquals(average.value, 10 /1d) assertEquals(average.confidence, 1d) }
