Repository: incubator-brooklyn Updated Branches: refs/heads/master b08fe4eca -> 45dd54940
yaml-friendly enrichers for delta and rolling avg Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/089fe862 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/089fe862 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/089fe862 Branch: refs/heads/master Commit: 089fe862677e5588b68b9c084fefe88fb2cc2a1d Parents: 78cbc22 Author: Alex Heneveld <[email protected]> Authored: Fri Jun 19 04:56:39 2015 -0700 Committer: Alex Heneveld <[email protected]> Committed: Wed Jun 24 00:40:32 2015 -0700 ---------------------------------------------------------------------- .../enricher/basic/AbstractTransformer.java | 106 +++++++++++ .../brooklyn/enricher/basic/Transformer.java | 64 +------ .../YamlRollingTimeWindowMeanEnricher.java | 178 +++++++++++++++++++ .../basic/YamlTimeWeightedDeltaEnricher.java | 81 +++++++++ .../YamlRollingTimeWindowMeanEnricherTest.java | 178 +++++++++++++++++++ .../YamlTimeWeightedDeltaEnricherTest.java | 107 +++++++++++ .../enricher/RollingTimeWindowMeanEnricher.java | 4 +- .../enricher/TimeWeightedDeltaEnricher.java | 3 + 8 files changed, 658 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/089fe862/core/src/main/java/brooklyn/enricher/basic/AbstractTransformer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/enricher/basic/AbstractTransformer.java b/core/src/main/java/brooklyn/enricher/basic/AbstractTransformer.java new file mode 100644 index 0000000..28a6de1 --- /dev/null +++ b/core/src/main/java/brooklyn/enricher/basic/AbstractTransformer.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.enricher.basic; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.EntityLocal; +import brooklyn.event.AttributeSensor; +import brooklyn.event.Sensor; +import brooklyn.event.SensorEvent; +import brooklyn.event.SensorEventListener; +import brooklyn.event.basic.BasicSensorEvent; +import brooklyn.util.collections.MutableSet; +import brooklyn.util.task.Tasks; +import brooklyn.util.time.Duration; + +import com.google.common.base.Function; +import com.google.common.reflect.TypeToken; + +@SuppressWarnings("serial") +public abstract class AbstractTransformer<T,U> extends AbstractEnricher implements SensorEventListener<T> { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractTransformer.class); + + public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer"); + + public static ConfigKey<Sensor<?>> SOURCE_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.sourceSensor"); + + public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor"); + + protected Entity producer; + protected Sensor<T> sourceSensor; + protected Sensor<U> targetSensor; + + public AbstractTransformer() { + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + + Function<SensorEvent<T>, U> transformation = getTransformation(); + this.producer = getConfig(PRODUCER) == null ? entity: getConfig(PRODUCER); + this.sourceSensor = (Sensor<T>) getRequiredConfig(SOURCE_SENSOR); + Sensor<?> targetSensorSpecified = getConfig(TARGET_SENSOR); + this.targetSensor = targetSensorSpecified!=null ? (Sensor<U>) targetSensorSpecified : (Sensor<U>) this.sourceSensor; + if (producer.equals(entity) && targetSensorSpecified==null) { + LOG.error("Refusing to add an enricher which reads and publishes on the same sensor: "+ + producer+"."+sourceSensor+" (computing "+transformation+")"); + // we don't throw because this error may manifest itself after a lengthy deployment, + // and failing it at that point simply because of an enricher is not very pleasant + // (at least not until we have good re-run support across the board) + return; + } + + subscribe(producer, sourceSensor, this); + + if (sourceSensor instanceof AttributeSensor) { + Object value = producer.getAttribute((AttributeSensor<?>)sourceSensor); + // TODO would be useful to have a convenience to "subscribeAndThenIfItIsAlreadySetRunItOnce" + if (value!=null) { + onEvent(new BasicSensorEvent(sourceSensor, producer, value, -1)); + } + } + } + + /** returns a function for transformation, for immediate use only (not for caching, as it may change) */ + protected abstract Function<SensorEvent<T>, U> getTransformation(); + + @Override + public void onEvent(SensorEvent<T> event) { + emit(targetSensor, compute(event)); + } + + protected Object compute(SensorEvent<T> event) { + // transformation is not going to change, but this design makes it easier to support changing config in future. + // if it's an efficiency hole we can switch to populate the transformation at start. + U result = getTransformation().apply(event); + if (LOG.isTraceEnabled()) + LOG.trace("Enricher "+this+" computed "+result+" from "+event); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/089fe862/core/src/main/java/brooklyn/enricher/basic/Transformer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/enricher/basic/Transformer.java b/core/src/main/java/brooklyn/enricher/basic/Transformer.java index c6c88a6..2fa85fe 100644 --- a/core/src/main/java/brooklyn/enricher/basic/Transformer.java +++ b/core/src/main/java/brooklyn/enricher/basic/Transformer.java @@ -24,14 +24,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import brooklyn.config.ConfigKey; -import brooklyn.entity.Entity; import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.EntityLocal; -import brooklyn.event.AttributeSensor; -import brooklyn.event.Sensor; import brooklyn.event.SensorEvent; -import brooklyn.event.SensorEventListener; -import brooklyn.event.basic.BasicSensorEvent; import brooklyn.util.collections.MutableSet; import brooklyn.util.task.Tasks; import brooklyn.util.time.Duration; @@ -41,7 +35,7 @@ import com.google.common.reflect.TypeToken; //@Catalog(name="Transformer", description="Transforms attributes of an entity; see Enrichers.builder().transforming(...)") @SuppressWarnings("serial") -public class Transformer<T,U> extends AbstractEnricher implements SensorEventListener<T> { +public class Transformer<T,U> extends AbstractTransformer<T,U> { private static final Logger LOG = LoggerFactory.getLogger(Transformer.class); @@ -50,50 +44,11 @@ public class Transformer<T,U> extends AbstractEnricher implements SensorEventLis public static ConfigKey<Function<?, ?>> TRANSFORMATION_FROM_VALUE = ConfigKeys.newConfigKey(new TypeToken<Function<?, ?>>() {}, "enricher.transformation"); public static ConfigKey<Function<?, ?>> TRANSFORMATION_FROM_EVENT = ConfigKeys.newConfigKey(new TypeToken<Function<?, ?>>() {}, "enricher.transformation.fromevent"); - public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer"); - - public static ConfigKey<Sensor<?>> SOURCE_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.sourceSensor"); - - public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor"); - - protected Entity producer; - protected Sensor<T> sourceSensor; - protected Sensor<U> targetSensor; - public Transformer() { } - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public void setEntity(EntityLocal entity) { - super.setEntity(entity); - - Function<SensorEvent<T>, U> transformation = getTransformation(); - this.producer = getConfig(PRODUCER) == null ? entity: getConfig(PRODUCER); - this.sourceSensor = (Sensor<T>) getRequiredConfig(SOURCE_SENSOR); - Sensor<?> targetSensorSpecified = getConfig(TARGET_SENSOR); - this.targetSensor = targetSensorSpecified!=null ? (Sensor<U>) targetSensorSpecified : (Sensor<U>) this.sourceSensor; - if (producer.equals(entity) && targetSensorSpecified==null) { - LOG.error("Refusing to add an enricher which reads and publishes on the same sensor: "+ - producer+"."+sourceSensor+" (computing "+transformation+")"); - // we don't throw because this error may manifest itself after a lengthy deployment, - // and failing it at that point simply because of an enricher is not very pleasant - // (at least not until we have good re-run support across the board) - return; - } - - subscribe(producer, sourceSensor, this); - - if (sourceSensor instanceof AttributeSensor) { - Object value = producer.getAttribute((AttributeSensor<?>)sourceSensor); - // TODO would be useful to have a convenience to "subscribeAndThenIfItIsAlreadySetRunItOnce" - if (value!=null) { - onEvent(new BasicSensorEvent(sourceSensor, producer, value, -1)); - } - } - } - /** returns a function for transformation, for immediate use only (not for caching, as it may change) */ + @Override @SuppressWarnings("unchecked") protected Function<SensorEvent<T>, U> getTransformation() { MutableSet<Object> suppliers = MutableSet.of(); @@ -144,18 +99,5 @@ public class Transformer<T,U> extends AbstractEnricher implements SensorEventLis } }; } - - @Override - public void onEvent(SensorEvent<T> event) { - emit(targetSensor, compute(event)); - } - - protected Object compute(SensorEvent<T> event) { - // transformation is not going to change, but this design makes it easier to support changing config in future. - // if it's an efficiency hole we can switch to populate the transformation at start. - U result = getTransformation().apply(event); - if (LOG.isTraceEnabled()) - LOG.trace("Enricher "+this+" computed "+result+" from "+event); - return result; - } + } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/089fe862/core/src/main/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricher.java b/core/src/main/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricher.java new file mode 100644 index 0000000..64333d4 --- /dev/null +++ b/core/src/main/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricher.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.enricher.basic; + +import java.util.Iterator; +import java.util.LinkedList; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.event.Sensor; +import brooklyn.event.SensorEvent; +import brooklyn.util.time.Duration; + +import com.google.common.base.Function; + +/** + * Transforms {@link Sensor} data into a rolling average based on a time window. + * + * All values within the window are weighted or discarded based on the timestamps associated with + * them (discards occur when a new value is added or an average is requested) + * <p> + * This will not extrapolate figures - it is assumed a value is valid and correct for the entire + * time period between it and the previous value. Normally, the average attribute is only updated + * when a new value arrives so it can give a fully informed average, but there is a danger of this + * going stale. + * <p> + * When an average is requested, it is likely there will be a segment of the window for which there + * isn't a value. Instead of extrapolating a value and providing different extrapolation techniques, + * the average is reported with a confidence value which reflects the fraction of the time + * window for which the values were valid. + * <p> + * Consumers of the average may ignore the confidence value and just use the last known average. + * They could multiply the returned value by the confidence value to get a decay-type behavior as + * the window empties. A third alternative is to, at a certain confidence threshold, report that + * the average is no longer meaningful. + * <p> + * The default average when no data has been received is 0, with a confidence of 0 + */ +public class YamlRollingTimeWindowMeanEnricher<T extends Number> extends AbstractTransformer<T,Double> { + + public static ConfigKey<Duration> WINDOW_DURATION = ConfigKeys.newConfigKey(Duration.class, "enricher.window.duration", + "Duration for which this window should store data, default one minute", Duration.ONE_MINUTE); + + public static ConfigKey<Double> CONFIDENCE_REQUIRED_TO_PUBLISH = ConfigKeys.newDoubleConfigKey("enricher.window.confidenceRequired", + "Minimum confidence level (ie period covered) required to publish a rolling average", 0.8d); + + public static class ConfidenceQualifiedNumber { + final Double value; + final double confidence; + + public ConfidenceQualifiedNumber(Double value, double confidence) { + this.value = value; + this.confidence = confidence; + } + + @Override + public String toString() { + return ""+value+" ("+(int)(confidence*100)+"%)"; + } + + } + + private final LinkedList<T> values = new LinkedList<T>(); + private final LinkedList<Long> timestamps = new LinkedList<Long>(); + volatile ConfidenceQualifiedNumber lastAverage = new ConfidenceQualifiedNumber(0d,0d); + + @Override + protected Function<SensorEvent<T>, Double> getTransformation() { + return new Function<SensorEvent<T>, Double>() { + @Override + public Double apply(SensorEvent<T> event) { + long eventTime = event.getTimestamp(); + if (event.getValue()==null) { + return null; + } + values.addLast(event.getValue()); + timestamps.addLast(eventTime); + if (eventTime>0) { + ConfidenceQualifiedNumber average = getAverage(eventTime, 0); + + if (average.confidence > getConfig(CONFIDENCE_REQUIRED_TO_PUBLISH)) { + // without confidence, we might publish wildly varying estimates, + // causing spurious resizes, so allow it to be configured, and + // by default require a high value + + // TODO would be nice to include timestamp, etc + return average.value; + } + } + return null; + } + }; + } + + public ConfidenceQualifiedNumber getAverage(long fromTime, long graceAllowed) { + if (timestamps.isEmpty()) { + return lastAverage = new ConfidenceQualifiedNumber(lastAverage.value, 0.0d); + } + + long firstTimestamp = -1; + Iterator<Long> ti = timestamps.iterator(); + while (ti.hasNext()) { + firstTimestamp = ti.next(); + if (firstTimestamp>0) break; + } + if (firstTimestamp<=0) { + // no values with reasonable timestamps + return lastAverage = new ConfidenceQualifiedNumber(values.get(values.size()-1).doubleValue(), 0.0d); + } + + long lastTimestamp = timestamps.get(timestamps.size()-1); + + long now = fromTime; + if (lastTimestamp > fromTime - graceAllowed) { + // without this, if the computation takes place X seconds after the publish, + // we treat X seconds as time for which we have no confidence in the data + now = lastTimestamp; + } + pruneValues(now); + + Duration timePeriod = getConfig(WINDOW_DURATION); + long windowStart = Math.max(now-timePeriod.toMilliseconds(), firstTimestamp); + long windowEnd = Math.max(now-timePeriod.toMilliseconds(), lastTimestamp); + Double confidence = ((double)(windowEnd - windowStart)) / timePeriod.toMilliseconds(); + if (confidence <= 0.0000001d) { + // not enough timestamps in window + double lastValue = values.get(values.size()-1).doubleValue(); + return lastAverage = new ConfidenceQualifiedNumber(lastValue, 0.0d); + } + + long start = windowStart; + long end; + double weightedAverage = 0.0d; + + Iterator<T> valuesIter = values.iterator(); + Iterator<Long> timestampsIter = timestamps.iterator(); + while (valuesIter.hasNext()) { + // Ignores null and out-of-date values (and also values that are received out-of-order, but that shouldn't happen!) + Number val = valuesIter.next(); + Long timestamp = timestampsIter.next(); + if (val!=null && timestamp >= start) { + end = timestamp; + weightedAverage += ((end - start) / (confidence * timePeriod.toMilliseconds())) * val.doubleValue(); + start = timestamp; + } + } + + return lastAverage = new ConfidenceQualifiedNumber(weightedAverage, confidence); + } + + /** + * Discards out-of-date values, but keeps at least one value. + */ + private void pruneValues(long now) { + // keep one value from before the period, so that we can tell the window's start time + Duration timePeriod = getConfig(WINDOW_DURATION); + while(timestamps.size() > 1 && timestamps.get(1) < (now - timePeriod.toMilliseconds())) { + timestamps.removeFirst(); + values.removeFirst(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/089fe862/core/src/main/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricher.java b/core/src/main/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricher.java new file mode 100644 index 0000000..b515da4 --- /dev/null +++ b/core/src/main/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricher.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.enricher.basic; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.ConfigKey; +import brooklyn.enricher.basic.AbstractTransformer; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.event.SensorEvent; +import brooklyn.util.flags.TypeCoercions; +import brooklyn.util.time.Duration; + +import com.google.common.base.Function; + +/** + * Converts an absolute count sensor into a delta sensor (i.e. the diff between the current and previous value), + * presented as a units/timeUnit based on the event timing. + * <p> + * For example, given a requests.count sensor, this can make a requests.per_sec sensor with {@link #DELTA_PERIOD} set to "1s" (the default). + * <p> + * Suitable for configuration from YAML. + */ +public class YamlTimeWeightedDeltaEnricher<T extends Number> extends AbstractTransformer<T,Double> { + private static final Logger LOG = LoggerFactory.getLogger(YamlTimeWeightedDeltaEnricher.class); + + Number lastValue; + long lastTime = -1; + + public static ConfigKey<Duration> DELTA_PERIOD = ConfigKeys.newConfigKey(Duration.class, "enricher.delta.period", + "Duration that this delta should compute for, default per second", Duration.ONE_SECOND); + + @Override + protected Function<SensorEvent<T>, Double> getTransformation() { + return new Function<SensorEvent<T>, Double>() { + @Override + public Double apply(SensorEvent<T> event) { + Number current = TypeCoercions.coerce(event.getValue(), Double.class); + + if (current == null) return null; + + long eventTime = event.getTimestamp(); + long unitMillis = getConfig(DELTA_PERIOD).toMilliseconds(); + Double result = null; + + if (eventTime > 0 && eventTime > lastTime) { + if (lastValue == null || lastTime < 0) { + // cannot calculate time-based delta with a single value + if (LOG.isTraceEnabled()) LOG.trace("{} received event but no last value so will not emit, null -> {} at {}", new Object[] {this, current, eventTime}); + } else { + double duration = eventTime - lastTime; + result = (current.doubleValue() - lastValue.doubleValue()) / (duration / unitMillis); + } + } + + lastValue = current; + lastTime = eventTime; + + return result; + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/089fe862/core/src/test/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricherTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricherTest.java b/core/src/test/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricherTest.java new file mode 100644 index 0000000..45b7ec3 --- /dev/null +++ b/core/src/test/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricherTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.enricher.basic; + +import static org.testng.Assert.assertEquals; + +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.enricher.basic.YamlRollingTimeWindowMeanEnricher.ConfidenceQualifiedNumber; +import brooklyn.entity.basic.AbstractApplication; +import brooklyn.entity.basic.BasicEntity; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensor; +import brooklyn.event.basic.BasicSensorEvent; +import brooklyn.management.SubscriptionContext; +import brooklyn.policy.EnricherSpec; +import brooklyn.util.time.Duration; + +public class YamlRollingTimeWindowMeanEnricherTest { + + AbstractApplication app; + + BasicEntity producer; + + AttributeSensor<Integer> intSensor; + AttributeSensor<Double> avgSensor, deltaSensor; + + Duration timePeriod = Duration.ONE_SECOND; + + YamlTimeWeightedDeltaEnricher<Double> delta; + YamlRollingTimeWindowMeanEnricher<Double> averager; + + ConfidenceQualifiedNumber average; + SubscriptionContext subscription; + + @SuppressWarnings("unchecked") + @BeforeMethod + public void before() { + app = new AbstractApplication() {}; + Entities.startManagement(app); + producer = app.addChild(EntitySpec.create(BasicEntity.class)); + + intSensor = new BasicAttributeSensor<Integer>(Integer.class, "int sensor"); + deltaSensor = new BasicAttributeSensor<Double>(Double.class, "delta sensor"); + avgSensor = new BasicAttributeSensor<Double>(Double.class, "avg sensor"); + + delta = producer.addEnricher(EnricherSpec.create(YamlTimeWeightedDeltaEnricher.class) + .configure(YamlTimeWeightedDeltaEnricher.PRODUCER, producer) + .configure(YamlTimeWeightedDeltaEnricher.SOURCE_SENSOR, intSensor) + .configure(YamlTimeWeightedDeltaEnricher.TARGET_SENSOR, deltaSensor)); + + averager = producer.addEnricher(EnricherSpec.create(YamlRollingTimeWindowMeanEnricher.class) + .configure(YamlRollingTimeWindowMeanEnricher.PRODUCER, producer) + .configure(YamlRollingTimeWindowMeanEnricher.SOURCE_SENSOR, deltaSensor) + .configure(YamlRollingTimeWindowMeanEnricher.TARGET_SENSOR, avgSensor) + .configure(YamlRollingTimeWindowMeanEnricher.WINDOW_DURATION, timePeriod)); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + if (app != null) Entities.destroyAll(app.getManagementContext()); + } + + @Test + public void testDefaultAverageWhenEmpty() { + ConfidenceQualifiedNumber average = averager.getAverage(0, 0); + assertEquals(average.value, 0d); + assertEquals(average.confidence, 0.0d); + } + + protected BasicSensorEvent<Integer> newIntSensorEvent(int value, long timestamp) { + return new BasicSensorEvent<Integer>(intSensor, producer, value, timestamp); + } + protected BasicSensorEvent<Double> newDeltaSensorEvent(double value, long timestamp) { + return new BasicSensorEvent<Double>(deltaSensor, producer, value, timestamp); + } + + @Test + public void testNoRecentValuesAverage() { + averager.onEvent(newDeltaSensorEvent(10, 0)); + average = averager.getAverage(timePeriod.toMilliseconds()+1000, 0); + assertEquals(average.value, 10d); + assertEquals(average.confidence, 0d); + } + + @Test + public void testNoRecentValuesUsesLastForAverage() { + averager.onEvent(newDeltaSensorEvent(10, 0)); + averager.onEvent(newDeltaSensorEvent(20, 10)); + average = averager.getAverage(timePeriod.toMilliseconds()+1000, 0); + assertEquals(average.value, 20d); + assertEquals(average.confidence, 0d); + } + + @Test + public void testSingleValueTimeAverage() { + averager.onEvent(newDeltaSensorEvent(10, 1000)); + average = averager.getAverage(1000, 0); + assertEquals(average.confidence, 0d); + } + + @Test + public void testTwoValueAverageForPeriod() { + averager.onEvent(newDeltaSensorEvent(10, 1000)); + averager.onEvent(newDeltaSensorEvent(10, 2000)); + average = averager.getAverage(2000, 0); + assertEquals(average.value, 10 /1d); + assertEquals(average.confidence, 1d); + } + + @Test + public void testMonospacedAverage() { + averager.onEvent(newDeltaSensorEvent(10, 1000)); + averager.onEvent(newDeltaSensorEvent(20, 1250)); + averager.onEvent(newDeltaSensorEvent(30, 1500)); + averager.onEvent(newDeltaSensorEvent(40, 1750)); + averager.onEvent(newDeltaSensorEvent(50, 2000)); + average = averager.getAverage(2000, 0); + assertEquals(average.value, (20+30+40+50)/4d); + assertEquals(average.confidence, 1d); + } + + @Test + public void testWeightedAverage() { + averager.onEvent(newDeltaSensorEvent(10, 1000)); + averager.onEvent(newDeltaSensorEvent(20, 1100)); + averager.onEvent(newDeltaSensorEvent(30, 1300)); + averager.onEvent(newDeltaSensorEvent(40, 1600)); + averager.onEvent(newDeltaSensorEvent(50, 2000)); + + average = averager.getAverage(2000, 0); + assertEquals(average.value, (20*0.1d)+(30*0.2d)+(40*0.3d)+(50*0.4d)); + assertEquals(average.confidence, 1d); + } + + @Test + public void testConfidenceDecay() { + averager.onEvent(newDeltaSensorEvent(10, 1000)); + averager.onEvent(newDeltaSensorEvent(20, 1250)); + averager.onEvent(newDeltaSensorEvent(30, 1500)); + averager.onEvent(newDeltaSensorEvent(40, 1750)); + averager.onEvent(newDeltaSensorEvent(50, 2000)); + + average = averager.getAverage(2250, 0); + assertEquals(average.value, (30+40+50)/3d); + assertEquals(average.confidence, 0.75d); + average = averager.getAverage(2500, 0); + assertEquals(average.value, (40+50)/2d); + assertEquals(average.confidence, 0.5d); + average = averager.getAverage(2750, 0); + assertEquals(average.value, 50d); + assertEquals(average.confidence, 0.25d); + average = averager.getAverage(3000, 0); + assertEquals(average.value, 50d); + assertEquals(average.confidence, 0d); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/089fe862/core/src/test/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricherTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricherTest.java b/core/src/test/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricherTest.java new file mode 100644 index 0000000..2a7a974 --- /dev/null +++ b/core/src/test/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricherTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.enricher.basic; + +import static org.testng.Assert.assertEquals; + +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.AbstractApplication; +import brooklyn.entity.basic.BasicEntity; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensor; +import brooklyn.event.basic.BasicSensorEvent; +import brooklyn.management.SubscriptionContext; +import brooklyn.policy.EnricherSpec; + +public class YamlTimeWeightedDeltaEnricherTest { + + AbstractApplication app; + + BasicEntity producer; + + AttributeSensor<Integer> intSensor; + AttributeSensor<Double> avgSensor, deltaSensor; + SubscriptionContext subscription; + + @BeforeMethod + public void before() { + app = new AbstractApplication() {}; + Entities.startManagement(app); + producer = app.addChild(EntitySpec.create(BasicEntity.class)); + + intSensor = new BasicAttributeSensor<Integer>(Integer.class, "int sensor"); + deltaSensor = new BasicAttributeSensor<Double>(Double.class, "delta sensor"); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + if (app != null) Entities.destroyAll(app.getManagementContext()); + } + + @Test + public void testMonospaceTimeWeightedDeltaEnricher() { + @SuppressWarnings("unchecked") + YamlTimeWeightedDeltaEnricher<Integer> delta = producer.addEnricher(EnricherSpec.create(YamlTimeWeightedDeltaEnricher.class) + .configure(YamlTimeWeightedDeltaEnricher.PRODUCER, producer) + .configure(YamlTimeWeightedDeltaEnricher.SOURCE_SENSOR, intSensor) + .configure(YamlTimeWeightedDeltaEnricher.TARGET_SENSOR, deltaSensor)); + + delta.onEvent(newIntSensorEvent(0, 0)); + assertEquals(producer.getAttribute(deltaSensor), null); + delta.onEvent(newIntSensorEvent(0, 1000)); + assertEquals(producer.getAttribute(deltaSensor), 0d); + delta.onEvent(newIntSensorEvent(1, 2000)); + assertEquals(producer.getAttribute(deltaSensor), 1d); + delta.onEvent(newIntSensorEvent(3, 3000)); + assertEquals(producer.getAttribute(deltaSensor), 2d); + delta.onEvent(newIntSensorEvent(8, 4000)); + assertEquals(producer.getAttribute(deltaSensor), 5d); + } + + protected BasicSensorEvent<Integer> newIntSensorEvent(int value, long timestamp) { + return new BasicSensorEvent<Integer>(intSensor, producer, value, timestamp); + } + + @Test + public void testVariableTimeWeightedDeltaEnricher() { + @SuppressWarnings("unchecked") + YamlTimeWeightedDeltaEnricher<Integer> delta = producer.addEnricher(EnricherSpec.create(YamlTimeWeightedDeltaEnricher.class) + .configure(YamlTimeWeightedDeltaEnricher.PRODUCER, producer) + .configure(YamlTimeWeightedDeltaEnricher.SOURCE_SENSOR, intSensor) + .configure(YamlTimeWeightedDeltaEnricher.TARGET_SENSOR, deltaSensor)); + + delta.onEvent(newIntSensorEvent(0, 0)); + delta.onEvent(newIntSensorEvent(0, 2000)); + assertEquals(producer.getAttribute(deltaSensor), 0d); + delta.onEvent(newIntSensorEvent(3, 5000)); + assertEquals(producer.getAttribute(deltaSensor), 1d); + delta.onEvent(newIntSensorEvent(7, 7000)); + assertEquals(producer.getAttribute(deltaSensor), 2d); + delta.onEvent(newIntSensorEvent(12, 7500)); + assertEquals(producer.getAttribute(deltaSensor), 10d); + delta.onEvent(newIntSensorEvent(15, 9500)); + assertEquals(producer.getAttribute(deltaSensor), 1.5d); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/089fe862/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java b/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java index 694977c..7c55a81 100644 --- a/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java +++ b/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java @@ -128,12 +128,12 @@ public class RollingTimeWindowMeanEnricher<T extends Number> extends AbstractTyp } } - @Deprecated /** @deprecatedsince 0.7.0; not used; use the 2-arg method */ + @Deprecated /** @deprecated since 0.7.0; not used except in groovy tests; use the 2-arg method */ public ConfidenceQualifiedNumber getAverage() { return getAverage(System.currentTimeMillis(), 0); } - @Deprecated /** @deprecated since 0.7.0; not used; use the 2-arg method */ + @Deprecated /** @deprecated since 0.7.0; not used except in groovy tests; use the 2-arg method */ public ConfidenceQualifiedNumber getAverage(long fromTimeExact) { return getAverage(fromTimeExact, 0); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/089fe862/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java b/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java index b746edd..42e418f 100644 --- a/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java +++ b/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import brooklyn.enricher.basic.AbstractTypeTransformingEnricher; +import brooklyn.enricher.basic.YamlTimeWeightedDeltaEnricher; import brooklyn.entity.Entity; import brooklyn.event.AttributeSensor; import brooklyn.event.Sensor; @@ -41,6 +42,8 @@ import com.google.common.base.Functions; * presented as a units/timeUnit based on the event timing. * <p> * NB for time (e.g. "total milliseconds consumed") use {@link TimeFractionDeltaEnricher} + * <p> + * See also {@link YamlTimeWeightedDeltaEnricher} designed for use from YAML. */ //@Catalog(name="Time-weighted Delta", description="Converts an absolute sensor into a delta sensor " // + "(i.e. the diff between the current and previous value), presented as a units/timeUnit "
