This is an automated email from the ASF dual-hosted git repository.

jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bd2ecb3454 Add metrics around storage usage and compression
bd2ecb3454 is described below

commit bd2ecb3454649d8c84cf0b1ce8c1e94ed1e06e74
Author: Josh McKenzie <[email protected]>
AuthorDate: Thu Sep 15 14:22:34 2022 -0400

    Add metrics around storage usage and compression
    
    Patch by Caleb Rackliffe; reviewed by Abe Ratnofsky and Josh McKenzie for 
CASSANDRA-17898
    
    Co-authored-by: Caleb Rackliffe <[email protected]>
    Co-authored-by: Josh McKenzie <[email protected]>
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/lifecycle/Tracker.java |  10 ++
 .../io/sstable/IndexSummaryRedistribution.java     |  18 ++-
 .../org/apache/cassandra/io/sstable/SSTable.java   |  10 --
 .../cassandra/io/sstable/format/SSTableReader.java |  29 ++++
 .../apache/cassandra/metrics/KeyspaceMetrics.java  |  24 ++-
 .../apache/cassandra/metrics/StorageMetrics.java   |  20 +++
 .../org/apache/cassandra/metrics/TableMetrics.java |   3 +
 .../apache/cassandra/service/StorageService.java   |   8 +-
 .../cassandra/service/StorageServiceMBean.java     |   3 +
 src/java/org/apache/cassandra/tools/NodeProbe.java |   5 +
 .../org/apache/cassandra/tools/nodetool/Info.java  |   2 +
 .../distributed/test/ClusterStorageUsageTest.java  | 165 +++++++++++++++++++++
 .../cassandra/distributed/test/NodeToolTest.java   |  17 +++
 .../apache/cassandra/io/DiskSpaceMetricsTest.java  |  33 ++++-
 .../io/sstable/IndexSummaryRedistributionTest.java |  26 +++-
 .../cassandra/io/sstable/SSTableRewriterTest.java  |  14 +-
 17 files changed, 356 insertions(+), 32 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c59faa6fe1..0439b08b82 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Add metrics around storage usage and compression (CASSANDRA-17898)
  * Remove usage of deprecated javax certificate classes (CASSANDRA-17867)
  * Make sure preview repairs don't optimise streams unless configured to 
(CASSANDRA-17865)
  * Optionally avoid hint transfer during decommission (CASSANDRA-17808)
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java 
b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index ab8a74bd1a..66ecf1c8d8 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -152,6 +152,8 @@ public class Tracker
             return accumulate;
 
         long add = 0;
+        long addUncompressed = 0;
+
         for (SSTableReader sstable : newSSTables)
         {
             if (logger.isTraceEnabled())
@@ -159,13 +161,17 @@ public class Tracker
             try
             {
                 add += sstable.bytesOnDisk();
+                addUncompressed += sstable.logicalBytesOnDisk();
             }
             catch (Throwable t)
             {
                 accumulate = merge(accumulate, t);
             }
         }
+
         long subtract = 0;
+        long subtractUncompressed = 0;
+
         for (SSTableReader sstable : oldSSTables)
         {
             if (logger.isTraceEnabled())
@@ -173,6 +179,7 @@ public class Tracker
             try
             {
                 subtract += sstable.bytesOnDisk();
+                subtractUncompressed += sstable.logicalBytesOnDisk();
             }
             catch (Throwable t)
             {
@@ -181,7 +188,10 @@ public class Tracker
         }
 
         StorageMetrics.load.inc(add - subtract);
+        StorageMetrics.uncompressedLoad.inc(addUncompressed - 
subtractUncompressed);
+
         cfstore.metric.liveDiskSpaceUsed.inc(add - subtract);
+        cfstore.metric.uncompressedLiveDiskSpaceUsed.inc(addUncompressed - 
subtractUncompressed);
 
         // we don't subtract from total until the sstable is deleted, see 
TransactionLogs.SSTableTidier
         cfstore.metric.totalDiskSpaceUsed.inc(add);
diff --git 
a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java 
b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 8bbe709e13..74ae43daaf 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@ -262,11 +262,15 @@ public class IndexSummaryRedistribution extends 
CompactionInfo.Holder
                          entry.newSamplingLevel, 
Downsampling.BASE_SAMPLING_LEVEL);
             ColumnFamilyStore cfs = 
Keyspace.open(sstable.metadata().keyspace).getColumnFamilyStore(sstable.metadata().id);
             long oldSize = sstable.bytesOnDisk();
+            long oldSizeUncompressed = sstable.logicalBytesOnDisk();
+
             SSTableReader replacement = 
sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
             long newSize = replacement.bytesOnDisk();
+            long newSizeUncompressed = replacement.logicalBytesOnDisk();
+
             newSSTables.add(replacement);
             transactions.get(sstable.metadata().id).update(replacement, true);
-            addHooks(cfs, transactions, oldSize, newSize);
+            addHooks(cfs, transactions, oldSize, newSize, oldSizeUncompressed, 
newSizeUncompressed);
         }
 
         return newSSTables;
@@ -276,20 +280,28 @@ public class IndexSummaryRedistribution extends 
CompactionInfo.Holder
      * Add hooks to correctly update the storage load metrics once the 
transaction is closed/aborted
      */
     @SuppressWarnings("resource") // Transactions are closed in finally 
outside of this method
-    private void addHooks(ColumnFamilyStore cfs, Map<TableId, 
LifecycleTransaction> transactions, long oldSize, long newSize)
+    private void addHooks(ColumnFamilyStore cfs, Map<TableId, 
LifecycleTransaction> transactions, long oldSize, long newSize, long 
oldSizeUncompressed, long newSizeUncompressed)
     {
         LifecycleTransaction txn = transactions.get(cfs.metadata.id);
         txn.runOnCommit(() -> {
             // The new size will be added in Transactional.commit() as an 
updated SSTable, more details: CASSANDRA-13738
             StorageMetrics.load.dec(oldSize);
+            StorageMetrics.uncompressedLoad.dec(oldSizeUncompressed);
+
             cfs.metric.liveDiskSpaceUsed.dec(oldSize);
+            cfs.metric.uncompressedLiveDiskSpaceUsed.dec(oldSizeUncompressed);
             cfs.metric.totalDiskSpaceUsed.dec(oldSize);
         });
         txn.runOnAbort(() -> {
-            // the local disk was modified but book keeping couldn't be 
commited, apply the delta
+            // the local disk was modified but bookkeeping couldn't be 
committed, apply the delta
             long delta = oldSize - newSize; // if new is larger this will be 
negative, so dec will become a inc
+            long deltaUncompressed = oldSizeUncompressed - newSizeUncompressed;
+
             StorageMetrics.load.dec(delta);
+            StorageMetrics.uncompressedLoad.dec(deltaUncompressed);
+
             cfs.metric.liveDiskSpaceUsed.dec(delta);
+            cfs.metric.uncompressedLiveDiskSpaceUsed.dec(deltaUncompressed);
             cfs.metric.totalDiskSpaceUsed.dec(delta);
         });
     }
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java 
b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 5194abb101..3c4f5cd0a9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -320,16 +320,6 @@ public abstract class SSTable
         return estimatedRows;
     }
 
-    public long bytesOnDisk()
-    {
-        long bytes = 0;
-        for (Component component : components)
-        {
-            bytes += new File(descriptor.filenameFor(component)).length();
-        }
-        return bytes;
-    }
-
     @Override
     public String toString()
     {
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index f26cf65c93..d7dad42b16 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -2360,4 +2360,33 @@ public abstract class SSTableReader extends SSTable 
implements UnfilteredSource,
         ExecutorUtils.shutdownNowAndWait(timeout, unit, syncExecutor);
         resetTidying();
     }
+
+    /**
+     * @return the physical size on disk of all components for this SSTable in 
bytes
+     */
+    public long bytesOnDisk()
+    {
+        return bytesOnDisk(false);
+    }
+
+    /**
+     * @return the total logical/uncompressed size in bytes of all components 
for this SSTable
+     */
+    public long logicalBytesOnDisk()
+    {
+        return bytesOnDisk(true);
+    }
+
+    private long bytesOnDisk(boolean logical)
+    {
+        long bytes = 0;
+        for (Component component : components)
+        {
+            // Only the data file is compressible.
+            bytes += logical && component == Component.DATA && compression
+                     ? getCompressionMetadata().dataLength
+                     : new File(descriptor.filenameFor(component)).length();
+        }
+        return bytes;
+    }
 }
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java 
b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 776027e395..bba4cd3213 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -59,9 +59,14 @@ public class KeyspaceMetrics
     public final Gauge<Long> pendingFlushes;
     /** Estimate of number of pending compactios for this CF */
     public final Gauge<Long> pendingCompactions;
-    /** Disk space used by SSTables belonging to this CF */
+    /** Disk space used by SSTables belonging to tables in this keyspace */
     public final Gauge<Long> liveDiskSpaceUsed;
-    /** Total disk space used by SSTables belonging to this CF, including 
obsolete ones waiting to be GC'd */
+    /** Disk space used by SSTables belonging to tables in this keyspace, 
scaled down by replication factor */
+    public final Gauge<Long> unreplicatedLiveDiskSpaceUsed;
+    /** Uncompressed/logical size of SSTables belonging to tables in this 
keyspace */
+    public final Gauge<Long> uncompressedLiveDiskSpaceUsed;
+    /** Uncompressed/logical size of SSTables belonging to tables in this 
keyspace, scaled down by replication factor */
+    public final Gauge<Long> unreplicatedUncompressedLiveDiskSpaceUsed;
     public final Gauge<Long> totalDiskSpaceUsed;
     /** Disk space used by bloom filter */
     public final Gauge<Long> bloomFilterDiskSpaceUsed;
@@ -169,7 +174,7 @@ public class KeyspaceMetrics
     public final Histogram rowIndexSize;
 
     public final MetricNameFactory factory;
-    private Keyspace keyspace;
+    private final Keyspace keyspace;
 
     /** set containing names of all the metrics stored here, for releasing 
later */
     private Set<ReleasableMetric> allMetrics = Sets.newHashSet();
@@ -201,8 +206,15 @@ public class KeyspaceMetrics
                 metric -> metric.memtableSwitchCount.getCount());
         pendingCompactions = createKeyspaceGauge("PendingCompactions", metric 
-> metric.pendingCompactions.getValue());
         pendingFlushes = createKeyspaceGauge("PendingFlushes", metric -> 
metric.pendingFlushes.getCount());
+
         liveDiskSpaceUsed = createKeyspaceGauge("LiveDiskSpaceUsed", metric -> 
metric.liveDiskSpaceUsed.getCount());
+        uncompressedLiveDiskSpaceUsed = 
createKeyspaceGauge("UncompressedLiveDiskSpaceUsed", metric -> 
metric.uncompressedLiveDiskSpaceUsed.getCount());
+        unreplicatedLiveDiskSpaceUsed = 
createKeyspaceGauge("UnreplicatedLiveDiskSpaceUsed",
+                                                            metric -> 
metric.liveDiskSpaceUsed.getCount() / 
keyspace.getReplicationStrategy().getReplicationFactor().fullReplicas);
+        unreplicatedUncompressedLiveDiskSpaceUsed = 
createKeyspaceGauge("UnreplicatedUncompressedLiveDiskSpaceUsed",
+                                                                        metric 
-> metric.uncompressedLiveDiskSpaceUsed.getCount() / 
keyspace.getReplicationStrategy().getReplicationFactor().fullReplicas);
         totalDiskSpaceUsed = createKeyspaceGauge("TotalDiskSpaceUsed", metric 
-> metric.totalDiskSpaceUsed.getCount());
+
         bloomFilterDiskSpaceUsed = 
createKeyspaceGauge("BloomFilterDiskSpaceUsed",
                 metric -> metric.bloomFilterDiskSpaceUsed.getValue());
         bloomFilterOffHeapMemoryUsed = 
createKeyspaceGauge("BloomFilterOffHeapMemoryUsed",
@@ -280,8 +292,10 @@ public class KeyspaceMetrics
 
     /**
      * Creates a gauge that will sum the current value of a metric for all 
column families in this keyspace
-     * @param name
-     * @param extractor
+     *
+     * @param name the name of the metric being created
+     * @param extractor a function that produces a specified metric value for 
a given table
+     *
      * @return Gauge&gt;Long> that computes sum of MetricValue.getValue()
      */
     private Gauge<Long> createKeyspaceGauge(String name, final 
ToLongFunction<TableMetrics> extractor)
diff --git a/src/java/org/apache/cassandra/metrics/StorageMetrics.java 
b/src/java/org/apache/cassandra/metrics/StorageMetrics.java
index 9399ba6789..d86a2144f2 100644
--- a/src/java/org/apache/cassandra/metrics/StorageMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/StorageMetrics.java
@@ -17,7 +17,12 @@
  */
 package org.apache.cassandra.metrics;
 
+import java.util.function.ToLongFunction;
+import java.util.stream.StreamSupport;
+
 import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import org.apache.cassandra.db.Keyspace;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
@@ -29,8 +34,23 @@ public class StorageMetrics
     private static final MetricNameFactory factory = new 
DefaultNameFactory("Storage");
 
     public static final Counter load = 
Metrics.counter(factory.createMetricName("Load"));
+    public static final Counter uncompressedLoad = 
Metrics.counter(factory.createMetricName("UncompressedLoad"));
+
+    public static final Gauge<Long> unreplicatedLoad =
+        createSummingGauge("UnreplicatedLoad", metric -> 
metric.unreplicatedLiveDiskSpaceUsed.getValue());
+    public static final Gauge<Long> unreplicatedUncompressedLoad =
+        createSummingGauge("UnreplicatedUncompressedLoad", metric -> 
metric.unreplicatedUncompressedLiveDiskSpaceUsed.getValue());
+
     public static final Counter uncaughtExceptions = 
Metrics.counter(factory.createMetricName("Exceptions"));
     public static final Counter totalHintsInProgress  = 
Metrics.counter(factory.createMetricName("TotalHintsInProgress"));
     public static final Counter totalHints = 
Metrics.counter(factory.createMetricName("TotalHints"));
     public static final Counter repairExceptions = 
Metrics.counter(factory.createMetricName("RepairExceptions"));
+
+    private static Gauge<Long> createSummingGauge(String name, 
ToLongFunction<KeyspaceMetrics> extractor)
+    {
+        return Metrics.register(factory.createMetricName(name),
+                                () -> 
StreamSupport.stream(Keyspace.all().spliterator(), false)
+                                                   .mapToLong(keyspace -> 
extractor.applyAsLong(keyspace.metric))
+                                                   .sum());
+    }
 }
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java 
b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 5e7ab78b29..8f3645dd1e 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -122,6 +122,8 @@ public class TableMetrics
     public final Gauge<Integer> oldVersionSSTableCount;
     /** Disk space used by SSTables belonging to this table */
     public final Counter liveDiskSpaceUsed;
+    /** Uncompressed/logical disk space used by SSTables belonging to this 
table */
+    public final Counter uncompressedLiveDiskSpaceUsed;
     /** Total disk space used by SSTables belonging to this table, including 
obsolete ones waiting to be GC'd */
     public final Counter totalDiskSpaceUsed;
     /** Size of the smallest compacted partition */
@@ -605,6 +607,7 @@ public class TableMetrics
             }
         });
         liveDiskSpaceUsed = createTableCounter("LiveDiskSpaceUsed");
+        uncompressedLiveDiskSpaceUsed = 
createTableCounter("UncompressedLiveDiskSpaceUsed");
         totalDiskSpaceUsed = createTableCounter("TotalDiskSpaceUsed");
         minPartitionSize = createTableGauge("MinPartitionSize", "MinRowSize", 
new Gauge<Long>()
         {
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 47fad4a7e0..70f0f17203 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3553,12 +3553,18 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             updateNetVersion(endpoint, netVersion);
     }
 
-
+    @Override
     public String getLoadString()
     {
         return FileUtils.stringifyFileSize(StorageMetrics.load.getCount());
     }
 
+    @Override
+    public String getUncompressedLoadString()
+    {
+        return 
FileUtils.stringifyFileSize(StorageMetrics.uncompressedLoad.getCount());
+    }
+
     public Map<String, String> getLoadMapWithPort()
     {
         return getLoadMap(true);
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 7e512cdaf7..d530dd6b3a 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -209,6 +209,9 @@ public interface StorageServiceMBean extends 
NotificationEmitter
     /** Human-readable load value */
     public String getLoadString();
 
+    /** Human-readable uncompressed load value */
+    public String getUncompressedLoadString();
+
     /** Human-readable load value.  Keys are IP addresses. */
     @Deprecated public Map<String, String> getLoadMap();
     public Map<String, String> getLoadMapWithPort();
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index daf4eb2518..60edad3d1f 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -770,6 +770,11 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.getLoadString();
     }
 
+    public String getUncompressedLoadString()
+    {
+        return ssProxy.getUncompressedLoadString();
+    }
+
     public String getReleaseVersion()
     {
         return ssProxy.getReleaseVersion();
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Info.java 
b/src/java/org/apache/cassandra/tools/nodetool/Info.java
index 69661571a9..90c9e9d8bd 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Info.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Info.java
@@ -51,6 +51,8 @@ public class Info extends NodeToolCmd
         out.printf("%-23s: %s%n", "Gossip active", gossipInitialized);
         out.printf("%-23s: %s%n", "Native Transport active", 
probe.isNativeTransportRunning());
         out.printf("%-23s: %s%n", "Load", probe.getLoadString());
+        out.printf("%-23s: %s%n", "Uncompressed load", 
probe.getUncompressedLoadString());
+
         if (gossipInitialized)
             out.printf("%-23s: %s%n", "Generation No", 
probe.getCurrentGenerationNumber());
         else
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ClusterStorageUsageTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ClusterStorageUsageTest.java
new file mode 100644
index 0000000000..9c379d1d50
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ClusterStorageUsageTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import 
org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableFunction;
+import org.apache.cassandra.metrics.DefaultNameFactory;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * This class verifies the behavior of our global disk usage metrics across 
different cluster and replication
+ * configurations. In addition, it verifies that they are properly exposed via 
the public metric registry.
+ *
+ * Disk usage metrics are characterized by how they handle compression and 
replication:
+ *
+ * "compressed" -> indicates raw disk usage
+ * "uncompressed" -> indicates uncompressed file size (which is equivalent to 
"compressed" with no compression enabled)
+ * "replicated" -> includes disk usage for data outside the node's primary 
range
+ * "unreplicated" -> indicates disk usage scaled down by replication factor 
across the entire cluster
+ */
+public class ClusterStorageUsageTest extends TestBaseImpl
+{
+    private static final DefaultNameFactory FACTORY = new 
DefaultNameFactory("Storage");
+    private static final int MUTATIONS = 1000;
+
+    @Test
+    public void testNoReplication() throws Throwable
+    {
+        // With a replication factor of 1 for our only user keyspace, system 
keyspaces using local replication, and
+        // empty distributed system tables, replicated and unreplicated 
versions of our compressed and uncompressed
+        // metrics should be equivalent.
+
+        try (Cluster cluster = init(builder().withNodes(2).start(), 1))
+        {
+            populateUserKeyspace(cluster);
+            verifyLoadMetricsWithoutReplication(cluster.get(1));
+            verifyLoadMetricsWithoutReplication(cluster.get(2));
+        }
+    }
+
+    private void verifyLoadMetricsWithoutReplication(IInvokableInstance node)
+    {
+        long compressedLoad = getLoad(node);
+        long uncompressedLoad = getUncompressedLoad(node);
+        assertThat(compressedLoad).isEqualTo(getUnreplicatedLoad(node));
+        
assertThat(uncompressedLoad).isEqualTo(getUnreplicatedUncompressedLoad(node));
+        assertThat(uncompressedLoad).isGreaterThan(compressedLoad);
+    }
+
+    @Test
+    public void testSimpleReplication() throws Throwable
+    {
+        // With a replication factor of 2 for our only user keyspace, disk 
space used by that keyspace should
+        // be scaled down by a factor of 2, while contributions from system 
keyspaces are unaffected.
+
+        try (Cluster cluster = init(builder().withNodes(3).start(), 2))
+        {
+            populateUserKeyspace(cluster);
+
+            verifyLoadMetricsWithReplication(cluster.get(1));
+            verifyLoadMetricsWithReplication(cluster.get(2));
+            verifyLoadMetricsWithReplication(cluster.get(3));
+        }
+    }
+
+    @Test
+    public void testMultiDatacenterReplication() throws Throwable
+    {
+        // With a replication factor of 1 for our only user keyspace in two 
DCs, disk space used by that keyspace should
+        // be scaled down by a factor of 2, while contributions from system 
keyspaces are unaffected.
+
+        try (Cluster cluster = builder().withDC("DC1", 2).withDC("DC2", 
2).start())
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'NetworkTopologyStrategy', 'DC1': 1, 'DC2': 1};");
+            populateUserKeyspace(cluster);
+
+            verifyLoadMetricsWithReplication(cluster.get(1));
+            verifyLoadMetricsWithReplication(cluster.get(2));
+            verifyLoadMetricsWithReplication(cluster.get(3));
+            verifyLoadMetricsWithReplication(cluster.get(4));
+        }
+    }
+
+    private void populateUserKeyspace(Cluster cluster)
+    {
+        cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, v 
text, PRIMARY KEY (pk));"));
+
+        for (int i = 0; i < MUTATIONS; i++) {
+            cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl 
(pk, v) VALUES (?,?)"), ConsistencyLevel.ALL, i, "compressable");
+        }
+
+        cluster.forEach((i) -> i.flush(KEYSPACE));
+    }
+
+    private void verifyLoadMetricsWithReplication(IInvokableInstance node)
+    {
+        long unreplicatedLoad = getUnreplicatedLoad(node);
+        long expectedUnreplicatedLoad = computeUnreplicatedMetric(node, table 
-> table.metric.liveDiskSpaceUsed.getCount());
+        assertThat(expectedUnreplicatedLoad).isEqualTo(unreplicatedLoad);
+        assertThat(getLoad(node)).isGreaterThan(unreplicatedLoad);
+
+        long unreplicatedUncompressedLoad = 
getUnreplicatedUncompressedLoad(node);
+        long expectedUnreplicatedUncompressedLoad = 
computeUnreplicatedMetric(node, table -> 
table.metric.uncompressedLiveDiskSpaceUsed.getCount());
+        
assertThat(expectedUnreplicatedUncompressedLoad).isEqualTo(unreplicatedUncompressedLoad);
+        
assertThat(getUncompressedLoad(node)).isGreaterThan(unreplicatedUncompressedLoad);
+    }
+
+    private long getLoad(IInvokableInstance node)
+    {
+        return 
node.metrics().getCounter(FACTORY.createMetricName("Load").getMetricName());
+    }
+
+    private long getUncompressedLoad(IInvokableInstance node1)
+    {
+        return 
node1.metrics().getCounter(FACTORY.createMetricName("UncompressedLoad").getMetricName());
+    }
+
+    private long getUnreplicatedLoad(IInvokableInstance node)
+    {
+        return (Long) 
node.metrics().getGauge(FACTORY.createMetricName("UnreplicatedLoad").getMetricName());
+    }
+
+    private long getUnreplicatedUncompressedLoad(IInvokableInstance node)
+    {
+        return (Long) 
node.metrics().getGauge(FACTORY.createMetricName("UnreplicatedUncompressedLoad").getMetricName());
+    }
+
+    private long computeUnreplicatedMetric(IInvokableInstance node, 
SerializableFunction<ColumnFamilyStore, Long> metric)
+    {
+        return node.callOnInstance(() ->
+                                   {
+                                       long sum = 0;
+
+                                       for (Keyspace keyspace : Keyspace.all())
+                                           for (ColumnFamilyStore table : 
keyspace.getColumnFamilyStores())
+                                               sum += metric.apply(table) / 
keyspace.getReplicationStrategy().getReplicationFactor().fullReplicas;
+
+                                       return sum;
+                                   });
+    }
+}
\ No newline at end of file
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
index 9087f96296..486d9f5618 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
@@ -116,4 +116,21 @@ public class NodeToolTest extends TestBaseImpl
             ringResult.asserts().stderrContains("is not permitted as this 
cache is disabled");
         }
     }
+
+    @Test
+    public void testInfoOutput() throws Throwable
+    {
+        try (ICluster<?> cluster = init(builder().withNodes(1).start()))
+        {
+            NodeToolResult ringResult = cluster.get(1).nodetoolResult("info");
+            ringResult.asserts().stdoutContains("ID");
+            ringResult.asserts().stdoutContains("Gossip active");
+            ringResult.asserts().stdoutContains("Native Transport active");
+            ringResult.asserts().stdoutContains("Load");
+            ringResult.asserts().stdoutContains("Uncompressed load");
+            ringResult.asserts().stdoutContains("Generation");
+            ringResult.asserts().stdoutContains("Uptime");
+            ringResult.asserts().stdoutContains("Heap Memory");
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java 
b/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
index be8b162766..f07936593a 100644
--- a/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
+++ b/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.io;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.ImmutableMap;
@@ -38,9 +39,13 @@ import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.io.sstable.IndexSummaryManager;
 import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+
 public class DiskSpaceMetricsTest extends CQLTester
 {
     /**
@@ -102,18 +107,36 @@ public class DiskSpaceMetricsTest extends CQLTester
 
     private void assertDiskSpaceEqual(ColumnFamilyStore cfs)
     {
+        Set<SSTableReader> liveSSTables = 
cfs.getTracker().getView().liveSSTables();
         long liveDiskSpaceUsed = cfs.metric.liveDiskSpaceUsed.getCount();
-        long actual = 0;
-        for (SSTableReader sstable : cfs.getTracker().getView().liveSSTables())
-            actual += sstable.bytesOnDisk();
+        long actual = 
liveSSTables.stream().mapToLong(SSTableReader::bytesOnDisk).sum();
+        long uncompressedLiveDiskSpaceUsed = 
cfs.metric.uncompressedLiveDiskSpaceUsed.getCount();
+        long actualUncompressed = 
liveSSTables.stream().mapToLong(SSTableReader::logicalBytesOnDisk).sum();
+
+        assertEquals("bytes on disk does not match current metric 
LiveDiskSpaceUsed", actual, liveDiskSpaceUsed);
+        assertEquals("bytes on disk does not match current metric 
UncompressedLiveDiskSpaceUsed", actualUncompressed, 
uncompressedLiveDiskSpaceUsed);
+
+        // Keyspace-level metrics should be equivalent to table-level metrics, 
as there is only one table.
+        
assertEquals(cfs.keyspace.metric.liveDiskSpaceUsed.getValue().longValue(), 
liveDiskSpaceUsed);
+        
assertEquals(cfs.keyspace.metric.uncompressedLiveDiskSpaceUsed.getValue().longValue(),
 uncompressedLiveDiskSpaceUsed);
+        
assertEquals(cfs.keyspace.metric.unreplicatedLiveDiskSpaceUsed.getValue().longValue(),
 liveDiskSpaceUsed);
+        
assertEquals(cfs.keyspace.metric.unreplicatedUncompressedLiveDiskSpaceUsed.getValue().longValue(),
 uncompressedLiveDiskSpaceUsed);
+
+        // Global load metrics should be internally consistent, given there is 
no replication, but slightly greater
+        // than table and keyspace-level metrics, given the global versions 
account for non-user tables.
+        long globalLoad = StorageMetrics.load.getCount();
+        assertEquals(globalLoad, 
StorageMetrics.unreplicatedLoad.getValue().longValue());
+        assertThat(globalLoad).isGreaterThan(liveDiskSpaceUsed);
 
-        Assert.assertEquals("bytes on disk does not match current metric 
liveDiskSpaceUsed", actual, liveDiskSpaceUsed);
+        long globalUncompressedLoad = 
StorageMetrics.uncompressedLoad.getCount();
+        assertEquals(globalUncompressedLoad, 
StorageMetrics.unreplicatedUncompressedLoad.getValue().longValue());
+        
assertThat(globalUncompressedLoad).isGreaterThan(uncompressedLiveDiskSpaceUsed);
 
         // totalDiskSpaceUsed is based off SStable delete, which is async: 
LogTransaction's tidy enqueues in ScheduledExecutors.nonPeriodicTasks
         // wait for there to be no more pending sstable releases
         LifecycleTransaction.waitForDeletions();
         long totalDiskSpaceUsed = cfs.metric.totalDiskSpaceUsed.getCount();
-        Assert.assertEquals("bytes on disk does not match current metric 
totalDiskSpaceUsed", actual, totalDiskSpaceUsed);
+        assertEquals("bytes on disk does not match current metric 
totalDiskSpaceUsed", actual, totalDiskSpaceUsed);
     }
 
     private static void indexDownsampleCancelLastSSTable(ColumnFamilyStore cfs)
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryRedistributionTest.java 
b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryRedistributionTest.java
index 57e4d4e4bd..9c121bf7bf 100644
--- 
a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryRedistributionTest.java
+++ 
b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryRedistributionTest.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.metrics.RestorableMeter;
@@ -68,8 +69,12 @@ public class IndexSummaryRedistributionTest
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
         int numSSTables = 1;
         int numRows = 1024 * 10;
+
         long load = StorageMetrics.load.getCount();
         StorageMetrics.load.dec(load); // reset the load metric
+        long uncompressedLoad = StorageMetrics.uncompressedLoad.getCount();
+        StorageMetrics.uncompressedLoad.dec(uncompressedLoad); // reset the 
uncompressed load metric
+
         createSSTables(ksname, cfname, numSSTables, numRows);
 
         List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
@@ -77,32 +82,45 @@ public class IndexSummaryRedistributionTest
             sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
 
         long oldSize = 0;
+        long oldSizeUncompressed = 0;
+
         for (SSTableReader sstable : sstables)
         {
             assertEquals(cfs.metadata().params.minIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.001);
             oldSize += sstable.bytesOnDisk();
+            oldSizeUncompressed += sstable.logicalBytesOnDisk();
         }
 
         load = StorageMetrics.load.getCount();
         long others = load - oldSize; // Other SSTables size, e.g. schema and 
other system SSTables
 
+        uncompressedLoad = StorageMetrics.uncompressedLoad.getCount();
+        long othersUncompressed = uncompressedLoad - oldSizeUncompressed;
+
         int originalMinIndexInterval = cfs.metadata().params.minIndexInterval;
         // double the min_index_interval
         
SchemaTestUtil.announceTableUpdate(cfs.metadata().unbuild().minIndexInterval(originalMinIndexInterval
 * 2).build());
         IndexSummaryManager.instance.redistributeSummaries();
 
         long newSize = 0;
+        long newSizeUncompressed = 0;
+
         for (SSTableReader sstable : cfs.getLiveSSTables())
         {
             assertEquals(cfs.metadata().params.minIndexInterval, 
sstable.getEffectiveIndexInterval(), 0.001);
             assertEquals(numRows / cfs.metadata().params.minIndexInterval, 
sstable.getIndexSummarySize());
             newSize += sstable.bytesOnDisk();
+            newSizeUncompressed += sstable.logicalBytesOnDisk();
         }
+
         newSize += others;
         load = StorageMetrics.load.getCount();
-
         // new size we calculate should be almost the same as the load in 
metrics
-        assertEquals(newSize, load, newSize / 10);
+        assertEquals(newSize, load, newSize / 10.0);
+
+        newSizeUncompressed += othersUncompressed;
+        uncompressedLoad = StorageMetrics.uncompressedLoad.getCount();
+        assertEquals(newSizeUncompressed, uncompressedLoad, 
newSizeUncompressed / 10.0);
     }
 
     private void createSSTables(String ksname, String cfname, int numSSTables, 
int numRows)
@@ -112,7 +130,7 @@ public class IndexSummaryRedistributionTest
         cfs.truncateBlocking();
         cfs.disableAutoCompaction();
 
-        ArrayList<Future> futures = new ArrayList<>(numSSTables);
+        ArrayList<Future<CommitLogPosition>> futures = new 
ArrayList<>(numSSTables);
         ByteBuffer value = ByteBuffer.wrap(new byte[100]);
         for (int sstable = 0; sstable < numSSTables; sstable++)
         {
@@ -127,7 +145,7 @@ public class IndexSummaryRedistributionTest
             }
             
futures.add(cfs.forceFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS));
         }
-        for (Future future : futures)
+        for (Future<CommitLogPosition> future : futures)
         {
             try
             {
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index c88e0b09f7..363a000186 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -205,7 +205,9 @@ public class SSTableRewriterTest extends 
SSTableWriterTestBase
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
         long startStorageMetricsLoad = StorageMetrics.load.getCount();
+        long startUncompressedLoad = 
StorageMetrics.uncompressedLoad.getCount();
         long sBytesOnDisk = s.bytesOnDisk();
+        long sBytesOnDiskUncompressed = s.logicalBytesOnDisk();
         Set<SSTableReader> compacting = Sets.newHashSet(s);
 
         List<SSTableReader> sstables;
@@ -236,11 +238,15 @@ public class SSTableRewriterTest extends 
SSTableWriterTestBase
 
         LifecycleTransaction.waitForDeletions();
 
-        long sum = 0;
-        for (SSTableReader x : cfs.getLiveSSTables())
-            sum += x.bytesOnDisk();
+        long sum = 
cfs.getLiveSSTables().stream().mapToLong(SSTableReader::bytesOnDisk).sum();
         assertEquals(sum, cfs.metric.liveDiskSpaceUsed.getCount());
-        assertEquals(startStorageMetricsLoad - sBytesOnDisk + sum, 
StorageMetrics.load.getCount());
+        long endLoad = StorageMetrics.load.getCount();
+        assertEquals(startStorageMetricsLoad - sBytesOnDisk + sum, endLoad);
+
+        long uncompressedSum = cfs.getLiveSSTables().stream().mapToLong(t -> 
t.logicalBytesOnDisk()).sum();
+        long endUncompressedLoad = StorageMetrics.uncompressedLoad.getCount();
+        assertEquals(startUncompressedLoad - sBytesOnDiskUncompressed + 
uncompressedSum, endUncompressedLoad);
+
         assertEquals(files, sstables.size());
         assertEquals(files, cfs.getLiveSSTables().size());
         LifecycleTransaction.waitForDeletions();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to