This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 1cd0b382143ec56118105a6ed991c0803f400b18
Author: Sam Tunnicliffe <s...@apache.org>
AuthorDate: Fri Jul 5 09:11:56 2024 +0100

    Revert to localised start time for coordinator read latency measurement
    
    Patch by Sam Tunnicliffe; reviewed by Marcus Ericsson for
    CASSANDRA-19755
---
 .../org/apache/cassandra/service/StorageProxy.java |  18 ++--
 .../org/apache/cassandra/service/paxos/Paxos.java  |   9 +-
 .../metrics/CoordinatorReadLatencyMetricTest.java  | 116 +++++++++++++++++++++
 3 files changed, 134 insertions(+), 9 deletions(-)

diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 8404a50808..56e038be45 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1866,6 +1866,7 @@ public class StorageProxy implements StorageProxyMBean
     private static PartitionIterator 
legacyReadWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel 
consistencyLevel, Dispatcher.RequestTime requestTime)
     throws InvalidRequestException, UnavailableException, 
ReadFailureException, ReadTimeoutException
     {
+        long start = nanoTime();
         if (group.queries.size() > 1)
             throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency 
may only be requested for one partition at a time");
 
@@ -1946,9 +1947,11 @@ public class StorageProxy implements StorageProxyMBean
         }
         finally
         {
-            // We track latency based on request processing time, since the 
amount of time that request spends in the queue
-            // is not a representative metric of replica performance.
-            long latency = nanoTime() - requestTime.startedAtNanos();
+            // We don't base latency tracking on the startedAtNanos of the 
RequestTime because queries which involve
+            // internal paging may be composed of multiple distinct reads, 
whereas RequestTime relates to the single
+            // client request. This is a measure of how long this specific 
individual read took, not total time since
+            // processing of the client began.
+            long latency = nanoTime() - start;
             readMetrics.addNano(latency);
             casReadMetrics.addNano(latency);
             readMetricsForLevel(consistencyLevel).addNano(latency);
@@ -1962,6 +1965,7 @@ public class StorageProxy implements StorageProxyMBean
     private static PartitionIterator 
readRegular(SinglePartitionReadCommand.Group group, ConsistencyLevel 
consistencyLevel, Dispatcher.RequestTime requestTime)
     throws UnavailableException, ReadFailureException, ReadTimeoutException
     {
+        long start = nanoTime();
         try
         {
             PartitionIterator result = fetchRows(group.queries, 
consistencyLevel, requestTime);
@@ -2001,9 +2005,11 @@ public class StorageProxy implements StorageProxyMBean
         }
         finally
         {
-            // We track latency based on request processing time, since the 
amount of time that request spends in the queue
-            // is not a representative metric of replica performance.
-            long latency = nanoTime() - requestTime.startedAtNanos();
+            // We don't base latency tracking on the startedAtNanos of the 
RequestTime because queries which involve
+            // internal paging may be composed of multiple distinct reads, 
whereas RequestTime relates to the single
+            // client request. This is a measure of how long this specific 
individual read took, not total time since
+            // processing of the client began.
+            long latency = nanoTime() - start;
             readMetrics.addNano(latency);
             readMetricsForLevel(consistencyLevel).addNano(latency);
             // TODO avoid giving every command the same latency number.  Can 
fix this in CASSADRA-5329
diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java 
b/src/java/org/apache/cassandra/service/paxos/Paxos.java
index c5540707d4..36968d8160 100644
--- a/src/java/org/apache/cassandra/service/paxos/Paxos.java
+++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java
@@ -825,6 +825,7 @@ public class Paxos
     private static PartitionIterator read(SinglePartitionReadCommand.Group 
group, ConsistencyLevel consistencyForConsensus, Dispatcher.RequestTime 
requestTime, long deadline)
             throws InvalidRequestException, UnavailableException, 
ReadFailureException, ReadTimeoutException
     {
+        long start = nanoTime();
         if (group.queries.size() > 1)
             throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency 
may only be requested for one partition at a time");
 
@@ -889,9 +890,11 @@ public class Paxos
         }
         finally
         {
-            // We track latency based on request processing time, since the 
amount of time that request spends in the queue
-            // is not a representative metric of replica performance.
-            long latency = nanoTime() - requestTime.startedAtNanos();
+            // We don't base latency tracking on the startedAtNanos of the 
RequestTime because queries which involve
+            // internal paging may be composed of multiple distinct reads, 
whereas RequestTime relates to the single
+            // client request. This is a measure of how long this specific 
individual read took, not total time since
+            // processing of the client began.
+            long latency = nanoTime() - start;
             readMetrics.addNano(latency);
             casReadMetrics.addNano(latency);
             readMetricsMap.get(consistencyForConsensus).addNano(latency);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/metrics/CoordinatorReadLatencyMetricTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/metrics/CoordinatorReadLatencyMetricTest.java
new file mode 100644
index 0000000000..ab3de57cb6
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/metrics/CoordinatorReadLatencyMetricTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.metrics;
+
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.metrics.ClientRequestsMetricsHolder;
+import org.apache.cassandra.service.paxos.Paxos;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks how the coordinator records read latency when one client query is served by several
+ * internal reads: internal paging of an aggregate, or one single-partition read per IN-clause value.
+ * Each internal read is expected to add its own sample to the coordinator read latency metrics, and
+ * the summed latency must not exceed the wall-clock time of the whole query (it could, if every read
+ * were timed from the start of the client request rather than from its own start).
+ */
+public class CoordinatorReadLatencyMetricTest extends TestBaseImpl
+{
+    /**
+     * Writes 100 rows to partition 0, then runs the aggregation checks at ALL and at SERIAL, the
+     * latter once with Paxos variant v1 and once with Paxos variant v2.
+     */
+    @Test
+    public void internalPagingWithAggregateTest() throws Throwable
+    {
+        try (Cluster cluster = init(builder().withNodes(1).start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck 
int, v int, PRIMARY KEY (pk, ck))"));
+            for (int i = 0; i < 100; i++)
+                cluster.coordinator(1).execute(withKeyspace("insert into 
%s.tbl (pk, ck ,v) values (0, ?, 1)"), ConsistencyLevel.ALL, i);
+
+            // Serial and non-serial reads have separate code paths, so 
exercise them both
+            testAggregationQuery(cluster, ConsistencyLevel.ALL);
+            cluster.get(1).runOnInstance(() -> 
Paxos.setPaxosVariant(Config.PaxosVariant.v1));
+            testAggregationQuery(cluster, ConsistencyLevel.SERIAL);
+            cluster.get(1).runOnInstance(() -> 
Paxos.setPaxosVariant(Config.PaxosVariant.v2));
+            testAggregationQuery(cluster, ConsistencyLevel.SERIAL);
+        }
+    }
+
+    /**
+     * Runs {@code SELECT count(v)} over the first 1 and then the first 100 clustering rows of
+     * partition 0 at the given consistency level with a page size of 1, expecting at least one
+     * latency sample per row in the slice.
+     *
+     * @param cluster single-node cluster populated by the caller
+     * @param cl      consistency level used for the aggregate query
+     */
+    private void testAggregationQuery(Cluster cluster, ConsistencyLevel cl)
+    {
+        for (int sliceSize : new int[]{1, 100})
+        {
+            // This statement utilises an AggregationQueryPager, which breaks 
the slice being read into a
+            // number of subslices and performs a single read for each of 
them. The number of subpages is
+            // dictated by the pagesize, so for testing purposes we keep it to 
1 which ensures that the number
+            // of subpages is equal to overall slice size.
+            String query = withKeyspace("SELECT count(v) from %s.tbl WHERE 
pk=0 and ck < " + sliceSize);
+            verifyLatencyMetricsWhenPaging(cluster, 1, sliceSize, query, cl);
+        }
+    }
+
+    /**
+     * Writes 100 single-row partitions (pk 0..99), then selects the first 1 and then all 100 of them
+     * through an IN clause at ALL, expecting at least one latency sample per partition key.
+     */
+    @Test
+    public void multiplePartitionKeyInClauseTest() throws Throwable
+    {
+        try (Cluster cluster = init(builder().withNodes(1).start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, v 
int, PRIMARY KEY (pk))"));
+            for (int i = 0; i < 100; i++)
+                    cluster.coordinator(1).execute(withKeyspace("insert into 
%s.tbl (pk, v) values (?, 1)"), ConsistencyLevel.ALL, i);
+
+            for (int partitionKeys : new int[] {1, 100})
+            {
+                // This statement translates to a single partition read for 
each value in the IN clause
+                // Latency metrics should be uniquely and independently 
recorded for each of these reads
+                // i.e. the timing of the read n does not include that of 
(n-1, n-2, n-3...)
+                String pkList = IntStream.range(0, partitionKeys)
+                                         .mapToObj(Integer::toString)
+                                         .collect(Collectors.joining(",", "(", 
")"));
+                String query = withKeyspace("SELECT pk, v FROM %s.tbl WHERE pk 
IN " + pkList);
+                // We only keep executing the single partition reads until we 
have enough results to fill a page, so
+                // keep pagesize >= the number of partition keys in the IN 
clause to ensure that we read them all
+                verifyLatencyMetricsWhenPaging(cluster, 100, partitionKeys, 
query, ConsistencyLevel.ALL);
+            }
+        }
+    }
+
+    /**
+     * Runs {@code query} through node 1's coordinator with paging and compares the change in node 1's
+     * {@code ClientRequestsMetricsHolder.readMetrics} against the wall-clock time measured around the call.
+     * Asserts that at least {@code expectedQueries} latency samples were added and that the added total
+     * latency does not exceed the elapsed wall-clock time.
+     *
+     * @param cluster          cluster whose node 1 coordinates the query
+     * @param pagesize         page size passed to {@code executeWithPaging}
+     * @param expectedQueries  minimum number of latency samples the query is expected to add
+     * @param query            the SELECT statement to run
+     * @param consistencyLevel consistency level for the query
+     */
+    private void verifyLatencyMetricsWhenPaging(Cluster cluster,
+                                                int pagesize,
+                                                int expectedQueries,
+                                                String query,
+                                                ConsistencyLevel 
consistencyLevel)
+    {
+        long countBefore = cluster.get(1).callOnInstance(() -> 
ClientRequestsMetricsHolder.readMetrics.latency.getCount());
+        long totalLatencyBefore = cluster.get(1).callOnInstance(() -> 
ClientRequestsMetricsHolder.readMetrics.totalLatency.getCount());
+        long startTime = System.nanoTime();
+        // NOTE(review): the iterator returned by executeWithPaging is discarded. If later pages are
+        // fetched lazily as that iterator is consumed, their reads would be neither counted nor timed
+        // here; the callers appear to rely on all expected reads being issued for the first page
+        // (see the page-size comment in multiplePartitionKeyInClauseTest) -- confirm if page sizes change.
+        cluster.coordinator(1).executeWithPaging(query, consistencyLevel, 
pagesize);
+        long elapsedTime = System.nanoTime() - startTime;
+        long countAfter = cluster.get(1).callOnInstance(() -> 
ClientRequestsMetricsHolder.readMetrics.latency.getCount());
+        long totalLatencyAfter = cluster.get(1).callOnInstance(() -> 
ClientRequestsMetricsHolder.readMetrics.totalLatency.getCount());
+
+        long latenciesRecorded = countAfter - countBefore;
+        assertTrue("Expected to have recorded at least 1 latency measurement 
per-individual read", latenciesRecorded >= expectedQueries);
+
+        // The totalLatency delta is treated as microseconds and converted to nanoseconds so it can be
+        // compared with the System.nanoTime() interval measured above.
+        long totalLatencyRecorded = 
TimeUnit.MICROSECONDS.toNanos(totalLatencyAfter - totalLatencyBefore);
+        // If each internal read were timed from the start of the client request, the per-read latencies
+        // would overlap and their sum could exceed the wall-clock time of the whole query.
+        assertTrue(String.format("Total latency delta %s should not exceed 
wall clock time elapsed %s", totalLatencyRecorded, elapsedTime),
+                   totalLatencyRecorded <= elapsedTime);
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to