This is an automated email from the ASF dual-hosted git repository. jwest pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 2d323cb565 Add row,tombstone,and sstable count to profileload 2d323cb565 is described below commit 2d323cb56572e867b13b6d102a61aaff8bd66c86 Author: Jordan West <jord...@netflix.com> AuthorDate: Wed Dec 21 12:10:42 2022 -0800 Add row,tombstone,and sstable count to profileload Patch by Jordan West; Reviewed by David Capwell for CASSANDRA-18022 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/ReadCommand.java | 18 ++++ .../cassandra/db/SinglePartitionReadCommand.java | 2 + src/java/org/apache/cassandra/metrics/Sampler.java | 17 ++++ .../org/apache/cassandra/metrics/TableMetrics.java | 34 +++++++ .../apache/cassandra/tools/TopPartitionsTest.java | 108 +++++++++++++++++++++ 6 files changed, 180 insertions(+) diff --git a/CHANGES.txt b/CHANGES.txt index bf760aa8dc..84136b255d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.2 + * Add row, tombstone, and sstable count to nodetool profileload (CASSANDRA-18022) * Coordinator level metrics for read response and mutation row and column counts (CASSANDRA-18155) * Add CQL functions for dynamic data masking (CASSANDRA-17941) * Print friendly error when nodetool attempts to connect to uninitialized server (CASSANDRA-11537) diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index d03650ff36..95fa95da91 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -501,7 +501,9 @@ public abstract class ReadCommand extends AbstractReadQuery private final boolean enforceStrictLiveness = metadata().enforceStrictLiveness(); private int liveRows = 0; + private int lastReportedLiveRows = 0; private int tombstones = 0; + private int lastReportedTombstones = 0; private DecoratedKey currentKey; @@ -568,6 +570,22 @@ public abstract class ReadCommand extends AbstractReadQuery } } + @Override + protected void onPartitionClose() + { 
+ int lr = liveRows - lastReportedLiveRows; + int ts = tombstones - lastReportedTombstones; + + if (lr > 0) + metric.topReadPartitionRowCount.addSample(currentKey.getKey(), lr); + + if (ts > 0) + metric.topReadPartitionTombstoneCount.addSample(currentKey.getKey(), ts); + + lastReportedLiveRows = liveRows; + lastReportedTombstones = tombstones; + } + @Override public void onClose() { diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 963b9fee1c..34414bbea6 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -820,6 +820,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar { DecoratedKey key = merged.partitionKey(); metrics.topReadPartitionFrequency.addSample(key.getKey(), 1); + metrics.topReadPartitionSSTableCount.addSample(key.getKey(), metricsCollector.getMergedSSTables()); } class UpdateSstablesIterated extends Transformation<UnfilteredRowIterator> @@ -963,6 +964,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar DecoratedKey key = result.partitionKey(); cfs.metric.topReadPartitionFrequency.addSample(key.getKey(), 1); + cfs.metric.topReadPartitionSSTableCount.addSample(key.getKey(), metricsCollector.getMergedSSTables()); StorageHook.instance.reportRead(cfs.metadata.id, partitionKey()); return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed()); diff --git a/src/java/org/apache/cassandra/metrics/Sampler.java b/src/java/org/apache/cassandra/metrics/Sampler.java index de7f0b2e1a..6eb8508b9f 100644 --- a/src/java/org/apache/cassandra/metrics/Sampler.java +++ b/src/java/org/apache/cassandra/metrics/Sampler.java @@ -54,6 +54,23 @@ public abstract class Sampler<T> resultBuilder.forType(samplerType, samplerType.description) .addColumn("Query", "value") 
.addColumn("Microseconds", "count"))), + READ_ROW_COUNT("Partitions read with the most rows", ((samplerType, resultBuilder) -> + resultBuilder.forType(samplerType, samplerType.description) + .addColumn("Table", "table") + .addColumn("Partition", "value") + .addColumn("Rows", "count"))), + + READ_TOMBSTONE_COUNT("Partitions read with the most tombstones", ((samplerType, resultBuilder) -> + resultBuilder.forType(samplerType, samplerType.description) + .addColumn("Table", "table") + .addColumn("Partition", "value") + .addColumn("Tombstones", "count"))), + + READ_SSTABLE_COUNT("Partitions read with the most sstables", ((samplerType, resultBuilder) -> + resultBuilder.forType(samplerType, samplerType.description) + .addColumn("Table", "table") + .addColumn("Partition", "value") + .addColumn("SSTables", "count"))), WRITE_SIZE("Max mutation size by partition", ((samplerType, resultBuilder) -> resultBuilder.forType(samplerType, samplerType.description) .addColumn("Table", "table") diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index 8f3645dd1e..04102f7995 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -262,6 +262,12 @@ public class TableMetrics public final Sampler<ByteBuffer> topCasPartitionContention; /** When sampler activated, will track the slowest local reads **/ public final Sampler<String> topLocalReadQueryTime; + /** When sampler activated, will track partitions read with the most rows **/ + public final Sampler<ByteBuffer> topReadPartitionRowCount; + /** When sampler activated, will track partitions read with the most tombstones **/ + public final Sampler<ByteBuffer> topReadPartitionTombstoneCount; + /** When sampler activated, will track partitions read with the most merged sstables **/ + public final Sampler<ByteBuffer> topReadPartitionSSTableCount; public final TableMeter clientTombstoneWarnings; 
public final TableMeter clientTombstoneAborts; @@ -442,11 +448,39 @@ public class TableMetrics } }; + topReadPartitionRowCount = new MaxSampler<ByteBuffer>() + { + public String toString(ByteBuffer value) + { + return cfs.metadata().partitionKeyType.getString(value); + } + }; + + topReadPartitionTombstoneCount = new MaxSampler<ByteBuffer>() + { + public String toString(ByteBuffer value) + { + return cfs.metadata().partitionKeyType.getString(value); + } + }; + + topReadPartitionSSTableCount = new MaxSampler<ByteBuffer>() + { + @Override + public String toString(ByteBuffer value) + { + return cfs.metadata().partitionKeyType.getString(value); + } + }; + samplers.put(SamplerType.READS, topReadPartitionFrequency); samplers.put(SamplerType.WRITES, topWritePartitionFrequency); samplers.put(SamplerType.WRITE_SIZE, topWritePartitionSize); samplers.put(SamplerType.CAS_CONTENTIONS, topCasPartitionContention); samplers.put(SamplerType.LOCAL_READ_TIME, topLocalReadQueryTime); + samplers.put(SamplerType.READ_ROW_COUNT, topReadPartitionRowCount); + samplers.put(SamplerType.READ_TOMBSTONE_COUNT, topReadPartitionTombstoneCount); + samplers.put(SamplerType.READ_SSTABLE_COUNT, topReadPartitionSSTableCount); memtableColumnsCount = createTableGauge("MemtableColumnsCount", () -> cfs.getTracker().getView().getCurrentMemtable().operationCount()); diff --git a/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java b/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java index 934d4d814c..d9cfdeecef 100644 --- a/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java +++ b/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java @@ -41,6 +41,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.metrics.Sampler; +import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.StorageService; import static 
java.lang.String.format; @@ -54,10 +55,15 @@ import static org.junit.Assert.assertTrue; */ public class TopPartitionsTest { + public static String KEYSPACE = TopPartitionsTest.class.getSimpleName().toLowerCase(); + public static String TABLE = "test"; + @BeforeClass public static void loadSchema() throws ConfigurationException { SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1)); + executeInternal(format("CREATE TABLE %s.%s (k text, c text, v text, PRIMARY KEY (k, c))", KEYSPACE, TABLE)); } @Test @@ -93,6 +99,108 @@ public class TopPartitionsTest assertEquals("If this failed you probably have to raise the beginLocalSampling duration", 1, result.size()); } + @Test + public void testTopPartitionsRowTombstoneAndSSTableCount() throws Exception + { + int count = 10; + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(KEYSPACE, TABLE); + cfs.disableAutoCompaction(); + + executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('a', 'a', 'a')", KEYSPACE, TABLE)); + executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('a', 'b', 'a')", KEYSPACE, TABLE)); + cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); + + executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('a', 'c', 'a')", KEYSPACE, TABLE)); + executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('b', 'b', 'b')", KEYSPACE, TABLE)); + executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('c', 'c', 'c')", KEYSPACE, TABLE)); + executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('c', 'd', 'a')", KEYSPACE, TABLE)); + executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('c', 'e', 'a')", KEYSPACE, TABLE)); + executeInternal(format("DELETE FROM %s.%s WHERE k='a' AND c='a'", KEYSPACE, TABLE)); + cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); + + // test multi-partition read + cfs.beginLocalSampling("READ_ROW_COUNT", count, 240000); + cfs.beginLocalSampling("READ_TOMBSTONE_COUNT", count, 240000); + 
cfs.beginLocalSampling("READ_SSTABLE_COUNT", count, 240000); + + executeInternal(format("SELECT * FROM %s.%s", KEYSPACE, TABLE)); + Thread.sleep(2000); // simulate waiting before finishing sampling + + List<CompositeData> rowCounts = cfs.finishLocalSampling("READ_ROW_COUNT", count); + List<CompositeData> tsCounts = cfs.finishLocalSampling("READ_TOMBSTONE_COUNT", count); + List<CompositeData> sstCounts = cfs.finishLocalSampling("READ_SSTABLE_COUNT", count); + + assertEquals(0, sstCounts.size()); // not tracked on range reads + assertEquals(3, rowCounts.size()); // 3 partitions read (a, b, c) + assertEquals(1, tsCounts.size()); // 1 partition w tombstones (a) + + for (CompositeData data : rowCounts) + { + String partitionKey = (String) data.get("value"); + long numRows = (long) data.get("count"); + if (partitionKey.equalsIgnoreCase("a")) + { + assertEquals(2, numRows); + } + else if (partitionKey.equalsIgnoreCase("b")) + assertEquals(1, numRows); + else if (partitionKey.equalsIgnoreCase("c")) + assertEquals(3, numRows); + } + + assertEquals("a", tsCounts.get(0).get("value")); + assertEquals(1, (long) tsCounts.get(0).get("count")); + + // test single partition read + cfs.beginLocalSampling("READ_ROW_COUNT", count, 240000); + cfs.beginLocalSampling("READ_TOMBSTONE_COUNT", count, 240000); + cfs.beginLocalSampling("READ_SSTABLE_COUNT", count, 240000); + + executeInternal(format("SELECT * FROM %s.%s WHERE k='a'", KEYSPACE, TABLE)); + executeInternal(format("SELECT * FROM %s.%s WHERE k='b'", KEYSPACE, TABLE)); + executeInternal(format("SELECT * FROM %s.%s WHERE k='c'", KEYSPACE, TABLE)); + Thread.sleep(2000); // simulate waiting before finishing sampling + + rowCounts = cfs.finishLocalSampling("READ_ROW_COUNT", count); + tsCounts = cfs.finishLocalSampling("READ_TOMBSTONE_COUNT", count); + sstCounts = cfs.finishLocalSampling("READ_SSTABLE_COUNT", count); + + assertEquals(3, sstCounts.size()); // 3 partitions read + assertEquals(3, rowCounts.size()); // 3 partitions read + 
assertEquals(1, tsCounts.size()); // 3 partitions read only one containing tombstones + + for (CompositeData data : sstCounts) + { + String partitionKey = (String) data.get("value"); + long numRows = (long) data.get("count"); + if (partitionKey.equalsIgnoreCase("a")) + { + assertEquals(2, numRows); + } + else if (partitionKey.equalsIgnoreCase("b")) + assertEquals(1, numRows); + else if (partitionKey.equalsIgnoreCase("c")) + assertEquals(1, numRows); + } + + for (CompositeData data : rowCounts) + { + String partitionKey = (String) data.get("value"); + long numRows = (long) data.get("count"); + if (partitionKey.equalsIgnoreCase("a")) + { + assertEquals(2, numRows); + } + else if (partitionKey.equalsIgnoreCase("b")) + assertEquals(1, numRows); + else if (partitionKey.equalsIgnoreCase("c")) + assertEquals(3, numRows); + } + + assertEquals("a", tsCounts.get(0).get("value")); + assertEquals(1, (long) tsCounts.get(0).get("count")); + } + @Test public void testStartAndStopScheduledSampling() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org