Updated Branches: refs/heads/cassandra-2.0 c8915cea4 -> c3b7669d0 refs/heads/trunk 6d099b416 -> ee4b50ca8
add RestorableMeter Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c3b7669d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c3b7669d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c3b7669d Branch: refs/heads/cassandra-2.0 Commit: c3b7669d003cc18fbba5f8c9bd3258c624f54c4d Parents: c8915ce Author: Jonathan Ellis <jbel...@apache.org> Authored: Sat Sep 28 12:16:55 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Sat Sep 28 12:16:55 2013 -0500 ---------------------------------------------------------------------- .../cassandra/metrics/RestorableMeter.java | 185 +++++++++++++++++++ 1 file changed, 185 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3b7669d/src/java/org/apache/cassandra/metrics/RestorableMeter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/RestorableMeter.java b/src/java/org/apache/cassandra/metrics/RestorableMeter.java new file mode 100644 index 0000000..7da2ff9 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/RestorableMeter.java @@ -0,0 +1,185 @@ +package org.apache.cassandra.metrics; + +import com.yammer.metrics.core.Clock; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static java.lang.Math.exp; + +/** + * A meter metric which measures mean throughput as well as fifteen-minute and two-hour + * exponentially-weighted moving average throughputs. + * + * This is based heavily on the Meter and EWMA classes from codahale/yammer metrics. + * + * @see <a href="http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average">EMA</a> + */ +public class RestorableMeter +{ + private static final long TICK_INTERVAL = TimeUnit.SECONDS.toNanos(5); + private static final double NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1); + + private final RestorableEWMA m15Rate; + private final RestorableEWMA m120Rate; + + private final AtomicLong count = new AtomicLong(); + private final long startTime; + private final AtomicLong lastTick; + private final Clock clock = Clock.defaultClock(); + + /** + * Creates a new, uninitialized RestorableMeter. + */ + public RestorableMeter() { + this.m15Rate = new RestorableEWMA(TimeUnit.MINUTES.toSeconds(15)); + this.m120Rate = new RestorableEWMA(TimeUnit.MINUTES.toSeconds(120)); + this.startTime = this.clock.tick(); + this.lastTick = new AtomicLong(startTime); + } + + /** + * Restores a RestorableMeter from the last seen 15m and 2h rates. + * @param lastM15Rate the last-seen 15m rate, in terms of events per second + * @param lastM120Rate the last seen 2h rate, in terms of events per second + */ + public RestorableMeter(double lastM15Rate, double lastM120Rate) { + this.m15Rate = new RestorableEWMA(lastM15Rate, TimeUnit.MINUTES.toSeconds(15)); + this.m120Rate = new RestorableEWMA(lastM120Rate, TimeUnit.MINUTES.toSeconds(120)); + this.startTime = this.clock.tick(); + this.lastTick = new AtomicLong(startTime); + } + + /** + * Updates the moving averages as needed. + */ + private void tickIfNecessary() { + final long oldTick = lastTick.get(); + final long newTick = clock.tick(); + final long age = newTick - oldTick; + if (age > TICK_INTERVAL) { + final long newIntervalStartTick = newTick - age % TICK_INTERVAL; + if (lastTick.compareAndSet(oldTick, newIntervalStartTick)) { + final long requiredTicks = age / TICK_INTERVAL; + for (long i = 0; i < requiredTicks; i++) { + m15Rate.tick(); + m120Rate.tick(); + } + } + } + } + + /** + * Mark the occurrence of an event. + */ + public void mark() { + mark(1); + } + + /** + * Mark the occurrence of a given number of events. + * + * @param n the number of events + */ + public void mark(long n) { + tickIfNecessary(); + count.addAndGet(n); + m15Rate.update(n); + m120Rate.update(n); + } + + /** + * Returns the 15-minute rate in terms of events per second. This carries the previous rate when restored. + */ + public double fifteenMinuteRate() { + tickIfNecessary(); + return m15Rate.rate(); + } + + /** + * Returns the two-hour rate in terms of events per second. This carries the previous rate when restored. + */ + public double twoHourRate() { + tickIfNecessary(); + return m120Rate.rate(); + } + + /** + * The total number of events that have occurred since this object was created. Note that the previous count + * is *not* carried over when a RestorableMeter is restored. + */ + public long count() { + return count.get(); + } + + /** + * Returns the mean rate of events per second since this object was created. Note that the mean rate + * does *not* carry over when a RestorableMeter is restored, so the mean rate is only a measure since + * this object was created. + */ + public double meanRate() { + if (count() == 0) { + return 0.0; + } else { + final long elapsed = (clock.tick() - startTime); + return (count() / (double) elapsed) * NANOS_PER_SECOND; + } + } + + class RestorableEWMA { + private volatile boolean initialized = false; + private volatile double rate = 0.0; // average rate in terms of events per nanosecond + + private final AtomicLong uncounted = new AtomicLong(); + private final double alpha, interval; + + /** + * Create a new, uninitialized EWMA with a given window. + * + * @param windowInSeconds the window of time this EWMA should average over, expressed as a number of seconds + */ + public RestorableEWMA(long windowInSeconds) { + this.alpha = 1 - exp((-TICK_INTERVAL / NANOS_PER_SECOND) / windowInSeconds); + this.interval = (double) TICK_INTERVAL; + } + + /** + * Restore an EWMA from a last-seen rate and a given window. + * + * @param intervalInSeconds the window of time this EWMA should average over, expressed as a number of seconds + */ + public RestorableEWMA(double lastRate, long intervalInSeconds) { + this(intervalInSeconds); + this.rate = lastRate / NANOS_PER_SECOND; + this.initialized = true; + } + + /** + * Update the moving average with a new value. + */ + public void update(long n) { + uncounted.addAndGet(n); + } + + /** + * Mark the passage of time and decay the current rate accordingly. + */ + public void tick() { + final long count = uncounted.getAndSet(0); + final double instantRate = count / interval; + if (initialized) { + rate += (alpha * (instantRate - rate)); + } else { + rate = instantRate; + initialized = true; + } + } + + /** + * Returns the rate in terms of events per second. + */ + public double rate() { + return rate * NANOS_PER_SECOND; + } + } +} \ No newline at end of file