This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 1cd0b382143ec56118105a6ed991c0803f400b18 Author: Sam Tunnicliffe <s...@apache.org> AuthorDate: Fri Jul 5 09:11:56 2024 +0100 Revert to localised start time for coordinator read latency measurement Patch by Sam Tunnicliffe; reviewed by Marcus Ericsson for CASSANDRA-19755 --- .../org/apache/cassandra/service/StorageProxy.java | 18 ++-- .../org/apache/cassandra/service/paxos/Paxos.java | 9 +- .../metrics/CoordinatorReadLatencyMetricTest.java | 116 +++++++++++++++++++++ 3 files changed, 134 insertions(+), 9 deletions(-) diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 8404a50808..56e038be45 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1866,6 +1866,7 @@ public class StorageProxy implements StorageProxyMBean private static PartitionIterator legacyReadWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime) throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException { + long start = nanoTime(); if (group.queries.size() > 1) throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time"); @@ -1946,9 +1947,11 @@ public class StorageProxy implements StorageProxyMBean } finally { - // We track latency based on request processing time, since the amount of time that request spends in the queue - // is not a representative metric of replica performance. - long latency = nanoTime() - requestTime.startedAtNanos(); + // We don't base latency tracking on the startedAtNanos of the RequestTime because queries which involve + // internal paging may be composed of multiple distinct reads, whereas RequestTime relates to the single + // client request. 
This is a measure of how long this specific individual read took, not total time since + // processing of the client request began. + long latency = nanoTime() - start; readMetrics.addNano(latency); casReadMetrics.addNano(latency); readMetricsForLevel(consistencyLevel).addNano(latency); @@ -1962,6 +1965,7 @@ public class StorageProxy implements StorageProxyMBean private static PartitionIterator readRegular(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime) throws UnavailableException, ReadFailureException, ReadTimeoutException { + long start = nanoTime(); try { PartitionIterator result = fetchRows(group.queries, consistencyLevel, requestTime); @@ -2001,9 +2005,11 @@ public class StorageProxy implements StorageProxyMBean } finally { - // We track latency based on request processing time, since the amount of time that request spends in the queue - // is not a representative metric of replica performance. - long latency = nanoTime() - requestTime.startedAtNanos(); + // We don't base latency tracking on the startedAtNanos of the RequestTime because queries which involve + // internal paging may be composed of multiple distinct reads, whereas RequestTime relates to the single + // client request. This is a measure of how long this specific individual read took, not total time since + // processing of the client request began. + long latency = nanoTime() - start; readMetrics.addNano(latency); readMetricsForLevel(consistencyLevel).addNano(latency); // TODO avoid giving every command the same latency number.
Can fix this in CASSADRA-5329 diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java b/src/java/org/apache/cassandra/service/paxos/Paxos.java index c5540707d4..36968d8160 100644 --- a/src/java/org/apache/cassandra/service/paxos/Paxos.java +++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java @@ -825,6 +825,7 @@ public class Paxos private static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyForConsensus, Dispatcher.RequestTime requestTime, long deadline) throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException { + long start = nanoTime(); if (group.queries.size() > 1) throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time"); @@ -889,9 +890,11 @@ public class Paxos } finally { - // We track latency based on request processing time, since the amount of time that request spends in the queue - // is not a representative metric of replica performance. - long latency = nanoTime() - requestTime.startedAtNanos(); + // We don't base latency tracking on the startedAtNanos of the RequestTime because queries which involve + // internal paging may be composed of multiple distinct reads, whereas RequestTime relates to the single + // client request. This is a measure of how long this specific individual read took, not total time since + // processing of the client request began.
+ long latency = nanoTime() - start; readMetrics.addNano(latency); casReadMetrics.addNano(latency); readMetricsMap.get(consistencyForConsensus).addNano(latency); diff --git a/test/distributed/org/apache/cassandra/distributed/test/metrics/CoordinatorReadLatencyMetricTest.java b/test/distributed/org/apache/cassandra/distributed/test/metrics/CoordinatorReadLatencyMetricTest.java new file mode 100644 index 0000000000..ab3de57cb6 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/metrics/CoordinatorReadLatencyMetricTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test.metrics; + +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.Test; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.metrics.ClientRequestsMetricsHolder; +import org.apache.cassandra.service.paxos.Paxos; + +import static org.junit.Assert.assertTrue; + +public class CoordinatorReadLatencyMetricTest extends TestBaseImpl +{ + @Test + public void internalPagingWithAggregateTest() throws Throwable + { + try (Cluster cluster = init(builder().withNodes(1).start())) + { + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))")); + for (int i = 0; i < 100; i++) + cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (pk, ck ,v) values (0, ?, 1)"), ConsistencyLevel.ALL, i); + + // Serial and non-serial reads have separate code paths, so exercise them both + testAggregationQuery(cluster, ConsistencyLevel.ALL); + cluster.get(1).runOnInstance(() -> Paxos.setPaxosVariant(Config.PaxosVariant.v1)); + testAggregationQuery(cluster, ConsistencyLevel.SERIAL); + cluster.get(1).runOnInstance(() -> Paxos.setPaxosVariant(Config.PaxosVariant.v2)); + testAggregationQuery(cluster, ConsistencyLevel.SERIAL); + } + } + + private void testAggregationQuery(Cluster cluster, ConsistencyLevel cl) + { + for (int sliceSize : new int[]{1, 100}) + { + // This statement utilises an AggregationQueryPager, which breaks the slice being read into a + // number of subslices and performs a single read for each of them. The number of subpages is + // dictated by the pagesize, so for testing purposes we keep it to 1 which ensures that the number + // of subpages is equal to the overall slice size.
+ String query = withKeyspace("SELECT count(v) from %s.tbl WHERE pk=0 and ck < " + sliceSize); + verifyLatencyMetricsWhenPaging(cluster, 1, sliceSize, query, cl); + } + } + + @Test + public void multiplePartitionKeyInClauseTest() throws Throwable + { + try (Cluster cluster = init(builder().withNodes(1).start())) + { + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, v int, PRIMARY KEY (pk))")); + for (int i = 0; i < 100; i++) + cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (pk, v) values (?, 1)"), ConsistencyLevel.ALL, i); + + for (int partitionKeys : new int[] {1, 100}) + { + // This statement translates to a single partition read for each value in the IN clause + // Latency metrics should be uniquely and independently recorded for each of these reads + // i.e. the timing of the read n does not include that of (n-1, n-2, n-3...) + String pkList = IntStream.range(0, partitionKeys) + .mapToObj(Integer::toString) + .collect(Collectors.joining(",", "(", ")")); + String query = withKeyspace("SELECT pk, v FROM %s.tbl WHERE pk IN " + pkList); + // We only keep executing the single partition reads until we have enough results to fill a page, so + // keep pagesize >= the number of partition keys in the IN clause to ensure that we read them all + verifyLatencyMetricsWhenPaging(cluster, 100, partitionKeys, query, ConsistencyLevel.ALL); + } + } + } + + private void verifyLatencyMetricsWhenPaging(Cluster cluster, + int pagesize, + int expectedQueries, + String query, + ConsistencyLevel consistencyLevel) + { + long countBefore = cluster.get(1).callOnInstance(() -> ClientRequestsMetricsHolder.readMetrics.latency.getCount()); + long totalLatencyBefore = cluster.get(1).callOnInstance(() -> ClientRequestsMetricsHolder.readMetrics.totalLatency.getCount()); + long startTime = System.nanoTime(); + cluster.coordinator(1).executeWithPaging(query, consistencyLevel, pagesize); + long elapsedTime = System.nanoTime() - startTime; + long countAfter = 
cluster.get(1).callOnInstance(() -> ClientRequestsMetricsHolder.readMetrics.latency.getCount()); + long totalLatencyAfter = cluster.get(1).callOnInstance(() -> ClientRequestsMetricsHolder.readMetrics.totalLatency.getCount()); + + long latenciesRecorded = countAfter - countBefore; + assertTrue("Expected to have recorded at least 1 latency measurement per-individual read", latenciesRecorded >= expectedQueries); + + long totalLatencyRecorded = TimeUnit.MICROSECONDS.toNanos(totalLatencyAfter - totalLatencyBefore); + assertTrue(String.format("Total latency delta %s should not exceed wall clock time elapsed %s", totalLatencyRecorded, elapsedTime), + totalLatencyRecorded <= elapsedTime); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org