This is an automated email from the ASF dual-hosted git repository.

djoshi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d497c8c  Improved testing for batch metrics.
d497c8c is described below

commit d497c8c9f3f9a19e0193c1c463f684b75aeb7081
Author: Stephen Mallette <sp...@genoprime.com>
AuthorDate: Tue Mar 31 13:47:44 2020 -0400

    Improved testing for batch metrics.
    
    Refactored the test to include assertion metrics for counter batches
    in addition to what was previously tested in logged and unlogged batches.
    Modified tests to assert random ranges of batches, partitions and 
statements,
    printing the seed for the Random on failure so that the error state could be
    recreated.
    
    Patch by Stephen Mallette; Reviewed by David Capwell and Dinesh Joshi for 
CASSANDRA-15718
---
 .../DecayingEstimatedHistogramReservoir.java       |  64 +++++++--
 .../apache/cassandra/metrics/BatchMetricsTest.java | 160 +++++++++++++++------
 2 files changed, 171 insertions(+), 53 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java
 
b/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java
index 6d24314..6dd1687 100644
--- 
a/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java
+++ 
b/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java
@@ -22,6 +22,7 @@ import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.nio.charset.StandardCharsets;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongArray;
 
@@ -41,7 +42,7 @@ import static java.lang.Math.min;
  * collected in the previous minute. Measured values are collected in variable 
sized buckets, using small buckets in the
  * lower range and larger buckets in the upper range. Use this histogram when 
you want to know if the distribution of
  * the underlying data stream has changed recently and you want high 
resolution on values in the lower range.
- *
+ * <p/>
  * The histogram use forward decay [1] to make recent values more significant. 
The forward decay factor will be doubled
  * every minute (half-life time set to 60 seconds) [2]. The forward decay 
landmark is reset every 30 minutes (or at
  * first read/update after 30 minutes). During landmark reset, updates and 
reads in the reservoir will be blocked in a
@@ -49,29 +50,31 @@ import static java.lang.Math.min;
  * assumption that in an extreme case we would have to collect a metric 1M 
times for a single bucket each second. By the
  * end of the 30:th minute all collected values will roughly add up to 
1.000.000 * 60 * pow(2, 30) which can be
  * represented with 56 bits giving us some head room in a signed 64 bit long.
- *
+ * <p/>
  * Internally two reservoirs are maintained, one with decay and one without 
decay. All public getters in a {@link Snapshot}
  * will expose the decay functionality with the exception of the {@link 
Snapshot#getValues()} which will return values
  * from the reservoir without decay. This makes it possible for the caller to 
maintain precise deltas in an interval of
- * its choise.
- *
+ * its choice.
+ * <p/>
  * The bucket size starts at 1 and grows by 1.2 each time (rounding and 
removing duplicates). It goes from 1 to around
  * 18T by default (creating 164+1 buckets), which will give a timing 
resolution from microseconds to roughly 210 days,
  * with less precision as the numbers get larger.
- *
+ * <p/>
  * The series of values to which the counts in `decayingBuckets` correspond:
  * 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 14, 17, 20, 24, 29, 35, 42, 50, 60, 72 etc.
  * Thus, a `decayingBuckets` of [0, 0, 1, 10] would mean we had seen 1 value 
of 3 and 10 values of 4.
- *
+ * <p/>
  * Each bucket represents values from (previous bucket offset, current offset].
- *
+ * <p/>
  * To reduce contention each logical bucket is striped accross a configurable 
number of stripes (default: 2). Threads are
  * assigned to specific stripes. In addition, logical buckets are distributed 
across the physical storage to reduce conention
  * when logically adjacent buckets are updated. See CASSANDRA-15213.
- *
- * [1]: http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf
- * [2]: https://en.wikipedia.org/wiki/Half-life
- * [3]: 
https://github.com/dropwizard/metrics/blob/v3.1.2/metrics-core/src/main/java/com/codahale/metrics/ExponentiallyDecayingReservoir.java
+ * <p/>
+ * <ul>
+ *   <li>[1]: http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf</li>
+ *   <li>[2]: https://en.wikipedia.org/wiki/Half-life</li>
+ *   <li>[3]: 
https://github.com/dropwizard/metrics/blob/v3.1.2/metrics-core/src/main/java/com/codahale/metrics/ExponentiallyDecayingReservoir.java</li>
+ * </ul>
  */
 public class DecayingEstimatedHistogramReservoir implements Reservoir
 {
@@ -522,6 +525,15 @@ public class DecayingEstimatedHistogramReservoir 
implements Reservoir
             return snapshotLandmark;
         }
 
+        @VisibleForTesting
+        public Range getBucketingRangeForValue(long value)
+        {
+            int index = findIndex(bucketOffsets, value);
+            long max = bucketOffsets[index];
+            long min = index == 0 ? 0 : 1 + bucketOffsets[index - 1];
+            return new Range(min, max);
+        }
+
         /**
          * Return the number of registered values taking forward decay into 
account.
          *
@@ -538,7 +550,7 @@ public class DecayingEstimatedHistogramReservoir implements 
Reservoir
         /**
          * Get the estimated max-value that could have been added to this 
reservoir.
          *
-         * As values are collected in variable sized buckets, the actual max 
value recored in the reservoir may be less
+         * As values are collected in variable sized buckets, the actual max 
value recorded in the reservoir may be less
          * than the value returned.
          *
          * @return the largest value that could have been added to this 
reservoir, or Long.MAX_VALUE if the reservoir
@@ -587,7 +599,7 @@ public class DecayingEstimatedHistogramReservoir implements 
Reservoir
         /**
          * Get the estimated min-value that could have been added to this 
reservoir.
          *
-         * As values are collected in variable sized buckets, the actual min 
value recored in the reservoir may be
+         * As values are collected in variable sized buckets, the actual min 
value recorded in the reservoir may be
          * higher than the value returned.
          *
          * @return the smallest value that could have been added to this 
reservoir
@@ -715,4 +727,30 @@ public class DecayingEstimatedHistogramReservoir 
implements Reservoir
             this.reservoir.rebase(this);
         }
     }
+
+    static class Range
+    {
+        public final long min;
+        public final long max;
+
+        public Range(long min, long max)
+        {
+            this.min = min;
+            this.max = max;
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Range that = (Range) o;
+            return min == that.min &&
+                   max == that.max;
+        }
+
+        public int hashCode()
+        {
+            return Objects.hash(min, max);
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java 
b/test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java
index 968480b..900ac75 100644
--- a/test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java
+++ b/test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.metrics;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -34,23 +35,34 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.service.EmbeddedCassandraService;
+import 
org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
 import static org.apache.cassandra.cql3.statements.BatchStatement.metrics;
+import static 
org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.quicktheories.QuickTheory.qt;
+import static org.quicktheories.generators.Generate.intArrays;
+import static org.quicktheories.generators.SourceDSL.integers;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class BatchMetricsTest extends SchemaLoader
 {
+    private static final int MAX_ROUNDS_TO_PERFORM = 3;
+    private static final int MAX_DISTINCT_PARTITIONS = 128;
+    private static final int MAX_STATEMENTS_PER_ROUND = 32;
+
     private static EmbeddedCassandraService cassandra;
 
     private static Cluster cluster;
     private static Session session;
 
     private static String KEYSPACE = "junit";
-    private static final String TABLE = "batchmetricstest";
+    private static final String LOGGER_TABLE = "loggerbatchmetricstest";
+    private static final String COUNTER_TABLE = "counterbatchmetricstest";
 
-    private static PreparedStatement ps;
+    private static PreparedStatement psLogger;
+    private static PreparedStatement psCounter;
 
     @BeforeClass()
     public static void setup() throws ConfigurationException, IOException
@@ -60,31 +72,34 @@ public class BatchMetricsTest extends SchemaLoader
         cassandra = new EmbeddedCassandraService();
         cassandra.start();
 
+        DatabaseDescriptor.setWriteRpcTimeout(TimeUnit.SECONDS.toMillis(10));
+
         cluster = 
Cluster.builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build();
         session = cluster.connect();
 
         session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH 
replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };");
         session.execute("USE " + KEYSPACE);
-        session.execute("CREATE TABLE IF NOT EXISTS " + TABLE + " (id int 
PRIMARY KEY, val text);");
+        session.execute("CREATE TABLE IF NOT EXISTS " + LOGGER_TABLE + " (id 
int PRIMARY KEY, val text);");
+        session.execute("CREATE TABLE IF NOT EXISTS " + COUNTER_TABLE + " (id 
int PRIMARY KEY, val counter);");
 
-        ps = session.prepare("INSERT INTO " + KEYSPACE + '.' + TABLE + " (id, 
val) VALUES (?, ?);");
+        psLogger = session.prepare("INSERT INTO " + KEYSPACE + '.' + 
LOGGER_TABLE + " (id, val) VALUES (?, ?);");
+        psCounter = session.prepare("UPDATE " + KEYSPACE + '.' + COUNTER_TABLE 
+ " SET val = val + 1 WHERE id = ?;");
     }
 
-    private void executeBatch(boolean isLogged, int distinctPartitions, int 
statementsPerPartition)
+    private void executeLoggerBatch(BatchStatement.Type batchStatementType, 
int distinctPartitions, int statementsPerPartition)
     {
-        BatchStatement.Type batchType;
-
-        if (isLogged) {
-            batchType = BatchStatement.Type.LOGGED;
-        } else {
-            batchType = BatchStatement.Type.UNLOGGED;
-        }
-
-        BatchStatement batch = new BatchStatement(batchType);
-
-        for (int i=0; i<distinctPartitions; i++) {
-            for (int j=0; j<statementsPerPartition; j++) {
-                batch.add(ps.bind(i, "aaaaaaaa"));
+        BatchStatement batch = new BatchStatement(batchStatementType);
+
+        for (int i = 0; i < distinctPartitions; i++)
+        {
+            for (int j = 0; j < statementsPerPartition; j++)
+            {
+                if (batchStatementType == BatchStatement.Type.UNLOGGED || 
batchStatementType == BatchStatement.Type.LOGGED)
+                    batch.add(psLogger.bind(i, "aaaaaaaa"));
+                else if (batchStatementType == BatchStatement.Type.COUNTER)
+                    batch.add(psCounter.bind(i));
+                else
+                    throw new IllegalStateException("There is no a case for 
BatchStatement.Type." + batchStatementType.name());
             }
         }
 
@@ -92,28 +107,93 @@ public class BatchMetricsTest extends SchemaLoader
     }
 
     @Test
-    public void testLoggedPartitionsPerBatch() {
-        int partitionsPerBatchCountPre = (int) 
metrics.partitionsPerLoggedBatch.getCount();
-        executeBatch(true, 10, 2);
-        assertEquals(partitionsPerBatchCountPre+1, 
metrics.partitionsPerLoggedBatch.getCount());
-        assertTrue(partitionsPerBatchCountPre <= 
metrics.partitionsPerLoggedBatch.getSnapshot().getMax()); // decayingBuckets 
may not have exact value
-
-        partitionsPerBatchCountPre = (int) 
metrics.partitionsPerLoggedBatch.getCount();
-        executeBatch(true, 21, 2);
-        assertEquals(partitionsPerBatchCountPre+1, 
metrics.partitionsPerLoggedBatch.getCount());
-        assertTrue(partitionsPerBatchCountPre <= 
metrics.partitionsPerLoggedBatch.getSnapshot().getMax());
+    public void testLoggedPartitionsPerBatch()
+    {
+        qt().withExamples(25)
+            .forAll(intArrays(integers().between(1, MAX_ROUNDS_TO_PERFORM),
+                              integers().between(1, MAX_STATEMENTS_PER_ROUND)),
+                    integers().between(1, MAX_DISTINCT_PARTITIONS))
+            .checkAssert((rounds, distinctPartitions) ->
+                         assertMetrics(BatchStatement.Type.LOGGED, rounds, 
distinctPartitions));
+    }
+
+    @Test
+    public void testUnloggedPartitionsPerBatch()
+    {
+        qt().withExamples(25)
+            .forAll(intArrays(integers().between(1, MAX_ROUNDS_TO_PERFORM),
+                              integers().between(1, MAX_STATEMENTS_PER_ROUND)),
+                    integers().between(1, MAX_DISTINCT_PARTITIONS))
+            .checkAssert((rounds, distinctPartitions) ->
+                         assertMetrics(BatchStatement.Type.UNLOGGED, rounds, 
distinctPartitions));
     }
 
     @Test
-    public void testUnloggedPartitionsPerBatch() {
-        int partitionsPerBatchCountPre = (int) 
metrics.partitionsPerUnloggedBatch.getCount();
-        executeBatch(false, 7, 2);
-        assertEquals(partitionsPerBatchCountPre+1, 
metrics.partitionsPerUnloggedBatch.getCount());
-        assertTrue(partitionsPerBatchCountPre <= 
metrics.partitionsPerUnloggedBatch.getSnapshot().getMax());
-
-        partitionsPerBatchCountPre = (int) 
metrics.partitionsPerUnloggedBatch.getCount();
-        executeBatch(false, 25, 2);
-        assertEquals(partitionsPerBatchCountPre+1, 
metrics.partitionsPerUnloggedBatch.getCount());
-        assertTrue(partitionsPerBatchCountPre <= 
metrics.partitionsPerUnloggedBatch.getSnapshot().getMax());
+    public void testCounterPartitionsPerBatch()
+    {
+        qt().withExamples(10)
+            .forAll(intArrays(integers().between(1, MAX_ROUNDS_TO_PERFORM),
+                              integers().between(1, MAX_STATEMENTS_PER_ROUND)),
+                    integers().between(1, MAX_DISTINCT_PARTITIONS))
+            .checkAssert((rounds, distinctPartitions) ->
+                         assertMetrics(BatchStatement.Type.COUNTER, rounds, 
distinctPartitions));
+    }
+
+    private void assertMetrics(BatchStatement.Type batchTypeTested, int[] 
rounds, int distinctPartitions)
+    {
+        // reset the histogram between runs
+        clearHistogram();
+
+        // roundsOfStatementsPerPartition - array length is the number of 
rounds to executeLoggerBatch() and each
+        // value in the array represents the number of statements to execute 
per partition on that round
+        for (int ix = 0; ix < rounds.length; ix++)
+        {
+            long partitionsPerLoggedBatchCountPre = 
metrics.partitionsPerLoggedBatch.getCount();
+            long expectedPartitionsPerLoggedBatchCount = 
partitionsPerLoggedBatchCountPre + (batchTypeTested == 
BatchStatement.Type.LOGGED ? 1 : 0);
+            long partitionsPerUnloggedBatchCountPre = 
metrics.partitionsPerUnloggedBatch.getCount();
+            long expectedPartitionsPerUnloggedBatchCount = 
partitionsPerUnloggedBatchCountPre + (batchTypeTested == 
BatchStatement.Type.UNLOGGED ? 1 : 0);
+            long partitionsPerCounterBatchCountPre = 
metrics.partitionsPerCounterBatch.getCount();
+            long expectedPartitionsPerCounterBatchCount = 
partitionsPerCounterBatchCountPre + (batchTypeTested == 
BatchStatement.Type.COUNTER ? 1 : 0);
+
+            executeLoggerBatch(batchTypeTested, distinctPartitions, 
rounds[ix]);
+
+            assertEquals(expectedPartitionsPerUnloggedBatchCount, 
metrics.partitionsPerUnloggedBatch.getCount());
+            assertEquals(expectedPartitionsPerLoggedBatchCount, 
metrics.partitionsPerLoggedBatch.getCount());
+            assertEquals(expectedPartitionsPerCounterBatchCount, 
metrics.partitionsPerCounterBatch.getCount());
+
+            EstimatedHistogramReservoirSnapshot 
partitionsPerLoggedBatchSnapshot = (EstimatedHistogramReservoirSnapshot) 
metrics.partitionsPerLoggedBatch.getSnapshot();
+            EstimatedHistogramReservoirSnapshot 
partitionsPerUnloggedBatchSnapshot = (EstimatedHistogramReservoirSnapshot) 
metrics.partitionsPerUnloggedBatch.getSnapshot();
+            EstimatedHistogramReservoirSnapshot 
partitionsPerCounterBatchSnapshot = (EstimatedHistogramReservoirSnapshot) 
metrics.partitionsPerCounterBatch.getSnapshot();
+
+            // BatchMetrics uses DecayingEstimatedHistogramReservoir which 
notes that the return of getMax()
+            // may be more than the actual max value recorded in the reservoir 
with similar but reverse properties
+            // for getMin(). uses getBucketingForValue() on the snapshot to 
identify the exact max. since the
+            // distinctPartitions doesn't change per test round these values 
shouldn't change.
+            Range expectedPartitionsPerLoggedBatchMinMax = batchTypeTested == 
BatchStatement.Type.LOGGED ?
+                                                           
determineExpectedMinMax(partitionsPerLoggedBatchSnapshot, distinctPartitions) :
+                                                           new Range(0L, 0L);
+            Range expectedPartitionsPerUnloggedBatchMinMax = batchTypeTested 
== BatchStatement.Type.UNLOGGED ?
+                                                             
determineExpectedMinMax(partitionsPerUnloggedBatchSnapshot, distinctPartitions) 
:
+                                                             new Range(0L, 0L);
+            Range expectedPartitionsPerCounterBatchMinMax = batchTypeTested == 
BatchStatement.Type.COUNTER ?
+                                                            
determineExpectedMinMax(partitionsPerCounterBatchSnapshot, distinctPartitions) :
+                                                            new Range(0L, 0L);
+
+            assertEquals(expectedPartitionsPerLoggedBatchMinMax, new 
Range(partitionsPerLoggedBatchSnapshot.getMin(), 
partitionsPerLoggedBatchSnapshot.getMax()));
+            assertEquals(expectedPartitionsPerUnloggedBatchMinMax, new 
Range(partitionsPerUnloggedBatchSnapshot.getMin(), 
partitionsPerUnloggedBatchSnapshot.getMax()));
+            assertEquals(expectedPartitionsPerCounterBatchMinMax, new 
Range(partitionsPerCounterBatchSnapshot.getMin(), 
partitionsPerCounterBatchSnapshot.getMax()));
+        }
+    }
+
+    private void clearHistogram()
+    {
+        ((ClearableHistogram) metrics.partitionsPerLoggedBatch).clear();
+        ((ClearableHistogram) metrics.partitionsPerUnloggedBatch).clear();
+        ((ClearableHistogram) metrics.partitionsPerCounterBatch).clear();
+    }
+
+    private Range determineExpectedMinMax(EstimatedHistogramReservoirSnapshot 
snapshot, long value)
+    {
+        return snapshot.getBucketingRangeForValue(value);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to