Repository: cassandra Updated Branches: refs/heads/trunk 1d387f5e7 -> 4e744e768
Improve LatencyMetrics performance by reducing write path processing Patch by Michael Burman; Reviewed by Chris Lohfink for CASSANDRA-14281 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e744e76 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e744e76 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e744e76 Branch: refs/heads/trunk Commit: 4e744e7688e01d35a6acac1cf8a7a3ff2573836f Parents: 1d387f5 Author: Michael Burman <y...@iki.fi> Authored: Thu Mar 1 14:59:53 2018 +0200 Committer: Jeff Jirsa <jji...@apple.com> Committed: Tue Apr 24 20:58:09 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../DecayingEstimatedHistogramReservoir.java | 230 +++++++++++-------- .../cassandra/metrics/LatencyMetrics.java | 158 +++++++++++-- .../test/microbench/LatencyTrackingBench.java | 118 ++++++++++ ...DecayingEstimatedHistogramReservoirTest.java | 35 +++ .../cassandra/metrics/LatencyMetricsTest.java | 70 +++++- 6 files changed, 502 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e744e76/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 784fa2b..fe03ae1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Improve LatencyMetrics performance by reducing write path processing (CASSANDRA-14281) * Add network authz (CASSANDRA-13985) * Use the correct IP/Port for Streaming when localAddress is left unbound (CASSANDAR-14389) * nodetool listsnapshots is missing local system keyspace snapshots (CASSANDRA-14381) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e744e76/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java b/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java index 118f062..f17de78 100644 --- a/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java +++ b/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java @@ -24,8 +24,7 @@ import java.io.PrintWriter; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLongArray; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.atomic.LongAdder; import com.google.common.annotations.VisibleForTesting; @@ -85,8 +84,8 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir private final long[] bucketOffsets; // decayingBuckets and buckets are one element longer than bucketOffsets -- the last element is values greater than the last offset - private final AtomicLongArray decayingBuckets; - private final AtomicLongArray buckets; + private final LongAdder[] decayingBuckets; + private final LongAdder[] buckets; public static final long HALF_TIME_IN_S = 60L; public static final double MEAN_LIFETIME_IN_S = HALF_TIME_IN_S / Math.log(2.0); @@ -95,8 +94,6 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir private final AtomicBoolean rescaling = new AtomicBoolean(false); private volatile long decayLandmark; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - // Wrapper around System.nanoTime() to simplify unit testing. private final Clock clock; @@ -150,8 +147,15 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir { bucketOffsets = EstimatedHistogram.newOffsets(bucketCount, considerZeroes); } - decayingBuckets = new AtomicLongArray(bucketOffsets.length + 1); - buckets = new AtomicLongArray(bucketOffsets.length + 1); + decayingBuckets = new LongAdder[bucketOffsets.length + 1]; + buckets = new LongAdder[bucketOffsets.length + 1]; + + for(int i = 0; i < buckets.length; i++) + { + decayingBuckets[i] = new LongAdder(); + buckets[i] = new LongAdder(); + } + this.clock = clock; decayLandmark = clock.getTime(); } @@ -174,18 +178,8 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir } // else exact match; we're good - lockForRegularUsage(); - - try - { - decayingBuckets.getAndAdd(index, Math.round(forwardDecayWeight(now))); - } - finally - { - unlockForRegularUsage(); - } - - buckets.getAndIncrement(index); + decayingBuckets[index].add(Math.round(forwardDecayWeight(now))); + buckets[index].increment(); } private double forwardDecayWeight(long now) @@ -202,7 +196,7 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir */ public int size() { - return decayingBuckets.length(); + return decayingBuckets.length; } /** @@ -215,17 +209,7 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir public Snapshot getSnapshot() { rescaleIfNeeded(); - - lockForRegularUsage(); - - try - { - return new EstimatedHistogramReservoirSnapshot(this); - } - finally - { - unlockForRegularUsage(); - } + return new EstimatedHistogramReservoirSnapshot(this); } /** @@ -234,7 +218,7 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir @VisibleForTesting boolean isOverflowed() { - return decayingBuckets.get(decayingBuckets.length() - 1) > 0; + return decayingBuckets[decayingBuckets.length - 1].sum() > 0; } private void rescaleIfNeeded() @@ -254,6 +238,7 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir } finally { + decayLandmark = now; rescaling.set(false); } } @@ -262,27 +247,14 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir private void rescale(long now) { - // Check again to make sure that another thread didn't complete rescale already - if (needRescale(now)) - { - lockForRescale(); - - try - { - final double rescaleFactor = forwardDecayWeight(now); - decayLandmark = now; + final double rescaleFactor = forwardDecayWeight(now); - final int bucketCount = decayingBuckets.length(); - for (int i = 0; i < bucketCount; i++) - { - long newValue = Math.round((decayingBuckets.get(i) / rescaleFactor)); - decayingBuckets.set(i, newValue); - } - } - finally - { - unlockForRescale(); - } + final int bucketCount = decayingBuckets.length; + for (int i = 0; i < bucketCount; i++) + { + long storedValue = decayingBuckets[i].sumThenReset(); + storedValue = Math.round(storedValue / rescaleFactor); + decayingBuckets[i].add(storedValue); } } @@ -294,41 +266,44 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir @VisibleForTesting public void clear() { - lockForRescale(); - - try - { - final int bucketCount = decayingBuckets.length(); - for (int i = 0; i < bucketCount; i++) - { - decayingBuckets.set(i, 0L); - buckets.set(i, 0L); - } - } - finally + final int bucketCount = decayingBuckets.length; + for (int i = 0; i < bucketCount; i++) { - unlockForRescale(); + decayingBuckets[i].reset(); + buckets[i].reset(); } } - private void lockForRegularUsage() + /** + * Replaces current internal values with the given one from a Snapshot. This method is NOT thread safe, values + * added at the same time to this reservoir using methods such as update may lose their data + */ + public void rebase(EstimatedHistogramReservoirSnapshot snapshot) { - this.lock.readLock().lock(); - } + // Check bucket count + if (decayingBuckets.length != snapshot.decayingBuckets.length) + { + throw new IllegalStateException("Unable to merge two DecayingEstimatedHistogramReservoirs with different bucket sizes"); + } - private void unlockForRegularUsage() - { - this.lock.readLock().unlock(); - } + // Check bucketOffsets + for (int i = 0; i < bucketOffsets.length; i++) + { + if (bucketOffsets[i] != snapshot.bucketOffsets[i]) + { + throw new IllegalStateException("Merge is only supported with equal bucketOffsets"); + } + } - private void lockForRescale() - { - this.lock.writeLock().lock(); - } + this.decayLandmark = snapshot.snapshotLandmark; + for (int i = 0; i < decayingBuckets.length; i++) + { + decayingBuckets[i].reset(); + buckets[i].reset(); - private void unlockForRescale() - { - this.lock.writeLock().unlock(); + decayingBuckets[i].add(snapshot.decayingBuckets[i]); + buckets[i].add(snapshot.values[i]); + } } /** @@ -341,19 +316,32 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir * The decaying buckets will be used for quantile calculations and mean values, but the non decaying buckets will be * exposed for calls to {@link Snapshot#getValues()}. */ - private class EstimatedHistogramReservoirSnapshot extends Snapshot + class EstimatedHistogramReservoirSnapshot extends Snapshot { private final long[] decayingBuckets; + private final long[] values; + private long count; + private long snapshotLandmark; + private long[] bucketOffsets; + private DecayingEstimatedHistogramReservoir reservoir; public EstimatedHistogramReservoirSnapshot(DecayingEstimatedHistogramReservoir reservoir) { - final int length = reservoir.decayingBuckets.length(); + final int length = reservoir.decayingBuckets.length; final double rescaleFactor = forwardDecayWeight(clock.getTime()); this.decayingBuckets = new long[length]; + this.values = new long[length]; + this.count = count(); + this.snapshotLandmark = decayLandmark; + this.bucketOffsets = reservoir.bucketOffsets; // No need to copy, these are immutable for (int i = 0; i < length; i++) - this.decayingBuckets[i] = Math.round(reservoir.decayingBuckets.get(i) / rescaleFactor); + { + this.decayingBuckets[i] = Math.round(reservoir.decayingBuckets[i].sum() / rescaleFactor); + this.values[i] = buckets[i].sum(); + } + this.reservoir = reservoir; } /** @@ -396,13 +384,6 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir */ public long[] getValues() { - final int length = buckets.length(); - - long[] values = new long[length]; - - for (int i = 0; i < length; i++) - values[i] = buckets.get(i); - return values; } @@ -418,6 +399,12 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir return decayingBuckets.length; } + @VisibleForTesting + public long getSnapshotLandmark() + { + return snapshotLandmark; + } + /** * Return the number of registered values taking forward decay into account. * @@ -547,5 +534,68 @@ public class DecayingEstimatedHistogramReservoir implements Reservoir } } } + + /** + * Adds another DecayingEstimatedHistogramReservoir's Snapshot to this one. Both reservoirs must have same bucket definitions. This will rescale both snapshots if needed. + * + * @param other EstimatedHistogramReservoirSnapshot with identical bucket definition (offsets and length) + */ + public void add(Snapshot other) + { + if (!(other instanceof EstimatedHistogramReservoirSnapshot)) + { + throw new IllegalStateException("Unable to add other types of Snapshot than another DecayingEstimatedHistogramReservoir"); + } + + EstimatedHistogramReservoirSnapshot snapshot = (EstimatedHistogramReservoirSnapshot) other; + + if (decayingBuckets.length != snapshot.decayingBuckets.length) + { + throw new IllegalStateException("Unable to merge two DecayingEstimatedHistogramReservoirs with different bucket sizes"); + } + + // Check bucketOffsets + for (int i = 0; i < bucketOffsets.length; i++) + { + if (bucketOffsets[i] != snapshot.bucketOffsets[i]) + { + throw new IllegalStateException("Merge is only supported with equal bucketOffsets"); + } + } + + // We need to rescale the reservoirs to the same landmark + if (snapshot.snapshotLandmark < snapshotLandmark) + { + rescaleArray(snapshot.decayingBuckets, (snapshotLandmark - snapshot.snapshotLandmark)); + } + else if (snapshot.snapshotLandmark > snapshotLandmark) + { + rescaleArray(decayingBuckets, (snapshot.snapshotLandmark - snapshotLandmark)); + this.snapshotLandmark = snapshot.snapshotLandmark; + } + + // Now merge the buckets + for (int i = 0; i < snapshot.decayingBuckets.length; i++) + { + decayingBuckets[i] += snapshot.decayingBuckets[i]; + values[i] += snapshot.values[i]; + } + + this.count += snapshot.count; + } + + private void rescaleArray(long[] decayingBuckets, long landMarkDifference) + { + final double rescaleFactor = Math.exp((landMarkDifference / 1000.0) / MEAN_LIFETIME_IN_S); + for (int i = 0; i < decayingBuckets.length; i++) + { + decayingBuckets[i] = Math.round(decayingBuckets[i] / rescaleFactor); + } + } + + public void rebaseReservoir() + { + this.reservoir.rebase(this); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e744e76/src/java/org/apache/cassandra/metrics/LatencyMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java index a1915b1..7d5d288 100644 --- a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java +++ b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java @@ -17,15 +17,17 @@ */ package org.apache.cassandra.metrics; +import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Lists; + import com.codahale.metrics.Counter; +import com.codahale.metrics.Reservoir; +import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; @@ -41,7 +43,8 @@ public class LatencyMetrics /** parent metrics to replicate any updates to **/ private List<LatencyMetrics> parents = Lists.newArrayList(); - + private List<LatencyMetrics> children = Lists.newArrayList(); + protected final MetricNameFactory factory; protected final MetricNameFactory aliasFactory; protected final String namePrefix; @@ -86,15 +89,18 @@ public class LatencyMetrics this.aliasFactory = aliasFactory; this.namePrefix = namePrefix; + Timer timer = new LatencyMetrics.LatencyMetricsTimer(new DecayingEstimatedHistogramReservoir()); + Counter counter = new LatencyMetricsCounter(); + if (aliasFactory == null) { - latency = Metrics.timer(factory.createMetricName(namePrefix + "Latency")); - totalLatency = Metrics.counter(factory.createMetricName(namePrefix + "TotalLatency")); + latency = Metrics.register(factory.createMetricName(namePrefix + "Latency"), timer); + totalLatency = Metrics.register(factory.createMetricName(namePrefix + "TotalLatency"), counter); } else { - latency = Metrics.timer(factory.createMetricName(namePrefix + "Latency"), aliasFactory.createMetricName(namePrefix + "Latency")); - totalLatency = Metrics.counter(factory.createMetricName(namePrefix + "TotalLatency"), aliasFactory.createMetricName(namePrefix + "TotalLatency")); + latency = Metrics.register(factory.createMetricName(namePrefix + "Latency"), aliasFactory.createMetricName(namePrefix + "Latency"), timer); + totalLatency = Metrics.register(factory.createMetricName(namePrefix + "TotalLatency"), aliasFactory.createMetricName(namePrefix + "TotalLatency"), counter); } } @@ -109,7 +115,38 @@ public class LatencyMetrics public LatencyMetrics(MetricNameFactory factory, String namePrefix, LatencyMetrics ... parents) { this(factory, null, namePrefix); - this.parents.addAll(ImmutableList.copyOf(parents)); + this.parents = Arrays.asList(parents); + for (LatencyMetrics parent : parents) + { + parent.addChildren(this); + } + } + + public void addChildren(LatencyMetrics latencyMetric) + { + this.children.add(latencyMetric); + } + + public synchronized void removeChildren(LatencyMetrics toRelease) + { + /* + Merge details of removed children metrics and add them to our local copy to prevent metrics from going + backwards. Synchronized since these methods are not thread safe to prevent multiple simultaneous removals. + Will not protect against simultaneous updates, but since these methods are used by linked parent instances only, + they should not receive any updates. + */ + ((LatencyMetricsTimer) this.latency).releasedLatencyCount += toRelease.latency.getCount(); + + DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot childSnapshot = (DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot) toRelease.latency.getSnapshot(); + DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot snapshot = (DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot) this.latency.getSnapshot(); + + snapshot.add(childSnapshot); + snapshot.rebaseReservoir(); + + this.totalLatency.inc(toRelease.totalLatency.getCount()); + + // Now we can remove the reference + this.children.removeIf(latencyMetrics -> latencyMetrics.equals(toRelease)); } /** takes nanoseconds **/ @@ -118,14 +155,15 @@ public class LatencyMetrics // convert to microseconds. 1 millionth latency.update(nanos, TimeUnit.NANOSECONDS); totalLatency.inc(nanos / 1000); - for(LatencyMetrics parent : parents) - { - parent.addNano(nanos); - } } public void release() { + // Notify parent metrics that this metric is being released + for (LatencyMetrics parent : this.parents) + { + parent.removeChildren(this); + } if (aliasFactory == null) { Metrics.remove(factory.createMetricName(namePrefix + "Latency")); @@ -137,4 +175,98 @@ public class LatencyMetrics Metrics.remove(factory.createMetricName(namePrefix + "TotalLatency"), aliasFactory.createMetricName(namePrefix + "TotalLatency")); } } + + class LatencyMetricsTimer extends Timer + { + + long releasedLatencyCount = 0; + + public LatencyMetricsTimer(Reservoir reservoir) + { + super(reservoir); + } + + @Override + public long getCount() + { + long count = super.getCount() + releasedLatencyCount; + for (LatencyMetrics child : children) + { + count += child.latency.getCount(); + } + + return count; + } + + @Override + public double getFifteenMinuteRate() + { + double rate = super.getFifteenMinuteRate(); + for (LatencyMetrics child : children) + { + rate += child.latency.getFifteenMinuteRate(); + } + return rate; + } + + @Override + public double getFiveMinuteRate() + { + double rate = super.getFiveMinuteRate(); + for (LatencyMetrics child : children) + { + rate += child.latency.getFiveMinuteRate(); + } + return rate; + } + + @Override + public double getMeanRate() + { + // Not necessarily 100% accurate, but close enough + double rate = super.getMeanRate(); + for (LatencyMetrics child : children) + { + rate += child.latency.getMeanRate(); + } + return rate; + } + + @Override + public double getOneMinuteRate() + { + double rate = super.getOneMinuteRate(); + for (LatencyMetrics child : children) + { + rate += child.latency.getOneMinuteRate(); + } + return rate; + } + + @Override + public Snapshot getSnapshot() + { + DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot parent = (DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot) super.getSnapshot(); + for (LatencyMetrics child : children) + { + parent.add(child.latency.getSnapshot()); + } + + return parent; + } + } + + class LatencyMetricsCounter extends Counter + { + @Override + public long getCount() + { + long count = super.getCount(); + for (LatencyMetrics child : children) + { + count += child.totalLatency.getCount(); + } + return count; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e744e76/test/microbench/org/apache/cassandra/test/microbench/LatencyTrackingBench.java ---------------------------------------------------------------------- diff --git a/test/microbench/org/apache/cassandra/test/microbench/LatencyTrackingBench.java b/test/microbench/org/apache/cassandra/test/microbench/LatencyTrackingBench.java new file mode 100644 index 0000000..28e0da7 --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/LatencyTrackingBench.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.test.microbench; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Timer; +import org.apache.cassandra.metrics.CassandraMetricsRegistry; +import org.apache.cassandra.metrics.ClearableHistogram; +import org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir; +import org.apache.cassandra.metrics.LatencyMetrics; +import org.apache.cassandra.metrics.LatencyMetricsTest; +import org.apache.cassandra.metrics.MetricNameFactory; +import org.apache.cassandra.metrics.TableMetrics; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.CompilerControl; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1,jvmArgsAppend = { "-Xmx512M", "-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"}) +@Threads(4) // make sure this matches the number of _physical_cores_ +@State(Scope.Benchmark) +public class LatencyTrackingBench +{ + private LatencyMetrics metrics; + private LatencyMetrics parent; + private LatencyMetrics grandParent; + private DecayingEstimatedHistogramReservoir dehr; + private final MetricNameFactory factory = new BenchMetricsNameFactory(); + private long[] values = new long[1024]; + + class BenchMetricsNameFactory implements MetricNameFactory + { + + @Override + public CassandraMetricsRegistry.MetricName createMetricName(String metricName) + { + return new CassandraMetricsRegistry.MetricName(BenchMetricsNameFactory.class, metricName); + } + } + + @Setup(Level.Iteration) + public void setup() + { + parent = new LatencyMetrics("test", "testCF"); + grandParent = new LatencyMetrics("test", "testCF"); + + // Replicates behavior from ColumnFamilyStore metrics + metrics = new LatencyMetrics(factory, "testCF", parent, grandParent); + dehr = new DecayingEstimatedHistogramReservoir(false); + for(int i = 0; i < 1024; i++) + { + values[i] = TimeUnit.MICROSECONDS.toNanos(ThreadLocalRandom.current().nextLong(346)); + } + } + + @Setup(Level.Invocation) + public void reset() + { + dehr = new DecayingEstimatedHistogramReservoir(false); + metrics.release(); + metrics = new LatencyMetrics(factory, "testCF", parent, grandParent); + } + + @Benchmark + @OperationsPerInvocation(1024) + public void benchLatencyMetricsWrite() + { + for(int i = 0; i < values.length; i++) + { + metrics.addNano(values[i]); + } + } + + @Benchmark + @OperationsPerInvocation(1024) + public void benchInsertToDEHR(Blackhole bh) + { + for(int i = 0; i < values.length; i++) + { + dehr.update(values[i]); + } + bh.consume(dehr); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e744e76/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java b/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java index ef1fed3..5cfd927 100644 --- a/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java +++ b/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java @@ -380,6 +380,41 @@ public class DecayingEstimatedHistogramReservoirTest } } + @Test + public void testAggregation() + { + TestClock clock = new TestClock(); + + DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock); + DecayingEstimatedHistogramReservoir another = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock); + + clock.addMillis(DecayingEstimatedHistogramReservoir.LANDMARK_RESET_INTERVAL_IN_MS - 1_000L); + + histogram.update(1000); + clock.addMillis(100); + another.update(2000); + clock.addMillis(100); + histogram.update(2000); + clock.addMillis(100); + another.update(3000); + clock.addMillis(100); + histogram.update(3000); + clock.addMillis(100); + another.update(4000); + + DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot snapshot = (DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot) histogram.getSnapshot(); + DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot anotherSnapshot = (DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot) another.getSnapshot(); + + assertEquals(2000, snapshot.getMean(), 500D); + assertEquals(3000, anotherSnapshot.getMean(), 500D); + + snapshot.add(anotherSnapshot); + + // Another had newer decayLandmark, the aggregated snapshot should use it + assertEquals(anotherSnapshot.getSnapshotLandmark(), snapshot.getSnapshotLandmark()); + assertEquals(2500, snapshot.getMean(), 500D); + } + private void assertEstimatedQuantile(long expectedValue, double actualValue) { assertTrue("Expected at least [" + expectedValue + "] but actual is [" + actualValue + "]", actualValue >= expectedValue); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e744e76/test/unit/org/apache/cassandra/metrics/LatencyMetricsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/metrics/LatencyMetricsTest.java b/test/unit/org/apache/cassandra/metrics/LatencyMetricsTest.java index 62cb88e..d61c550 100644 --- a/test/unit/org/apache/cassandra/metrics/LatencyMetricsTest.java +++ b/test/unit/org/apache/cassandra/metrics/LatencyMetricsTest.java @@ -18,12 +18,27 @@ package org.apache.cassandra.metrics; +import java.util.concurrent.TimeUnit; + import org.junit.Test; +import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertFalse; public class LatencyMetricsTest { + private final MetricNameFactory factory = new TestMetricsNameFactory(); + + private class TestMetricsNameFactory implements MetricNameFactory + { + + @Override + public CassandraMetricsRegistry.MetricName createMetricName(String metricName) + { + return new CassandraMetricsRegistry.MetricName(TestMetricsNameFactory.class, metricName); + } + } + /** * Test bitsets in a "real-world" environment, i.e., bloom filters */ @@ -31,14 +46,10 @@ public class LatencyMetricsTest public void testGetRecentLatency() { final LatencyMetrics l = new LatencyMetrics("test", "test"); - Runnable r = new Runnable() - { - public void run() + Runnable r = () -> { + for (int i = 0; i < 10000; i++) { - for (int i = 0; i < 10000; i++) - { - l.addNano(1000); - } + l.addNano(1000); } }; new Thread(r).start(); @@ -49,4 +60,49 @@ public class LatencyMetricsTest assertFalse(recent.equals(Double.POSITIVE_INFINITY)); } } + + /** + * Test that parent LatencyMetrics are receiving updates from child metrics when reading + */ + @Test + public void testReadMerging() + { + final LatencyMetrics parent = new LatencyMetrics("testMerge", "testMerge"); + final LatencyMetrics child = new LatencyMetrics(factory, "testChild", parent); + + for (int i = 0; i < 100; i++) + { + child.addNano(TimeUnit.NANOSECONDS.convert(i, TimeUnit.MILLISECONDS)); + } + + assertEquals(4950000, child.totalLatency.getCount()); + assertEquals(child.totalLatency.getCount(), parent.totalLatency.getCount()); + assertEquals(child.latency.getSnapshot().getMean(), parent.latency.getSnapshot().getMean(), 50D); + + child.release(); + parent.release(); + } + + @Test + public void testRelease() + { + final LatencyMetrics parent = new LatencyMetrics("testRelease", "testRelease"); + final LatencyMetrics child = new LatencyMetrics(factory, "testChildRelease", parent); + + for (int i = 0; i < 100; i++) + { + child.addNano(TimeUnit.NANOSECONDS.convert(i, TimeUnit.MILLISECONDS)); + } + + double mean = parent.latency.getSnapshot().getMean(); + long count = parent.totalLatency.getCount(); + + child.release(); + + // Check that no value was lost with the release + assertEquals(count, parent.totalLatency.getCount()); + assertEquals(mean, parent.latency.getSnapshot().getMean(), 50D); + + parent.release(); + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org