This is an automated email from the ASF dual-hosted git repository.

jwest pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2d323cb565 Add row,tombstone,and sstable count to profileload
2d323cb565 is described below

commit 2d323cb56572e867b13b6d102a61aaff8bd66c86
Author: Jordan West <jord...@netflix.com>
AuthorDate: Wed Dec 21 12:10:42 2022 -0800

    Add row,tombstone,and sstable count to profileload
    
    Patch by Jordan West; Reviewed by David Capwell for CASSANDRA-18022
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/db/ReadCommand.java  |  18 ++++
 .../cassandra/db/SinglePartitionReadCommand.java   |   2 +
 src/java/org/apache/cassandra/metrics/Sampler.java |  17 ++++
 .../org/apache/cassandra/metrics/TableMetrics.java |  34 +++++++
 .../apache/cassandra/tools/TopPartitionsTest.java  | 108 +++++++++++++++++++++
 6 files changed, 180 insertions(+)

diff --git a/CHANGES.txt b/CHANGES.txt
index bf760aa8dc..84136b255d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Add row, tombstone, and sstable count to nodetool profileload 
(CASSANDRA-18022)
  * Coordinator level metrics for read response and mutation row and column 
counts (CASSANDRA-18155)
  * Add CQL functions for dynamic data masking (CASSANDRA-17941)
  * Print friendly error when nodetool attempts to connect to uninitialized 
server (CASSANDRA-11537)
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java 
b/src/java/org/apache/cassandra/db/ReadCommand.java
index d03650ff36..95fa95da91 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -501,7 +501,9 @@ public abstract class ReadCommand extends AbstractReadQuery
             private final boolean enforceStrictLiveness = 
metadata().enforceStrictLiveness();
 
             private int liveRows = 0;
+            private int lastReportedLiveRows = 0;
             private int tombstones = 0;
+            private int lastReportedTombstones = 0;
 
             private DecoratedKey currentKey;
 
@@ -568,6 +570,22 @@ public abstract class ReadCommand extends AbstractReadQuery
                 }
             }
 
+            @Override
+            protected void onPartitionClose()
+            {
+                int lr = liveRows - lastReportedLiveRows;
+                int ts = tombstones - lastReportedTombstones;
+
+                if (lr > 0)
+                    
metric.topReadPartitionRowCount.addSample(currentKey.getKey(), lr);
+
+                if (ts > 0)
+                    
metric.topReadPartitionTombstoneCount.addSample(currentKey.getKey(), ts);
+
+                lastReportedLiveRows = liveRows;
+                lastReportedTombstones = tombstones;
+            }
+
             @Override
             public void onClose()
             {
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 963b9fee1c..34414bbea6 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -820,6 +820,7 @@ public class SinglePartitionReadCommand extends ReadCommand 
implements SinglePar
         {
             DecoratedKey key = merged.partitionKey();
             metrics.topReadPartitionFrequency.addSample(key.getKey(), 1);
+            metrics.topReadPartitionSSTableCount.addSample(key.getKey(), 
metricsCollector.getMergedSSTables());
         }
 
         class UpdateSstablesIterated extends 
Transformation<UnfilteredRowIterator>
@@ -963,6 +964,7 @@ public class SinglePartitionReadCommand extends ReadCommand 
implements SinglePar
 
         DecoratedKey key = result.partitionKey();
         cfs.metric.topReadPartitionFrequency.addSample(key.getKey(), 1);
+        cfs.metric.topReadPartitionSSTableCount.addSample(key.getKey(), 
metricsCollector.getMergedSSTables());
         StorageHook.instance.reportRead(cfs.metadata.id, partitionKey());
 
         return result.unfilteredIterator(columnFilter(), Slices.ALL, 
clusteringIndexFilter().isReversed());
diff --git a/src/java/org/apache/cassandra/metrics/Sampler.java 
b/src/java/org/apache/cassandra/metrics/Sampler.java
index de7f0b2e1a..6eb8508b9f 100644
--- a/src/java/org/apache/cassandra/metrics/Sampler.java
+++ b/src/java/org/apache/cassandra/metrics/Sampler.java
@@ -54,6 +54,23 @@ public abstract class Sampler<T>
                                                      
resultBuilder.forType(samplerType, samplerType.description)
                                                                   
.addColumn("Query", "value")
                                                                   
.addColumn("Microseconds", "count"))),
+        READ_ROW_COUNT("Partitions read with the most rows", ((samplerType, 
resultBuilder) ->
+                                                      
resultBuilder.forType(samplerType, samplerType.description)
+                                                      .addColumn("Table", 
"table")
+                                                      .addColumn("Partition", 
"value")
+                                                      .addColumn("Rows", 
"count"))),
+
+        READ_TOMBSTONE_COUNT("Partitions read with the most tombstones", 
((samplerType, resultBuilder) ->
+                                                      
resultBuilder.forType(samplerType, samplerType.description)
+                                                                   
.addColumn("Table", "table")
+                                                                   
.addColumn("Partition", "value")
+                                                                   
.addColumn("Tombstones", "count"))),
+
+        READ_SSTABLE_COUNT("Partitions read with the most sstables", 
((samplerType, resultBuilder) ->
+                                                                      
resultBuilder.forType(samplerType, samplerType.description)
+                                                                               
    .addColumn("Table", "table")
+                                                                               
    .addColumn("Partition", "value")
+                                                                               
    .addColumn("SSTables", "count"))),
         WRITE_SIZE("Max mutation size by partition", ((samplerType, 
resultBuilder) ->
                                                       
resultBuilder.forType(samplerType, samplerType.description)
                                                                    
.addColumn("Table", "table")
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java 
b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 8f3645dd1e..04102f7995 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -262,6 +262,12 @@ public class TableMetrics
     public final Sampler<ByteBuffer> topCasPartitionContention;
     /** When sampler activated, will track the slowest local reads **/
     public final Sampler<String> topLocalReadQueryTime;
+    /** When sampler activated, will track partitions read with the most rows 
**/
+    public final Sampler<ByteBuffer> topReadPartitionRowCount;
+    /** When sampler activated, will track partitions read with the most 
tombstones **/
+    public final Sampler<ByteBuffer> topReadPartitionTombstoneCount;
+    /** When sample activated, will track partitions read with the most merged 
sstables **/
+    public final Sampler<ByteBuffer> topReadPartitionSSTableCount;
 
     public final TableMeter clientTombstoneWarnings;
     public final TableMeter clientTombstoneAborts;
@@ -442,11 +448,39 @@ public class TableMetrics
             }
         };
 
+        topReadPartitionRowCount = new MaxSampler<ByteBuffer>()
+        {
+            public String toString(ByteBuffer value)
+            {
+                return cfs.metadata().partitionKeyType.getString(value);
+            }
+        };
+
+        topReadPartitionTombstoneCount = new MaxSampler<ByteBuffer>()
+        {
+            public String toString(ByteBuffer value)
+            {
+                return cfs.metadata().partitionKeyType.getString(value);
+            }
+        };
+
+        topReadPartitionSSTableCount = new MaxSampler<ByteBuffer>()
+        {
+            @Override
+            public String toString(ByteBuffer value)
+            {
+                return cfs.metadata().partitionKeyType.getString(value);
+            }
+        };
+
         samplers.put(SamplerType.READS, topReadPartitionFrequency);
         samplers.put(SamplerType.WRITES, topWritePartitionFrequency);
         samplers.put(SamplerType.WRITE_SIZE, topWritePartitionSize);
         samplers.put(SamplerType.CAS_CONTENTIONS, topCasPartitionContention);
         samplers.put(SamplerType.LOCAL_READ_TIME, topLocalReadQueryTime);
+        samplers.put(SamplerType.READ_ROW_COUNT, topReadPartitionRowCount);
+        samplers.put(SamplerType.READ_TOMBSTONE_COUNT, 
topReadPartitionTombstoneCount);
+        samplers.put(SamplerType.READ_SSTABLE_COUNT, 
topReadPartitionSSTableCount);
 
         memtableColumnsCount = createTableGauge("MemtableColumnsCount", 
                                                 () -> 
cfs.getTracker().getView().getCurrentMemtable().operationCount());
diff --git a/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java 
b/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java
index 934d4d814c..d9cfdeecef 100644
--- a/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java
+++ b/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.metrics.Sampler;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.StorageService;
 
 import static java.lang.String.format;
@@ -54,10 +55,15 @@ import static org.junit.Assert.assertTrue;
  */
 public class TopPartitionsTest
 {
+    public static String KEYSPACE = 
TopPartitionsTest.class.getSimpleName().toLowerCase();
+    public static String TABLE = "test";
+
     @BeforeClass
     public static void loadSchema() throws ConfigurationException
     {
         SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1));
+        executeInternal(format("CREATE TABLE %s.%s (k text, c text, v text, 
PRIMARY KEY (k, c))", KEYSPACE, TABLE));
     }
 
     @Test
@@ -93,6 +99,108 @@ public class TopPartitionsTest
         assertEquals("If this failed you probably have to raise the 
beginLocalSampling duration", 1, result.size());
     }
 
+    @Test
+    public void testTopPartitionsRowTombstoneAndSSTableCount() throws Exception
+    {
+        int count = 10;
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(KEYSPACE, TABLE);
+        cfs.disableAutoCompaction();
+
+        executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('a', 'a', 
'a')", KEYSPACE, TABLE));
+        executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('a', 'b', 
'a')", KEYSPACE, TABLE));
+        cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+
+        executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('a', 'c', 
'a')", KEYSPACE, TABLE));
+        executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('b', 'b', 
'b')", KEYSPACE, TABLE));
+        executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('c', 'c', 
'c')", KEYSPACE, TABLE));
+        executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('c', 'd', 
'a')", KEYSPACE, TABLE));
+        executeInternal(format("INSERT INTO %s.%s(k,c,v) VALUES ('c', 'e', 
'a')", KEYSPACE, TABLE));
+        executeInternal(format("DELETE FROM %s.%s WHERE k='a' AND c='a'", 
KEYSPACE, TABLE));
+        cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+
+        // test multi-partition read
+        cfs.beginLocalSampling("READ_ROW_COUNT", count, 240000);
+        cfs.beginLocalSampling("READ_TOMBSTONE_COUNT", count, 240000);
+        cfs.beginLocalSampling("READ_SSTABLE_COUNT", count, 240000);
+
+        executeInternal(format("SELECT * FROM %s.%s", KEYSPACE, TABLE));
+        Thread.sleep(2000); // simulate waiting before finishing sampling
+
+        List<CompositeData> rowCounts = 
cfs.finishLocalSampling("READ_ROW_COUNT", count);
+        List<CompositeData> tsCounts = 
cfs.finishLocalSampling("READ_TOMBSTONE_COUNT", count);
+        List<CompositeData> sstCounts = 
cfs.finishLocalSampling("READ_SSTABLE_COUNT", count);
+
+        assertEquals(0, sstCounts.size()); // not tracked on range reads
+        assertEquals(3, rowCounts.size()); // 3 partitions read (a, b, c)
+        assertEquals(1, tsCounts.size()); // 1 partition w tombstones (a)
+
+        for (CompositeData data : rowCounts)
+        {
+            String partitionKey = (String) data.get("value");
+            long numRows = (long) data.get("count");
+            if (partitionKey.equalsIgnoreCase("a"))
+            {
+                assertEquals(2, numRows);
+            }
+            else if (partitionKey.equalsIgnoreCase("b"))
+                assertEquals(1, numRows);
+            else if (partitionKey.equalsIgnoreCase("c"))
+                assertEquals(3, numRows);
+        }
+
+        assertEquals("a", tsCounts.get(0).get("value"));
+        assertEquals(1, (long) tsCounts.get(0).get("count"));
+
+        // test single partition read
+        cfs.beginLocalSampling("READ_ROW_COUNT", count, 240000);
+        cfs.beginLocalSampling("READ_TOMBSTONE_COUNT", count, 240000);
+        cfs.beginLocalSampling("READ_SSTABLE_COUNT", count, 240000);
+
+        executeInternal(format("SELECT * FROM %s.%s WHERE k='a'", KEYSPACE, 
TABLE));
+        executeInternal(format("SELECT * FROM %s.%s WHERE k='b'", KEYSPACE, 
TABLE));
+        executeInternal(format("SELECT * FROM %s.%s WHERE k='c'", KEYSPACE, 
TABLE));
+        Thread.sleep(2000); // simulate waiting before finishing sampling
+
+        rowCounts = cfs.finishLocalSampling("READ_ROW_COUNT", count);
+        tsCounts = cfs.finishLocalSampling("READ_TOMBSTONE_COUNT", count);
+        sstCounts = cfs.finishLocalSampling("READ_SSTABLE_COUNT", count);
+
+        assertEquals(3, sstCounts.size()); // 3 partitions read
+        assertEquals(3, rowCounts.size()); // 3 partitions read
+        assertEquals(1, tsCounts.size());  // 3 partitions read only one 
containing tombstones
+
+        for (CompositeData data : sstCounts)
+        {
+            String partitionKey = (String) data.get("value");
+            long numRows = (long) data.get("count");
+            if (partitionKey.equalsIgnoreCase("a"))
+            {
+                assertEquals(2, numRows);
+            }
+            else if (partitionKey.equalsIgnoreCase("b"))
+                assertEquals(1, numRows);
+            else if (partitionKey.equalsIgnoreCase("c"))
+                assertEquals(1, numRows);
+        }
+
+        for (CompositeData data : rowCounts)
+        {
+            String partitionKey = (String) data.get("value");
+            long numRows = (long) data.get("count");
+            if (partitionKey.equalsIgnoreCase("a"))
+            {
+                assertEquals(2, numRows);
+            }
+            else if (partitionKey.equalsIgnoreCase("b"))
+                assertEquals(1, numRows);
+            else if (partitionKey.equalsIgnoreCase("c"))
+                assertEquals(3, numRows);
+        }
+
+        assertEquals("a", tsCounts.get(0).get("value"));
+        assertEquals(1, (long) tsCounts.get(0).get("count"));
+    }
+
     @Test
     public void testStartAndStopScheduledSampling()
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to