Repository: cassandra
Updated Branches:
  refs/heads/trunk 1d387f5e7 -> 4e744e768


Improve LatencyMetrics performance by reducing write path processing

Patch by Michael Burman; Reviewed by Chris Lohfink for CASSANDRA-14281


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e744e76
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e744e76
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e744e76

Branch: refs/heads/trunk
Commit: 4e744e7688e01d35a6acac1cf8a7a3ff2573836f
Parents: 1d387f5
Author: Michael Burman <y...@iki.fi>
Authored: Thu Mar 1 14:59:53 2018 +0200
Committer: Jeff Jirsa <jji...@apple.com>
Committed: Tue Apr 24 20:58:09 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../DecayingEstimatedHistogramReservoir.java    | 230 +++++++++++--------
 .../cassandra/metrics/LatencyMetrics.java       | 158 +++++++++++--
 .../test/microbench/LatencyTrackingBench.java   | 118 ++++++++++
 ...DecayingEstimatedHistogramReservoirTest.java |  35 +++
 .../cassandra/metrics/LatencyMetricsTest.java   |  70 +++++-
 6 files changed, 502 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e744e76/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 784fa2b..fe03ae1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Improve LatencyMetrics performance by reducing write path processing 
(CASSANDRA-14281)
  * Add network authz (CASSANDRA-13985)
  * Use the correct IP/Port for Streaming when localAddress is left unbound 
(CASSANDAR-14389)
  * nodetool listsnapshots is missing local system keyspace snapshots 
(CASSANDRA-14381)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e744e76/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java
 
b/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java
index 118f062..f17de78 100644
--- 
a/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java
+++ 
b/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java
@@ -24,8 +24,7 @@ import java.io.PrintWriter;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLongArray;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.atomic.LongAdder;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -85,8 +84,8 @@ public class DecayingEstimatedHistogramReservoir implements 
Reservoir
     private final long[] bucketOffsets;
 
     // decayingBuckets and buckets are one element longer than bucketOffsets 
-- the last element is values greater than the last offset
-    private final AtomicLongArray decayingBuckets;
-    private final AtomicLongArray buckets;
+    private final LongAdder[] decayingBuckets;
+    private final LongAdder[] buckets;
 
     public static final long HALF_TIME_IN_S = 60L;
     public static final double MEAN_LIFETIME_IN_S = HALF_TIME_IN_S / 
Math.log(2.0);
@@ -95,8 +94,6 @@ public class DecayingEstimatedHistogramReservoir implements 
Reservoir
     private final AtomicBoolean rescaling = new AtomicBoolean(false);
     private volatile long decayLandmark;
 
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
     // Wrapper around System.nanoTime() to simplify unit testing.
     private final Clock clock;
 
@@ -150,8 +147,15 @@ public class DecayingEstimatedHistogramReservoir 
implements Reservoir
         {
             bucketOffsets = EstimatedHistogram.newOffsets(bucketCount, 
considerZeroes);
         }
-        decayingBuckets = new AtomicLongArray(bucketOffsets.length + 1);
-        buckets = new AtomicLongArray(bucketOffsets.length + 1);
+        decayingBuckets = new LongAdder[bucketOffsets.length + 1];
+        buckets = new LongAdder[bucketOffsets.length + 1];
+
+        for(int i = 0; i < buckets.length; i++) 
+        {
+            decayingBuckets[i] = new LongAdder();
+            buckets[i] = new LongAdder();
+        }
+
         this.clock = clock;
         decayLandmark = clock.getTime();
     }
@@ -174,18 +178,8 @@ public class DecayingEstimatedHistogramReservoir 
implements Reservoir
         }
         // else exact match; we're good
 
-        lockForRegularUsage();
-
-        try
-        {
-            decayingBuckets.getAndAdd(index, 
Math.round(forwardDecayWeight(now)));
-        }
-        finally
-        {
-            unlockForRegularUsage();
-        }
-
-        buckets.getAndIncrement(index);
+        decayingBuckets[index].add(Math.round(forwardDecayWeight(now)));
+        buckets[index].increment();
     }
 
     private double forwardDecayWeight(long now)
@@ -202,7 +196,7 @@ public class DecayingEstimatedHistogramReservoir implements 
Reservoir
      */
     public int size()
     {
-        return decayingBuckets.length();
+        return decayingBuckets.length;
     }
 
     /**
@@ -215,17 +209,7 @@ public class DecayingEstimatedHistogramReservoir 
implements Reservoir
     public Snapshot getSnapshot()
     {
         rescaleIfNeeded();
-
-        lockForRegularUsage();
-
-        try
-        {
-            return new EstimatedHistogramReservoirSnapshot(this);
-        }
-        finally
-        {
-            unlockForRegularUsage();
-        }
+        return new EstimatedHistogramReservoirSnapshot(this);
     }
 
     /**
@@ -234,7 +218,7 @@ public class DecayingEstimatedHistogramReservoir implements 
Reservoir
     @VisibleForTesting
     boolean isOverflowed()
     {
-        return decayingBuckets.get(decayingBuckets.length() - 1) > 0;
+        return decayingBuckets[decayingBuckets.length - 1].sum() > 0;
     }
 
     private void rescaleIfNeeded()
@@ -254,6 +238,7 @@ public class DecayingEstimatedHistogramReservoir implements 
Reservoir
                 }
                 finally
                 {
+                    decayLandmark = now;
                     rescaling.set(false);
                 }
             }
@@ -262,27 +247,14 @@ public class DecayingEstimatedHistogramReservoir 
implements Reservoir
 
     private void rescale(long now)
     {
-        // Check again to make sure that another thread didn't complete 
rescale already
-        if (needRescale(now))
-        {
-            lockForRescale();
-
-            try
-            {
-                final double rescaleFactor = forwardDecayWeight(now);
-                decayLandmark = now;
+        final double rescaleFactor = forwardDecayWeight(now);
 
-                final int bucketCount = decayingBuckets.length();
-                for (int i = 0; i < bucketCount; i++)
-                {
-                    long newValue = Math.round((decayingBuckets.get(i) / 
rescaleFactor));
-                    decayingBuckets.set(i, newValue);
-                }
-            }
-            finally
-            {
-                unlockForRescale();
-            }
+        final int bucketCount = decayingBuckets.length;
+        for (int i = 0; i < bucketCount; i++)
+        {
+            long storedValue = decayingBuckets[i].sumThenReset();
+            storedValue = Math.round(storedValue / rescaleFactor);
+            decayingBuckets[i].add(storedValue);
         }
     }
 
@@ -294,41 +266,44 @@ public class DecayingEstimatedHistogramReservoir 
implements Reservoir
     @VisibleForTesting
     public void clear()
     {
-        lockForRescale();
-
-        try
-        {
-            final int bucketCount = decayingBuckets.length();
-            for (int i = 0; i < bucketCount; i++)
-            {
-                decayingBuckets.set(i, 0L);
-                buckets.set(i, 0L);
-            }
-        }
-        finally
+        final int bucketCount = decayingBuckets.length;
+        for (int i = 0; i < bucketCount; i++)
         {
-            unlockForRescale();
+            decayingBuckets[i].reset();
+            buckets[i].reset();
         }
     }
 
-    private void lockForRegularUsage()
+    /**
+     * Replaces the current internal values with the ones from the given Snapshot. 
This method is NOT thread safe: values
+     * added to this reservoir at the same time (e.g. through update) 
may be lost.
+     */
+    public void rebase(EstimatedHistogramReservoirSnapshot snapshot)
     {
-        this.lock.readLock().lock();
-    }
+        // Check bucket count
+        if (decayingBuckets.length != snapshot.decayingBuckets.length)
+        {
+            throw new IllegalStateException("Unable to merge two 
DecayingEstimatedHistogramReservoirs with different bucket sizes");
+        }
 
-    private void unlockForRegularUsage()
-    {
-        this.lock.readLock().unlock();
-    }
+        // Check bucketOffsets
+        for (int i = 0; i < bucketOffsets.length; i++)
+        {
+            if (bucketOffsets[i] != snapshot.bucketOffsets[i])
+            {
+                throw new IllegalStateException("Merge is only supported with 
equal bucketOffsets");
+            }
+        }
 
-    private void lockForRescale()
-    {
-        this.lock.writeLock().lock();
-    }
+        this.decayLandmark = snapshot.snapshotLandmark;
+        for (int i = 0; i < decayingBuckets.length; i++)
+        {
+            decayingBuckets[i].reset();
+            buckets[i].reset();
 
-    private void unlockForRescale()
-    {
-        this.lock.writeLock().unlock();
+            decayingBuckets[i].add(snapshot.decayingBuckets[i]);
+            buckets[i].add(snapshot.values[i]);
+        }
     }
 
     /**
@@ -341,19 +316,32 @@ public class DecayingEstimatedHistogramReservoir 
implements Reservoir
      * The decaying buckets will be used for quantile calculations and mean 
values, but the non decaying buckets will be
      * exposed for calls to {@link Snapshot#getValues()}.
      */
-    private class EstimatedHistogramReservoirSnapshot extends Snapshot
+    class EstimatedHistogramReservoirSnapshot extends Snapshot
     {
         private final long[] decayingBuckets;
+        private final long[] values;
+        private long count;
+        private long snapshotLandmark;
+        private long[] bucketOffsets;
+        private DecayingEstimatedHistogramReservoir reservoir;
 
         public 
EstimatedHistogramReservoirSnapshot(DecayingEstimatedHistogramReservoir 
reservoir)
         {
-            final int length = reservoir.decayingBuckets.length();
+            final int length = reservoir.decayingBuckets.length;
             final double rescaleFactor = forwardDecayWeight(clock.getTime());
 
             this.decayingBuckets = new long[length];
+            this.values = new long[length];
+            this.count = count();
+            this.snapshotLandmark = decayLandmark;
+            this.bucketOffsets = reservoir.bucketOffsets; // No need to copy, 
these are immutable
 
             for (int i = 0; i < length; i++)
-                this.decayingBuckets[i] = 
Math.round(reservoir.decayingBuckets.get(i) / rescaleFactor);
+            {
+                this.decayingBuckets[i] = 
Math.round(reservoir.decayingBuckets[i].sum() / rescaleFactor);
+                this.values[i] = buckets[i].sum();
+            }
+            this.reservoir = reservoir;
         }
 
         /**
@@ -396,13 +384,6 @@ public class DecayingEstimatedHistogramReservoir 
implements Reservoir
          */
         public long[] getValues()
         {
-            final int length = buckets.length();
-
-            long[] values = new long[length];
-
-            for (int i = 0; i < length; i++)
-                values[i] = buckets.get(i);
-
             return values;
         }
 
@@ -418,6 +399,12 @@ public class DecayingEstimatedHistogramReservoir 
implements Reservoir
             return decayingBuckets.length;
         }
 
+        @VisibleForTesting
+        public long getSnapshotLandmark()
+        {
+            return snapshotLandmark;
+        }
+
         /**
          * Return the number of registered values taking forward decay into 
account.
          *
@@ -547,5 +534,68 @@ public class DecayingEstimatedHistogramReservoir 
implements Reservoir
                 }
             }
         }
+
+        /**
+         * Adds another DecayingEstimatedHistogramReservoir's Snapshot to this 
one. Both reservoirs must have the same bucket definitions. The snapshot with 
the older landmark is rescaled to the newer landmark before merging.
+         *
+         * @param other EstimatedHistogramReservoirSnapshot with identical 
bucket definition (offsets and length)
+         */
+        public void add(Snapshot other)
+        {
+            if (!(other instanceof EstimatedHistogramReservoirSnapshot))
+            {
+                throw new IllegalStateException("Unable to add other types of 
Snapshot than another DecayingEstimatedHistogramReservoir");
+            }
+
+            EstimatedHistogramReservoirSnapshot snapshot = 
(EstimatedHistogramReservoirSnapshot) other;
+
+            if (decayingBuckets.length != snapshot.decayingBuckets.length)
+            {
+                throw new IllegalStateException("Unable to merge two 
DecayingEstimatedHistogramReservoirs with different bucket sizes");
+            }
+
+            // Check bucketOffsets
+            for (int i = 0; i < bucketOffsets.length; i++)
+            {
+                if (bucketOffsets[i] != snapshot.bucketOffsets[i])
+                {
+                    throw new IllegalStateException("Merge is only supported 
with equal bucketOffsets");
+                }
+            }
+
+            // We need to rescale the reservoirs to the same landmark
+            if (snapshot.snapshotLandmark < snapshotLandmark)
+            {
+                rescaleArray(snapshot.decayingBuckets, (snapshotLandmark - 
snapshot.snapshotLandmark));
+            }
+            else if (snapshot.snapshotLandmark > snapshotLandmark)
+            {
+                rescaleArray(decayingBuckets, (snapshot.snapshotLandmark - 
snapshotLandmark));
+                this.snapshotLandmark = snapshot.snapshotLandmark;
+            }
+
+            // Now merge the buckets
+            for (int i = 0; i < snapshot.decayingBuckets.length; i++)
+            {
+                decayingBuckets[i] += snapshot.decayingBuckets[i];
+                values[i] += snapshot.values[i];
+            }
+
+            this.count += snapshot.count;
+        }
+
+        private void rescaleArray(long[] decayingBuckets, long 
landMarkDifference)
+        {
+            final double rescaleFactor = Math.exp((landMarkDifference / 
1000.0) / MEAN_LIFETIME_IN_S);
+            for (int i = 0; i < decayingBuckets.length; i++)
+            {
+                decayingBuckets[i] = Math.round(decayingBuckets[i] / 
rescaleFactor);
+            }
+        }
+
+        public void rebaseReservoir() 
+        {
+            this.reservoir.rebase(this);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e744e76/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java 
b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
index a1915b1..7d5d288 100644
--- a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
@@ -17,15 +17,17 @@
  */
 package org.apache.cassandra.metrics;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Lists;
+
 import com.codahale.metrics.Counter;
+import com.codahale.metrics.Reservoir;
+import com.codahale.metrics.Snapshot;
 import com.codahale.metrics.Timer;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
 
@@ -41,7 +43,8 @@ public class LatencyMetrics
 
     /** parent metrics to replicate any updates to **/
     private List<LatencyMetrics> parents = Lists.newArrayList();
-    
+    private List<LatencyMetrics> children = Lists.newArrayList();
+
     protected final MetricNameFactory factory;
     protected final MetricNameFactory aliasFactory;
     protected final String namePrefix;
@@ -86,15 +89,18 @@ public class LatencyMetrics
         this.aliasFactory = aliasFactory;
         this.namePrefix = namePrefix;
 
+        Timer timer = new LatencyMetrics.LatencyMetricsTimer(new 
DecayingEstimatedHistogramReservoir());
+        Counter counter = new LatencyMetricsCounter();
+
         if (aliasFactory == null)
         {
-            latency = Metrics.timer(factory.createMetricName(namePrefix + 
"Latency"));
-            totalLatency = Metrics.counter(factory.createMetricName(namePrefix 
+ "TotalLatency"));
+            latency = Metrics.register(factory.createMetricName(namePrefix + 
"Latency"), timer);
+            totalLatency = 
Metrics.register(factory.createMetricName(namePrefix + "TotalLatency"), 
counter);
         }
         else
         {
-            latency = Metrics.timer(factory.createMetricName(namePrefix + 
"Latency"), aliasFactory.createMetricName(namePrefix + "Latency"));
-            totalLatency = Metrics.counter(factory.createMetricName(namePrefix 
+ "TotalLatency"), aliasFactory.createMetricName(namePrefix + "TotalLatency"));
+            latency = Metrics.register(factory.createMetricName(namePrefix + 
"Latency"), aliasFactory.createMetricName(namePrefix + "Latency"), timer);
+            totalLatency = 
Metrics.register(factory.createMetricName(namePrefix + "TotalLatency"), 
aliasFactory.createMetricName(namePrefix + "TotalLatency"), counter);
         }
     }
     
@@ -109,7 +115,38 @@ public class LatencyMetrics
     public LatencyMetrics(MetricNameFactory factory, String namePrefix, 
LatencyMetrics ... parents)
     {
         this(factory, null, namePrefix);
-        this.parents.addAll(ImmutableList.copyOf(parents));
+        this.parents = Arrays.asList(parents);
+        for (LatencyMetrics parent : parents)
+        {
+            parent.addChildren(this);
+        }
+    }
+
+    public void addChildren(LatencyMetrics latencyMetric) 
+    {
+        this.children.add(latencyMetric);
+    }
+
+    public synchronized void removeChildren(LatencyMetrics toRelease)
+    {
+        /*
+        Merge the details of the removed child's metrics into our local 
copy so that the reported metrics do not go
+        backwards. The method is synchronized to prevent multiple simultaneous 
removals, since the merge itself is not thread safe.
+        This does not protect against simultaneous updates, but since these methods 
are used by linked parent instances only,
+        they should not receive any updates.
+         */
+        ((LatencyMetricsTimer) this.latency).releasedLatencyCount += 
toRelease.latency.getCount();
+
+        
DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot 
childSnapshot = 
(DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot) 
toRelease.latency.getSnapshot();
+        
DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot 
snapshot = 
(DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot) 
this.latency.getSnapshot();
+
+        snapshot.add(childSnapshot);
+        snapshot.rebaseReservoir();
+
+        this.totalLatency.inc(toRelease.totalLatency.getCount());
+
+        // Now we can remove the reference
+        this.children.removeIf(latencyMetrics -> 
latencyMetrics.equals(toRelease));
     }
 
     /** takes nanoseconds **/
@@ -118,14 +155,15 @@ public class LatencyMetrics
         // convert to microseconds. 1 millionth
         latency.update(nanos, TimeUnit.NANOSECONDS);
         totalLatency.inc(nanos / 1000);
-        for(LatencyMetrics parent : parents)
-        {
-            parent.addNano(nanos);
-        }
     }
 
     public void release()
     {
+        // Notify parent metrics that this metric is being released
+        for (LatencyMetrics parent : this.parents)
+        {
+            parent.removeChildren(this);
+        }
         if (aliasFactory == null)
         {
             Metrics.remove(factory.createMetricName(namePrefix + "Latency"));
@@ -137,4 +175,98 @@ public class LatencyMetrics
             Metrics.remove(factory.createMetricName(namePrefix + 
"TotalLatency"), aliasFactory.createMetricName(namePrefix + "TotalLatency"));
         }
     }
+
+    class LatencyMetricsTimer extends Timer 
+    {
+
+        long releasedLatencyCount = 0;
+
+        public LatencyMetricsTimer(Reservoir reservoir) 
+        {
+            super(reservoir);
+        }
+
+        @Override
+        public long getCount()
+        {
+            long count = super.getCount() + releasedLatencyCount;
+            for (LatencyMetrics child : children)
+            {
+                count += child.latency.getCount();
+            }
+
+            return count;
+        }
+
+        @Override
+        public double getFifteenMinuteRate()
+        {
+            double rate = super.getFifteenMinuteRate();
+            for (LatencyMetrics child : children)
+            {
+                rate += child.latency.getFifteenMinuteRate();
+            }
+            return rate;
+        }
+
+        @Override
+        public double getFiveMinuteRate()
+        {
+            double rate = super.getFiveMinuteRate();
+            for (LatencyMetrics child : children)
+            {
+                rate += child.latency.getFiveMinuteRate();
+            }
+            return rate;
+        }
+
+        @Override
+        public double getMeanRate()
+        {
+            // Not necessarily 100% accurate, but close enough
+            double rate = super.getMeanRate();
+            for (LatencyMetrics child : children)
+            {
+                rate += child.latency.getMeanRate();
+            }
+            return rate;
+        }
+
+        @Override
+        public double getOneMinuteRate()
+        {
+            double rate = super.getOneMinuteRate();
+            for (LatencyMetrics child : children)
+            {
+                rate += child.latency.getOneMinuteRate();
+            }
+            return rate;
+        }
+
+        @Override
+        public Snapshot getSnapshot()
+        {
+            
DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot parent 
= (DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot) 
super.getSnapshot();
+            for (LatencyMetrics child : children)
+            {
+                parent.add(child.latency.getSnapshot());
+            }
+
+            return parent;
+        }
+    }
+
+    class LatencyMetricsCounter extends Counter 
+    {
+        @Override
+        public long getCount()
+        {
+            long count = super.getCount();
+            for (LatencyMetrics child : children)
+            {
+                count += child.totalLatency.getCount();
+            }
+            return count;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e744e76/test/microbench/org/apache/cassandra/test/microbench/LatencyTrackingBench.java
----------------------------------------------------------------------
diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/LatencyTrackingBench.java
 
b/test/microbench/org/apache/cassandra/test/microbench/LatencyTrackingBench.java
new file mode 100644
index 0000000..28e0da7
--- /dev/null
+++ 
b/test/microbench/org/apache/cassandra/test/microbench/LatencyTrackingBench.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.test.microbench;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Timer;
+import org.apache.cassandra.metrics.CassandraMetricsRegistry;
+import org.apache.cassandra.metrics.ClearableHistogram;
+import org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir;
+import org.apache.cassandra.metrics.LatencyMetrics;
+import org.apache.cassandra.metrics.LatencyMetricsTest;
+import org.apache.cassandra.metrics.MetricNameFactory;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.CompilerControl;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1,jvmArgsAppend = { "-Xmx512M", "-Djmh.executor=CUSTOM", 
"-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"})
+@Threads(4) // make sure this matches the number of _physical_cores_
+@State(Scope.Benchmark)
+public class LatencyTrackingBench
+{
+    private LatencyMetrics metrics;
+    private LatencyMetrics parent;
+    private LatencyMetrics grandParent;
+    private DecayingEstimatedHistogramReservoir dehr;
+    private final MetricNameFactory factory = new BenchMetricsNameFactory();
+    private long[] values = new long[1024];
+
+    class BenchMetricsNameFactory implements MetricNameFactory
+    {
+
+        @Override
+        public CassandraMetricsRegistry.MetricName createMetricName(String 
metricName)
+        {
+            return new 
CassandraMetricsRegistry.MetricName(BenchMetricsNameFactory.class, metricName);
+        }
+    }
+
+    @Setup(Level.Iteration)
+    public void setup() 
+    {
+        parent = new LatencyMetrics("test", "testCF");
+        grandParent = new LatencyMetrics("test", "testCF");
+
+        // Replicates behavior from ColumnFamilyStore metrics
+        metrics = new LatencyMetrics(factory, "testCF", parent, grandParent);
+        dehr = new DecayingEstimatedHistogramReservoir(false);
+        for(int i = 0; i < 1024; i++) 
+        {
+            values[i] = 
TimeUnit.MICROSECONDS.toNanos(ThreadLocalRandom.current().nextLong(346));
+        }
+    }
+
+    @Setup(Level.Invocation)
+    public void reset() 
+    {
+        dehr = new DecayingEstimatedHistogramReservoir(false);
+        metrics.release();
+        metrics = new LatencyMetrics(factory, "testCF", parent, grandParent);
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(1024)
+    public void benchLatencyMetricsWrite() 
+    {
+        for(int i = 0; i < values.length; i++) 
+        {
+            metrics.addNano(values[i]);
+        }
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(1024)
+    public void benchInsertToDEHR(Blackhole bh) 
+    {
+        for(int i = 0; i < values.length; i++) 
+        {
+            dehr.update(values[i]);
+        }
+        bh.consume(dehr);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e744e76/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java
 
b/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java
index ef1fed3..5cfd927 100644
--- 
a/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java
+++ 
b/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java
@@ -380,6 +380,41 @@ public class DecayingEstimatedHistogramReservoirTest
         }
     }
 
+    @Test
+    public void testAggregation()
+    {
+        TestClock clock = new TestClock();
+
+        DecayingEstimatedHistogramReservoir histogram = new 
DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION,
 DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock);
+        DecayingEstimatedHistogramReservoir another = new 
DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION,
 DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock);
+
+        
clock.addMillis(DecayingEstimatedHistogramReservoir.LANDMARK_RESET_INTERVAL_IN_MS
 - 1_000L);
+
+        histogram.update(1000);
+        clock.addMillis(100);
+        another.update(2000);
+        clock.addMillis(100);
+        histogram.update(2000);
+        clock.addMillis(100);
+        another.update(3000);
+        clock.addMillis(100);
+        histogram.update(3000);
+        clock.addMillis(100);
+        another.update(4000);
+
+        
DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot 
snapshot = 
(DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot) 
histogram.getSnapshot();
+        
DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot 
anotherSnapshot = 
(DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot) 
another.getSnapshot();
+
+        assertEquals(2000, snapshot.getMean(), 500D);
+        assertEquals(3000, anotherSnapshot.getMean(), 500D);
+
+        snapshot.add(anotherSnapshot);
+
+        // Another had newer decayLandmark, the aggregated snapshot should use 
it
+        assertEquals(anotherSnapshot.getSnapshotLandmark(), 
snapshot.getSnapshotLandmark());
+        assertEquals(2500, snapshot.getMean(), 500D);
+    }
+
     private void assertEstimatedQuantile(long expectedValue, double 
actualValue)
     {
         assertTrue("Expected at least [" + expectedValue + "] but actual is [" 
+ actualValue + "]", actualValue >= expectedValue);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e744e76/test/unit/org/apache/cassandra/metrics/LatencyMetricsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/metrics/LatencyMetricsTest.java 
b/test/unit/org/apache/cassandra/metrics/LatencyMetricsTest.java
index 62cb88e..d61c550 100644
--- a/test/unit/org/apache/cassandra/metrics/LatencyMetricsTest.java
+++ b/test/unit/org/apache/cassandra/metrics/LatencyMetricsTest.java
@@ -18,12 +18,27 @@
 
 package org.apache.cassandra.metrics;
 
+import java.util.concurrent.TimeUnit;
+
 import org.junit.Test;
 
+import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertFalse;
 
 public class LatencyMetricsTest
 {
+    private final MetricNameFactory factory = new TestMetricsNameFactory();
+
+    private class TestMetricsNameFactory implements MetricNameFactory
+    {
+
+        @Override
+        public CassandraMetricsRegistry.MetricName createMetricName(String 
metricName)
+        {
+            return new 
CassandraMetricsRegistry.MetricName(TestMetricsNameFactory.class, metricName);
+        }
+    }
+
     /**
      * Test bitsets in a "real-world" environment, i.e., bloom filters
      */
@@ -31,14 +46,10 @@ public class LatencyMetricsTest
     public void testGetRecentLatency()
     {
         final LatencyMetrics l = new LatencyMetrics("test", "test");
-        Runnable r = new Runnable()
-        {
-            public void run()
+        Runnable r = () -> {
+            for (int i = 0; i < 10000; i++)
             {
-                for (int i = 0; i < 10000; i++)
-                {
-                    l.addNano(1000);
-                }
+                l.addNano(1000);
             }
         };
         new Thread(r).start();
@@ -49,4 +60,49 @@ public class LatencyMetricsTest
             assertFalse(recent.equals(Double.POSITIVE_INFINITY));
         }
     }
+
+    /**
+     * Test that parent LatencyMetrics are receiving updates from child 
metrics when reading
+     */
+    @Test
+    public void testReadMerging()
+    {
+        final LatencyMetrics parent = new LatencyMetrics("testMerge", 
"testMerge");
+        final LatencyMetrics child = new LatencyMetrics(factory, "testChild", 
parent);
+
+        for (int i = 0; i < 100; i++)
+        {
+            child.addNano(TimeUnit.NANOSECONDS.convert(i, 
TimeUnit.MILLISECONDS));
+        }
+
+        assertEquals(4950000, child.totalLatency.getCount());
+        assertEquals(child.totalLatency.getCount(), 
parent.totalLatency.getCount());
+        assertEquals(child.latency.getSnapshot().getMean(), 
parent.latency.getSnapshot().getMean(), 50D);
+
+        child.release();
+        parent.release();
+    }
+
+    @Test
+    public void testRelease()
+    {
+        final LatencyMetrics parent = new LatencyMetrics("testRelease", 
"testRelease");
+        final LatencyMetrics child = new LatencyMetrics(factory, 
"testChildRelease", parent);
+
+        for (int i = 0; i < 100; i++)
+        {
+            child.addNano(TimeUnit.NANOSECONDS.convert(i, 
TimeUnit.MILLISECONDS));
+        }
+
+        double mean = parent.latency.getSnapshot().getMean();
+        long count = parent.totalLatency.getCount();
+
+        child.release();
+
+        // Check that no value was lost with the release
+        assertEquals(count, parent.totalLatency.getCount());
+        assertEquals(mean, parent.latency.getSnapshot().getMean(), 50D);
+
+        parent.release();
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to