Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master f2de6ecfb -> 692eb0c87


fix time-sensitive sensor derivation when rebinding to old sensors

before this, the autoscaler might scale out on rebind: the last recorded
total reqs might be 100 and, if that data is very old, the current value might
be 10000; because a recent timestamp was attached to the initial value, it
computed 9900 new reqs in a 1s window.

this forces an invalid timestamp (-1) on the initial value passed to enrichers,
and time-based sensors are now stricter (configurably) about how they see
windows and exclude timestamps.

this may change behaviour in some places, particularly where sensors were being 
fired based on incomplete or stale data.

a better fix would be to store the timestamp on the sensors themselves


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/87604e28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/87604e28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/87604e28

Branch: refs/heads/master
Commit: 87604e28cb6f3bb860a051aef579361d4ee7b2ef
Parents: 8c5dc9d
Author: Alex Heneveld <[email protected]>
Authored: Mon Feb 2 12:37:55 2015 +0000
Committer: Alex Heneveld <[email protected]>
Committed: Mon Feb 2 17:31:50 2015 +0000

----------------------------------------------------------------------
 .../basic/AbstractTypeTransformingEnricher.java |  2 +-
 .../brooklyn/enricher/basic/AddingEnricher.java |  2 +-
 .../java/brooklyn/enricher/basic/Combiner.java  |  2 +-
 .../brooklyn/enricher/basic/Transformer.java    |  2 +-
 .../brooklyn/event/basic/BasicSensorEvent.java  |  9 +--
 .../enricher/RollingTimeWindowMeanEnricher.java | 83 ++++++++++++++++----
 .../enricher/TimeWeightedDeltaEnricher.java     |  8 +-
 .../policy/autoscaling/AutoScalerPolicy.java    |  4 +-
 .../brooklyn/enricher/RebindEnricherTest.java   |  3 +
 .../RollingTimeWindowMeanEnricherTest.groovy    | 10 ++-
 10 files changed, 94 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/core/src/main/java/brooklyn/enricher/basic/AbstractTypeTransformingEnricher.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/brooklyn/enricher/basic/AbstractTypeTransformingEnricher.java
 
b/core/src/main/java/brooklyn/enricher/basic/AbstractTypeTransformingEnricher.java
index 27eac93..ab31ebc 100644
--- 
a/core/src/main/java/brooklyn/enricher/basic/AbstractTypeTransformingEnricher.java
+++ 
b/core/src/main/java/brooklyn/enricher/basic/AbstractTypeTransformingEnricher.java
@@ -61,7 +61,7 @@ public abstract class AbstractTypeTransformingEnricher<T,U> 
extends AbstractEnri
             Object value = producer.getAttribute((AttributeSensor)source);
             // TODO Aled didn't you write a convenience to 
"subscribeAndRunIfSet" ? (-Alex)
             if (value!=null)
-                onEvent(new BasicSensorEvent(source, producer, value));
+                onEvent(new BasicSensorEvent(source, producer, value, -1));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/core/src/main/java/brooklyn/enricher/basic/AddingEnricher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/basic/AddingEnricher.java 
b/core/src/main/java/brooklyn/enricher/basic/AddingEnricher.java
index 11254bf..868240d 100644
--- a/core/src/main/java/brooklyn/enricher/basic/AddingEnricher.java
+++ b/core/src/main/java/brooklyn/enricher/basic/AddingEnricher.java
@@ -61,7 +61,7 @@ public class AddingEnricher extends AbstractEnricher 
implements SensorEventListe
             if (source instanceof AttributeSensor) {
                 Object value = entity.getAttribute((AttributeSensor)source);
                 if (value!=null)
-                    onEvent(new BasicSensorEvent(source, entity, value));
+                    onEvent(new BasicSensorEvent(source, entity, value, -1));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/core/src/main/java/brooklyn/enricher/basic/Combiner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/basic/Combiner.java 
b/core/src/main/java/brooklyn/enricher/basic/Combiner.java
index cd410a8..cc37d8c 100644
--- a/core/src/main/java/brooklyn/enricher/basic/Combiner.java
+++ b/core/src/main/java/brooklyn/enricher/basic/Combiner.java
@@ -103,7 +103,7 @@ public class Combiner<T,U> extends AbstractEnricher 
implements SensorEventListen
                 // TODO Aled didn't you write a convenience to 
"subscribeAndRunIfSet" ? (-Alex)
                 //      Unfortunately not yet!
                 if (value != null) {
-                    onEvent(new BasicSensorEvent(sourceSensor, producer, 
value));
+                    onEvent(new BasicSensorEvent(sourceSensor, producer, 
value, -1));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/core/src/main/java/brooklyn/enricher/basic/Transformer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/basic/Transformer.java 
b/core/src/main/java/brooklyn/enricher/basic/Transformer.java
index d8244c3..d126307 100644
--- a/core/src/main/java/brooklyn/enricher/basic/Transformer.java
+++ b/core/src/main/java/brooklyn/enricher/basic/Transformer.java
@@ -98,7 +98,7 @@ public class Transformer<T,U> extends AbstractEnricher 
implements SensorEventLis
             Object value = 
producer.getAttribute((AttributeSensor<?>)sourceSensor);
             // TODO would be useful to have a convenience to 
"subscribeAndThenIfItIsAlreadySetRunItOnce"
             if (value!=null) {
-                onEvent(new BasicSensorEvent(sourceSensor, producer, value));
+                onEvent(new BasicSensorEvent(sourceSensor, producer, value, 
-1));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/core/src/main/java/brooklyn/event/basic/BasicSensorEvent.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/basic/BasicSensorEvent.java 
b/core/src/main/java/brooklyn/event/basic/BasicSensorEvent.java
index da912a8..b571eb7 100644
--- a/core/src/main/java/brooklyn/event/basic/BasicSensorEvent.java
+++ b/core/src/main/java/brooklyn/event/basic/BasicSensorEvent.java
@@ -51,19 +51,14 @@ public class BasicSensorEvent<T> implements SensorEvent<T> {
 
     /** arguments should not be null (except in certain limited testing 
situations) */
     public BasicSensorEvent(Sensor<T> sensor, Entity source, T value) {
-        this(sensor, source, value, 0);
+        this(sensor, source, value, System.currentTimeMillis());
     }
     
     public BasicSensorEvent(Sensor<T> sensor, Entity source, T value, long 
timestamp) {
         this.sensor = sensor;
         this.source = source;
         this.value = value;
-
-        if (timestamp > 0) {
-            this.timestamp = timestamp;
-        } else {
-            this.timestamp = System.currentTimeMillis();
-        }
+        this.timestamp = timestamp;
     }
     
     public static <T> SensorEvent<T> of(Sensor<T> sensor, Entity source, T 
value, long timestamp) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java 
b/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java
index ea70445..3c9247b 100644
--- a/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java
+++ b/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java
@@ -21,11 +21,10 @@ package brooklyn.enricher;
 import java.util.Iterator;
 import java.util.LinkedList;
 
-import com.google.common.base.Preconditions;
-
-import brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
 import brooklyn.enricher.basic.AbstractTypeTransformingEnricher;
 import brooklyn.entity.Entity;
+import brooklyn.entity.basic.ConfigKeys;
 import brooklyn.event.AttributeSensor;
 import brooklyn.event.Sensor;
 import brooklyn.event.SensorEvent;
@@ -33,6 +32,8 @@ import brooklyn.util.flags.SetFromFlag;
 import brooklyn.util.javalang.JavaClassNames;
 import brooklyn.util.time.Duration;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Transforms {@link Sensor} data into a rolling average based on a time 
window.
  * 
@@ -59,6 +60,15 @@ import brooklyn.util.time.Duration;
 //@Catalog(name="Rolling Mean in Time Window", description="Transforms a 
sensor's data into a rolling average "
 //        + "based on a time window.")
 public class RollingTimeWindowMeanEnricher<T extends Number> extends 
AbstractTypeTransformingEnricher<T,Double> {
+    
+    public static ConfigKey<Double> CONFIDENCE_REQUIRED_TO_PUBLISH = 
ConfigKeys.newDoubleConfigKey("confidenceRequired",
+        "Minimum confidence level (ie period covered) required to publish a 
rolling average", 0.8d);
+
+    // without this, we will refuse to publish if the server time differs from 
the publisher time (in a distributed setup);
+    // also we won't publish if a lot of time is spent actually doing the 
computation
+    public static ConfigKey<Duration> TIMESTAMP_GRACE_TIME = 
ConfigKeys.newConfigKey(Duration.class, "timestampGraceTime",
+        "When computing windowed average, allow this much slippage time 
between published metrics and local clock", Duration.millis(500));
+
     public static class ConfidenceQualifiedNumber {
         final Double value;
         final double confidence;
@@ -67,6 +77,12 @@ public class RollingTimeWindowMeanEnricher<T extends Number> 
extends AbstractTyp
             this.value = value;
             this.confidence = confidence;
         }
+        
+        @Override
+        public String toString() {
+            return ""+value+" ("+(int)(confidence*100)+"%)";
+        } 
+        
     }
     
     private final LinkedList<T> values = new LinkedList<T>();
@@ -103,31 +119,69 @@ public class RollingTimeWindowMeanEnricher<T extends 
Number> extends AbstractTyp
     public void onEvent(SensorEvent<T> event, long eventTime) {
         values.addLast(event.getValue());
         timestamps.addLast(eventTime);
-        pruneValues(eventTime);
-        entity.setAttribute((AttributeSensor<Double>)target, 
getAverage(eventTime).value); //TODO this can potentially go stale... maybe we 
need to timestamp as well?
+        if (eventTime>0) {
+            ConfidenceQualifiedNumber average = getAverage(eventTime, 0);
+
+            if (average.confidence > 
getConfig(CONFIDENCE_REQUIRED_TO_PUBLISH)) { 
+                // without confidence, we might publish wildly varying 
estimates,
+                // causing spurious resizes, so allow it to be configured, and
+                // by default require a high value
+
+                // TODO would be nice to include timestamp, etc
+                entity.setAttribute((AttributeSensor<Double>)target, 
average.value); 
+            }
+        }
     }
     
     public ConfidenceQualifiedNumber getAverage() {
-        return getAverage(System.currentTimeMillis());
+        return getAverage(System.currentTimeMillis(), 
getConfig(TIMESTAMP_GRACE_TIME).toMilliseconds());
     }
     
-    public ConfidenceQualifiedNumber getAverage(long now) {
-        pruneValues(now);
+    public ConfidenceQualifiedNumber getAverage(long fromTimeExact) {
+        return getAverage(fromTimeExact, 0);
+    }
+    
+    public ConfidenceQualifiedNumber getAverage(long fromTime, long 
graceAllowed) {
         if (timestamps.isEmpty()) {
             return lastAverage = new 
ConfidenceQualifiedNumber(lastAverage.value, 0.0d);
         }
+        
+        // (previously there was an old comment here, pre-Jul-2014,  
+        // saying "grkvlt - see email to development list";
+        // but i can't find that email)
+        // some of the more recent confidence and bogus-timestamp + exclusion 
logic might fix this though
+        
+        long firstTimestamp = -1;
+        Iterator<Long> ti = timestamps.iterator();
+        while (ti.hasNext()) {
+            firstTimestamp = ti.next();
+            if (firstTimestamp>0) break;
+        }
+        if (firstTimestamp<=0) {
+            // no values with reasonable timestamps
+            return lastAverage = new 
ConfidenceQualifiedNumber(values.get(values.size()-1).doubleValue(), 0.0d);
+        }
 
-        // XXX grkvlt - see email to development list
+        long lastTimestamp = timestamps.get(timestamps.size()-1);
 
+        long now = fromTime;
+        if (lastTimestamp > fromTime - graceAllowed) {
+            // without this, if the computation takes place X seconds after 
the publish,
+            // we treat X seconds as time for which we have no confidence in 
the data
+            now = lastTimestamp;
+        }
+        pruneValues(now);
         
-        long lastTimestamp = timestamps.get(timestamps.size()-1);
-        Double confidence = ((double)(timePeriod.toMilliseconds() - (now - 
lastTimestamp))) / timePeriod.toMilliseconds();
-        if (confidence <= 0.0d) {
+        long windowStart = Math.max(now-timePeriod.toMilliseconds(), 
firstTimestamp);
+        long windowEnd = Math.max(now-timePeriod.toMilliseconds(), 
lastTimestamp);
+        Double confidence = ((double)(windowEnd - windowStart)) / 
timePeriod.toMilliseconds();
+        if (confidence <= 0.0000001d) {
+            // not enough timestamps in window 
             double lastValue = values.get(values.size()-1).doubleValue();
             return lastAverage = new ConfidenceQualifiedNumber(lastValue, 
0.0d);
         }
         
-        long start = (now - timePeriod.toMilliseconds());
+        long start = windowStart;
         long end;
         double weightedAverage = 0.0d;
         
@@ -151,7 +205,8 @@ public class RollingTimeWindowMeanEnricher<T extends 
Number> extends AbstractTyp
      * Discards out-of-date values, but keeps at least one value.
      */
     private void pruneValues(long now) {
-        while(timestamps.size() > 1 && timestamps.get(0) < (now - 
timePeriod.toMilliseconds())) {
+        // keep one value from before the period, so that we can tell the 
window's start time 
+        while(timestamps.size() > 1 && timestamps.get(1) < (now - 
timePeriod.toMilliseconds())) {
             timestamps.removeFirst();
             values.removeFirst();
         }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java 
b/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java
index 0d4ff84..b746edd 100644
--- a/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java
+++ b/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java
@@ -23,7 +23,6 @@ import groovy.lang.Closure;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import brooklyn.catalog.Catalog;
 import brooklyn.enricher.basic.AbstractTypeTransformingEnricher;
 import brooklyn.entity.Entity;
 import brooklyn.event.AttributeSensor;
@@ -102,8 +101,8 @@ public class TimeWeightedDeltaEnricher<T extends Number> 
extends AbstractTypeTra
             return;
         }
         
-        if (eventTime > lastTime) {
-            if (lastValue == null) {
+        if (eventTime > 0 && eventTime > lastTime) {
+            if (lastValue == null || lastTime <= 0) {
                 // cannot calculate time-based delta with a single value
                 if (LOG.isTraceEnabled()) LOG.trace("{} received event but no 
last value so will not emit, null -> {} at {}", new Object[] {this, current, 
eventTime}); 
             } else {
@@ -116,6 +115,9 @@ public class TimeWeightedDeltaEnricher<T extends Number> 
extends AbstractTypeTra
             }
             lastValue = current;
             lastTime = eventTime;
+        } else if (lastTime<0) {
+            lastValue = current;
+            lastTime = -1;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java 
b/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java
index a733fb6..a5f0ded 100644
--- a/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java
+++ b/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java
@@ -690,7 +690,7 @@ public class AutoScalerPolicy extends AbstractPolicy {
             unboundedSize = 
(int)Math.ceil(currentTotalActivity/metricUpperBoundD);
             desiredSize = toBoundedDesiredPoolSize(unboundedSize);
             if (desiredSize > currentSize) {
-                if (LOG.isTraceEnabled()) LOG.trace("{} resizing out pool {} 
from {} to {} ({} > {})", new Object[] {this, poolEntity, currentSize, 
desiredSize, currentMetricD, metricUpperBoundD});
+                if (LOG.isDebugEnabled()) LOG.debug("{} provisionally resizing 
out pool {} from {} to {} ({} > {})", new Object[] {this, poolEntity, 
currentSize, desiredSize, currentMetricD, metricUpperBoundD});
                 scheduleResize(desiredSize);
             } else {
                 if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} 
from {} ({} > {} > {}, but scale-out blocked eg by bounds/check)", new Object[] 
{this, poolEntity, currentSize, currentMetricD, metricUpperBoundD, 
metricLowerBoundD});
@@ -708,7 +708,7 @@ public class AutoScalerPolicy extends AbstractPolicy {
                 desiredSize = toBoundedDesiredPoolSize(desiredSize);
             }
             if (desiredSize < currentSize) {
-                if (LOG.isTraceEnabled()) LOG.trace("{} resizing back pool {} 
from {} to {} ({} < {})", new Object[] {this, poolEntity, currentSize, 
desiredSize, currentMetricD, metricLowerBoundD});
+                if (LOG.isDebugEnabled()) LOG.debug("{} provisionally resizing 
back pool {} from {} to {} ({} < {})", new Object[] {this, poolEntity, 
currentSize, desiredSize, currentMetricD, metricLowerBoundD});
                 scheduleResize(desiredSize);
             } else {
                 if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} 
from {} ({} < {} < {}, but scale-back blocked eg by bounds/check)", new 
Object[] {this, poolEntity, currentSize, currentMetricD, metricLowerBoundD, 
metricUpperBoundD});

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/policy/src/test/java/brooklyn/enricher/RebindEnricherTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/brooklyn/enricher/RebindEnricherTest.java 
b/policy/src/test/java/brooklyn/enricher/RebindEnricherTest.java
index bcfc651..dc08e19 100644
--- a/policy/src/test/java/brooklyn/enricher/RebindEnricherTest.java
+++ b/policy/src/test/java/brooklyn/enricher/RebindEnricherTest.java
@@ -35,6 +35,7 @@ import brooklyn.test.EntityTestUtils;
 import brooklyn.test.entity.TestApplication;
 import brooklyn.util.http.BetterMockWebServer;
 import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
 
 import com.google.mockwebserver.MockResponse;
 
@@ -108,6 +109,8 @@ public class RebindEnricherTest extends 
RebindTestFixtureWithApp {
         TestApplication newApp = rebind();
 
         newApp.setAttribute(INT_METRIC, 10);
+        Time.sleep(Duration.millis(10));
+        newApp.setAttribute(INT_METRIC, 10);
         EntityTestUtils.assertAttributeEqualsEventually(newApp, DOUBLE_METRIC, 
10d);
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/87604e28/policy/src/test/java/brooklyn/enricher/RollingTimeWindowMeanEnricherTest.groovy
----------------------------------------------------------------------
diff --git 
a/policy/src/test/java/brooklyn/enricher/RollingTimeWindowMeanEnricherTest.groovy
 
b/policy/src/test/java/brooklyn/enricher/RollingTimeWindowMeanEnricherTest.groovy
index 1a54302..ae45081 100644
--- 
a/policy/src/test/java/brooklyn/enricher/RollingTimeWindowMeanEnricherTest.groovy
+++ 
b/policy/src/test/java/brooklyn/enricher/RollingTimeWindowMeanEnricherTest.groovy
@@ -93,9 +93,17 @@ class RollingTimeWindowMeanEnricherTest {
     }
     
     @Test
-    public void testSingleValueAverage() {
+    public void testSingleValueTimeAverage() {
         averager.onEvent(intSensor.newEvent(producer, 10), 1000)
         average = averager.getAverage(1000)
+        assertEquals(average.confidence, 0d)
+    }
+    
+    @Test
+    public void testTwoValueAverageForPeriod() {
+        averager.onEvent(intSensor.newEvent(producer, 10), 1000)
+        averager.onEvent(intSensor.newEvent(producer, 10), 2000)
+        average = averager.getAverage(2000)
         assertEquals(average.value, 10 /1d)
         assertEquals(average.confidence, 1d)
     }

Reply via email to