This is an automated email from the ASF dual-hosted git repository.
jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new bd2ecb3454 Add metrics around storage usage and compression
bd2ecb3454 is described below
commit bd2ecb3454649d8c84cf0b1ce8c1e94ed1e06e74
Author: Josh McKenzie <[email protected]>
AuthorDate: Thu Sep 15 14:22:34 2022 -0400
Add metrics around storage usage and compression
Patch by Caleb Rackliffe; reviewed by Abe Ratnofsky and Josh McKenzie for
CASSANDRA-17898
Co-authored-by: Caleb Rackliffe <[email protected]>
Co-authored-by: Josh McKenzie <[email protected]>
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/lifecycle/Tracker.java | 10 ++
.../io/sstable/IndexSummaryRedistribution.java | 18 ++-
.../org/apache/cassandra/io/sstable/SSTable.java | 10 --
.../cassandra/io/sstable/format/SSTableReader.java | 29 ++++
.../apache/cassandra/metrics/KeyspaceMetrics.java | 24 ++-
.../apache/cassandra/metrics/StorageMetrics.java | 20 +++
.../org/apache/cassandra/metrics/TableMetrics.java | 3 +
.../apache/cassandra/service/StorageService.java | 8 +-
.../cassandra/service/StorageServiceMBean.java | 3 +
src/java/org/apache/cassandra/tools/NodeProbe.java | 5 +
.../org/apache/cassandra/tools/nodetool/Info.java | 2 +
.../distributed/test/ClusterStorageUsageTest.java | 165 +++++++++++++++++++++
.../cassandra/distributed/test/NodeToolTest.java | 17 +++
.../apache/cassandra/io/DiskSpaceMetricsTest.java | 33 ++++-
.../io/sstable/IndexSummaryRedistributionTest.java | 26 +++-
.../cassandra/io/sstable/SSTableRewriterTest.java | 14 +-
17 files changed, 356 insertions(+), 32 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index c59faa6fe1..0439b08b82 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Add metrics around storage usage and compression (CASSANDRA-17898)
* Remove usage of deprecated javax certificate classes (CASSANDRA-17867)
* Make sure preview repairs don't optimise streams unless configured to
(CASSANDRA-17865)
* Optionally avoid hint transfer during decommission (CASSANDRA-17808)
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index ab8a74bd1a..66ecf1c8d8 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -152,6 +152,8 @@ public class Tracker
return accumulate;
long add = 0;
+ long addUncompressed = 0;
+
for (SSTableReader sstable : newSSTables)
{
if (logger.isTraceEnabled())
@@ -159,13 +161,17 @@ public class Tracker
try
{
add += sstable.bytesOnDisk();
+ addUncompressed += sstable.logicalBytesOnDisk();
}
catch (Throwable t)
{
accumulate = merge(accumulate, t);
}
}
+
long subtract = 0;
+ long subtractUncompressed = 0;
+
for (SSTableReader sstable : oldSSTables)
{
if (logger.isTraceEnabled())
@@ -173,6 +179,7 @@ public class Tracker
try
{
subtract += sstable.bytesOnDisk();
+ subtractUncompressed += sstable.logicalBytesOnDisk();
}
catch (Throwable t)
{
@@ -181,7 +188,10 @@ public class Tracker
}
StorageMetrics.load.inc(add - subtract);
+ StorageMetrics.uncompressedLoad.inc(addUncompressed -
subtractUncompressed);
+
cfstore.metric.liveDiskSpaceUsed.inc(add - subtract);
+ cfstore.metric.uncompressedLiveDiskSpaceUsed.inc(addUncompressed -
subtractUncompressed);
// we don't subtract from total until the sstable is deleted, see
TransactionLogs.SSTableTidier
cfstore.metric.totalDiskSpaceUsed.inc(add);
diff --git
a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 8bbe709e13..74ae43daaf 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@ -262,11 +262,15 @@ public class IndexSummaryRedistribution extends
CompactionInfo.Holder
entry.newSamplingLevel,
Downsampling.BASE_SAMPLING_LEVEL);
ColumnFamilyStore cfs =
Keyspace.open(sstable.metadata().keyspace).getColumnFamilyStore(sstable.metadata().id);
long oldSize = sstable.bytesOnDisk();
+ long oldSizeUncompressed = sstable.logicalBytesOnDisk();
+
SSTableReader replacement =
sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
long newSize = replacement.bytesOnDisk();
+ long newSizeUncompressed = replacement.logicalBytesOnDisk();
+
newSSTables.add(replacement);
transactions.get(sstable.metadata().id).update(replacement, true);
- addHooks(cfs, transactions, oldSize, newSize);
+ addHooks(cfs, transactions, oldSize, newSize, oldSizeUncompressed,
newSizeUncompressed);
}
return newSSTables;
@@ -276,20 +280,28 @@ public class IndexSummaryRedistribution extends
CompactionInfo.Holder
* Add hooks to correctly update the storage load metrics once the
transaction is closed/aborted
*/
@SuppressWarnings("resource") // Transactions are closed in finally
outside of this method
- private void addHooks(ColumnFamilyStore cfs, Map<TableId,
LifecycleTransaction> transactions, long oldSize, long newSize)
+ private void addHooks(ColumnFamilyStore cfs, Map<TableId,
LifecycleTransaction> transactions, long oldSize, long newSize, long
oldSizeUncompressed, long newSizeUncompressed)
{
LifecycleTransaction txn = transactions.get(cfs.metadata.id);
txn.runOnCommit(() -> {
// The new size will be added in Transactional.commit() as an
updated SSTable, more details: CASSANDRA-13738
StorageMetrics.load.dec(oldSize);
+ StorageMetrics.uncompressedLoad.dec(oldSizeUncompressed);
+
cfs.metric.liveDiskSpaceUsed.dec(oldSize);
+ cfs.metric.uncompressedLiveDiskSpaceUsed.dec(oldSizeUncompressed);
cfs.metric.totalDiskSpaceUsed.dec(oldSize);
});
txn.runOnAbort(() -> {
- // the local disk was modified but book keeping couldn't be
commited, apply the delta
+ // the local disk was modified but bookkeeping couldn't be
committed, apply the delta
long delta = oldSize - newSize; // if new is larger this will be
negative, so dec will become an inc
+ long deltaUncompressed = oldSizeUncompressed - newSizeUncompressed;
+
StorageMetrics.load.dec(delta);
+ StorageMetrics.uncompressedLoad.dec(deltaUncompressed);
+
cfs.metric.liveDiskSpaceUsed.dec(delta);
+ cfs.metric.uncompressedLiveDiskSpaceUsed.dec(deltaUncompressed);
cfs.metric.totalDiskSpaceUsed.dec(delta);
});
}
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java
b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 5194abb101..3c4f5cd0a9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -320,16 +320,6 @@ public abstract class SSTable
return estimatedRows;
}
- public long bytesOnDisk()
- {
- long bytes = 0;
- for (Component component : components)
- {
- bytes += new File(descriptor.filenameFor(component)).length();
- }
- return bytes;
- }
-
@Override
public String toString()
{
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index f26cf65c93..d7dad42b16 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -2360,4 +2360,33 @@ public abstract class SSTableReader extends SSTable
implements UnfilteredSource,
ExecutorUtils.shutdownNowAndWait(timeout, unit, syncExecutor);
resetTidying();
}
+
+ /**
+ * @return the physical size on disk of all components for this SSTable in
bytes
+ */
+ public long bytesOnDisk()
+ {
+ return bytesOnDisk(false);
+ }
+
+ /**
+ * @return the total logical/uncompressed size in bytes of all components
for this SSTable
+ */
+ public long logicalBytesOnDisk()
+ {
+ return bytesOnDisk(true);
+ }
+
+ private long bytesOnDisk(boolean logical)
+ {
+ long bytes = 0;
+ for (Component component : components)
+ {
+ // Only the data file is compressible.
+ bytes += logical && component == Component.DATA && compression
+ ? getCompressionMetadata().dataLength
+ : new File(descriptor.filenameFor(component)).length();
+ }
+ return bytes;
+ }
}
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 776027e395..bba4cd3213 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -59,9 +59,14 @@ public class KeyspaceMetrics
public final Gauge<Long> pendingFlushes;
/** Estimate of number of pending compactions for this CF */
public final Gauge<Long> pendingCompactions;
- /** Disk space used by SSTables belonging to this CF */
+ /** Disk space used by SSTables belonging to tables in this keyspace */
public final Gauge<Long> liveDiskSpaceUsed;
- /** Total disk space used by SSTables belonging to this CF, including
obsolete ones waiting to be GC'd */
+ /** Disk space used by SSTables belonging to tables in this keyspace,
scaled down by replication factor */
+ public final Gauge<Long> unreplicatedLiveDiskSpaceUsed;
+ /** Uncompressed/logical size of SSTables belonging to tables in this
keyspace */
+ public final Gauge<Long> uncompressedLiveDiskSpaceUsed;
+ /** Uncompressed/logical size of SSTables belonging to tables in this
keyspace, scaled down by replication factor */
+ public final Gauge<Long> unreplicatedUncompressedLiveDiskSpaceUsed;
public final Gauge<Long> totalDiskSpaceUsed;
/** Disk space used by bloom filter */
public final Gauge<Long> bloomFilterDiskSpaceUsed;
@@ -169,7 +174,7 @@ public class KeyspaceMetrics
public final Histogram rowIndexSize;
public final MetricNameFactory factory;
- private Keyspace keyspace;
+ private final Keyspace keyspace;
/** set containing names of all the metrics stored here, for releasing
later */
private Set<ReleasableMetric> allMetrics = Sets.newHashSet();
@@ -201,8 +206,15 @@ public class KeyspaceMetrics
metric -> metric.memtableSwitchCount.getCount());
pendingCompactions = createKeyspaceGauge("PendingCompactions", metric
-> metric.pendingCompactions.getValue());
pendingFlushes = createKeyspaceGauge("PendingFlushes", metric ->
metric.pendingFlushes.getCount());
+
liveDiskSpaceUsed = createKeyspaceGauge("LiveDiskSpaceUsed", metric ->
metric.liveDiskSpaceUsed.getCount());
+ uncompressedLiveDiskSpaceUsed =
createKeyspaceGauge("UncompressedLiveDiskSpaceUsed", metric ->
metric.uncompressedLiveDiskSpaceUsed.getCount());
+ unreplicatedLiveDiskSpaceUsed =
createKeyspaceGauge("UnreplicatedLiveDiskSpaceUsed",
+ metric ->
metric.liveDiskSpaceUsed.getCount() /
keyspace.getReplicationStrategy().getReplicationFactor().fullReplicas);
+ unreplicatedUncompressedLiveDiskSpaceUsed =
createKeyspaceGauge("UnreplicatedUncompressedLiveDiskSpaceUsed",
+ metric
-> metric.uncompressedLiveDiskSpaceUsed.getCount() /
keyspace.getReplicationStrategy().getReplicationFactor().fullReplicas);
totalDiskSpaceUsed = createKeyspaceGauge("TotalDiskSpaceUsed", metric
-> metric.totalDiskSpaceUsed.getCount());
+
bloomFilterDiskSpaceUsed =
createKeyspaceGauge("BloomFilterDiskSpaceUsed",
metric -> metric.bloomFilterDiskSpaceUsed.getValue());
bloomFilterOffHeapMemoryUsed =
createKeyspaceGauge("BloomFilterOffHeapMemoryUsed",
@@ -280,8 +292,10 @@ public class KeyspaceMetrics
/**
* Creates a gauge that will sum the current value of a metric for all
column families in this keyspace
- * @param name
- * @param extractor
+ *
+ * @param name the name of the metric being created
+ * @param extractor a function that produces a specified metric value for
a given table
+ *
* @return Gauge<Long> that computes sum of MetricValue.getValue()
*/
private Gauge<Long> createKeyspaceGauge(String name, final
ToLongFunction<TableMetrics> extractor)
diff --git a/src/java/org/apache/cassandra/metrics/StorageMetrics.java
b/src/java/org/apache/cassandra/metrics/StorageMetrics.java
index 9399ba6789..d86a2144f2 100644
--- a/src/java/org/apache/cassandra/metrics/StorageMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/StorageMetrics.java
@@ -17,7 +17,12 @@
*/
package org.apache.cassandra.metrics;
+import java.util.function.ToLongFunction;
+import java.util.stream.StreamSupport;
+
import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import org.apache.cassandra.db.Keyspace;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -29,8 +34,23 @@ public class StorageMetrics
private static final MetricNameFactory factory = new
DefaultNameFactory("Storage");
public static final Counter load =
Metrics.counter(factory.createMetricName("Load"));
+ public static final Counter uncompressedLoad =
Metrics.counter(factory.createMetricName("UncompressedLoad"));
+
+ public static final Gauge<Long> unreplicatedLoad =
+ createSummingGauge("UnreplicatedLoad", metric ->
metric.unreplicatedLiveDiskSpaceUsed.getValue());
+ public static final Gauge<Long> unreplicatedUncompressedLoad =
+ createSummingGauge("UnreplicatedUncompressedLoad", metric ->
metric.unreplicatedUncompressedLiveDiskSpaceUsed.getValue());
+
public static final Counter uncaughtExceptions =
Metrics.counter(factory.createMetricName("Exceptions"));
public static final Counter totalHintsInProgress =
Metrics.counter(factory.createMetricName("TotalHintsInProgress"));
public static final Counter totalHints =
Metrics.counter(factory.createMetricName("TotalHints"));
public static final Counter repairExceptions =
Metrics.counter(factory.createMetricName("RepairExceptions"));
+
+ private static Gauge<Long> createSummingGauge(String name,
ToLongFunction<KeyspaceMetrics> extractor)
+ {
+ return Metrics.register(factory.createMetricName(name),
+ () ->
StreamSupport.stream(Keyspace.all().spliterator(), false)
+ .mapToLong(keyspace ->
extractor.applyAsLong(keyspace.metric))
+ .sum());
+ }
}
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java
b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 5e7ab78b29..8f3645dd1e 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -122,6 +122,8 @@ public class TableMetrics
public final Gauge<Integer> oldVersionSSTableCount;
/** Disk space used by SSTables belonging to this table */
public final Counter liveDiskSpaceUsed;
+ /** Uncompressed/logical disk space used by SSTables belonging to this
table */
+ public final Counter uncompressedLiveDiskSpaceUsed;
/** Total disk space used by SSTables belonging to this table, including
obsolete ones waiting to be GC'd */
public final Counter totalDiskSpaceUsed;
/** Size of the smallest compacted partition */
@@ -605,6 +607,7 @@ public class TableMetrics
}
});
liveDiskSpaceUsed = createTableCounter("LiveDiskSpaceUsed");
+ uncompressedLiveDiskSpaceUsed =
createTableCounter("UncompressedLiveDiskSpaceUsed");
totalDiskSpaceUsed = createTableCounter("TotalDiskSpaceUsed");
minPartitionSize = createTableGauge("MinPartitionSize", "MinRowSize",
new Gauge<Long>()
{
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index 47fad4a7e0..70f0f17203 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3553,12 +3553,18 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
updateNetVersion(endpoint, netVersion);
}
-
+ @Override
public String getLoadString()
{
return FileUtils.stringifyFileSize(StorageMetrics.load.getCount());
}
+ @Override
+ public String getUncompressedLoadString()
+ {
+ return
FileUtils.stringifyFileSize(StorageMetrics.uncompressedLoad.getCount());
+ }
+
public Map<String, String> getLoadMapWithPort()
{
return getLoadMap(true);
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 7e512cdaf7..d530dd6b3a 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -209,6 +209,9 @@ public interface StorageServiceMBean extends
NotificationEmitter
/** Human-readable load value */
public String getLoadString();
+ /** Human-readable uncompressed load value */
+ public String getUncompressedLoadString();
+
/** Human-readable load value. Keys are IP addresses. */
@Deprecated public Map<String, String> getLoadMap();
public Map<String, String> getLoadMapWithPort();
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index daf4eb2518..60edad3d1f 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -770,6 +770,11 @@ public class NodeProbe implements AutoCloseable
return ssProxy.getLoadString();
}
+ public String getUncompressedLoadString()
+ {
+ return ssProxy.getUncompressedLoadString();
+ }
+
public String getReleaseVersion()
{
return ssProxy.getReleaseVersion();
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Info.java
b/src/java/org/apache/cassandra/tools/nodetool/Info.java
index 69661571a9..90c9e9d8bd 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Info.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Info.java
@@ -51,6 +51,8 @@ public class Info extends NodeToolCmd
out.printf("%-23s: %s%n", "Gossip active", gossipInitialized);
out.printf("%-23s: %s%n", "Native Transport active",
probe.isNativeTransportRunning());
out.printf("%-23s: %s%n", "Load", probe.getLoadString());
+ out.printf("%-23s: %s%n", "Uncompressed load",
probe.getUncompressedLoadString());
+
if (gossipInitialized)
out.printf("%-23s: %s%n", "Generation No",
probe.getCurrentGenerationNumber());
else
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ClusterStorageUsageTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ClusterStorageUsageTest.java
new file mode 100644
index 0000000000..9c379d1d50
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/ClusterStorageUsageTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import
org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableFunction;
+import org.apache.cassandra.metrics.DefaultNameFactory;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * This class verifies the behavior of our global disk usage metrics across
different cluster and replication
+ * configurations. In addition, it verifies that they are properly exposed via
the public metric registry.
+ *
+ * Disk usage metrics are characterized by how they handle compression and
replication:
+ *
+ * "compressed" -> indicates raw disk usage
+ * "uncompressed" -> indicates uncompressed file size (which is equivalent to
"compressed" with no compression enabled)
+ * "replicated" -> includes disk usage for data outside the node's primary
range
+ * "unreplicated" -> indicates disk usage scaled down by replication factor
across the entire cluster
+ */
+public class ClusterStorageUsageTest extends TestBaseImpl
+{
+ private static final DefaultNameFactory FACTORY = new
DefaultNameFactory("Storage");
+ private static final int MUTATIONS = 1000;
+
+ @Test
+ public void testNoReplication() throws Throwable
+ {
+ // With a replication factor of 1 for our only user keyspace, system
keyspaces using local replication, and
+ // empty distributed system tables, replicated and unreplicated
versions of our compressed and uncompressed
+ // metrics should be equivalent.
+
+ try (Cluster cluster = init(builder().withNodes(2).start(), 1))
+ {
+ populateUserKeyspace(cluster);
+ verifyLoadMetricsWithoutReplication(cluster.get(1));
+ verifyLoadMetricsWithoutReplication(cluster.get(2));
+ }
+ }
+
+ private void verifyLoadMetricsWithoutReplication(IInvokableInstance node)
+ {
+ long compressedLoad = getLoad(node);
+ long uncompressedLoad = getUncompressedLoad(node);
+ assertThat(compressedLoad).isEqualTo(getUnreplicatedLoad(node));
+
assertThat(uncompressedLoad).isEqualTo(getUnreplicatedUncompressedLoad(node));
+ assertThat(uncompressedLoad).isGreaterThan(compressedLoad);
+ }
+
+ @Test
+ public void testSimpleReplication() throws Throwable
+ {
+ // With a replication factor of 2 for our only user keyspace, disk
space used by that keyspace should
+ // be scaled down by a factor of 2, while contributions from system
keyspaces are unaffected.
+
+ try (Cluster cluster = init(builder().withNodes(3).start(), 2))
+ {
+ populateUserKeyspace(cluster);
+
+ verifyLoadMetricsWithReplication(cluster.get(1));
+ verifyLoadMetricsWithReplication(cluster.get(2));
+ verifyLoadMetricsWithReplication(cluster.get(3));
+ }
+ }
+
+ @Test
+ public void testMultiDatacenterReplication() throws Throwable
+ {
+ // With a replication factor of 1 for our only user keyspace in two
DCs, disk space used by that keyspace should
+ // be scaled down by a factor of 2, while contributions from system
keyspaces are unaffected.
+
+ try (Cluster cluster = builder().withDC("DC1", 2).withDC("DC2",
2).start())
+ {
+ cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH
replication = {'class': 'NetworkTopologyStrategy', 'DC1': 1, 'DC2': 1};");
+ populateUserKeyspace(cluster);
+
+ verifyLoadMetricsWithReplication(cluster.get(1));
+ verifyLoadMetricsWithReplication(cluster.get(2));
+ verifyLoadMetricsWithReplication(cluster.get(3));
+ verifyLoadMetricsWithReplication(cluster.get(4));
+ }
+ }
+
+ private void populateUserKeyspace(Cluster cluster)
+ {
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, v
text, PRIMARY KEY (pk));"));
+
+ for (int i = 0; i < MUTATIONS; i++) {
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl
(pk, v) VALUES (?,?)"), ConsistencyLevel.ALL, i, "compressable");
+ }
+
+ cluster.forEach((i) -> i.flush(KEYSPACE));
+ }
+
+ private void verifyLoadMetricsWithReplication(IInvokableInstance node)
+ {
+ long unreplicatedLoad = getUnreplicatedLoad(node);
+ long expectedUnreplicatedLoad = computeUnreplicatedMetric(node, table
-> table.metric.liveDiskSpaceUsed.getCount());
+ assertThat(expectedUnreplicatedLoad).isEqualTo(unreplicatedLoad);
+ assertThat(getLoad(node)).isGreaterThan(unreplicatedLoad);
+
+ long unreplicatedUncompressedLoad =
getUnreplicatedUncompressedLoad(node);
+ long expectedUnreplicatedUncompressedLoad =
computeUnreplicatedMetric(node, table ->
table.metric.uncompressedLiveDiskSpaceUsed.getCount());
+
assertThat(expectedUnreplicatedUncompressedLoad).isEqualTo(unreplicatedUncompressedLoad);
+
assertThat(getUncompressedLoad(node)).isGreaterThan(unreplicatedUncompressedLoad);
+ }
+
+ private long getLoad(IInvokableInstance node)
+ {
+ return
node.metrics().getCounter(FACTORY.createMetricName("Load").getMetricName());
+ }
+
+ private long getUncompressedLoad(IInvokableInstance node1)
+ {
+ return
node1.metrics().getCounter(FACTORY.createMetricName("UncompressedLoad").getMetricName());
+ }
+
+ private long getUnreplicatedLoad(IInvokableInstance node)
+ {
+ return (Long)
node.metrics().getGauge(FACTORY.createMetricName("UnreplicatedLoad").getMetricName());
+ }
+
+ private long getUnreplicatedUncompressedLoad(IInvokableInstance node)
+ {
+ return (Long)
node.metrics().getGauge(FACTORY.createMetricName("UnreplicatedUncompressedLoad").getMetricName());
+ }
+
+ private long computeUnreplicatedMetric(IInvokableInstance node,
SerializableFunction<ColumnFamilyStore, Long> metric)
+ {
+ return node.callOnInstance(() ->
+ {
+ long sum = 0;
+
+ for (Keyspace keyspace : Keyspace.all())
+ for (ColumnFamilyStore table :
keyspace.getColumnFamilyStores())
+ sum += metric.apply(table) /
keyspace.getReplicationStrategy().getReplicationFactor().fullReplicas;
+
+ return sum;
+ });
+ }
+}
\ No newline at end of file
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
b/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
index 9087f96296..486d9f5618 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
@@ -116,4 +116,21 @@ public class NodeToolTest extends TestBaseImpl
ringResult.asserts().stderrContains("is not permitted as this
cache is disabled");
}
}
+
+ @Test
+ public void testInfoOutput() throws Throwable
+ {
+ try (ICluster<?> cluster = init(builder().withNodes(1).start()))
+ {
+ NodeToolResult ringResult = cluster.get(1).nodetoolResult("info");
+ ringResult.asserts().stdoutContains("ID");
+ ringResult.asserts().stdoutContains("Gossip active");
+ ringResult.asserts().stdoutContains("Native Transport active");
+ ringResult.asserts().stdoutContains("Load");
+ ringResult.asserts().stdoutContains("Uncompressed load");
+ ringResult.asserts().stdoutContains("Generation");
+ ringResult.asserts().stdoutContains("Uptime");
+ ringResult.asserts().stdoutContains("Heap Memory");
+ }
+ }
}
diff --git a/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
b/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
index be8b162766..f07936593a 100644
--- a/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
+++ b/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.io;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.ImmutableMap;
@@ -38,9 +39,13 @@ import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.io.sstable.IndexSummaryManager;
import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.utils.FBUtilities;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+
public class DiskSpaceMetricsTest extends CQLTester
{
/**
@@ -102,18 +107,36 @@ public class DiskSpaceMetricsTest extends CQLTester
private void assertDiskSpaceEqual(ColumnFamilyStore cfs)
{
+ Set<SSTableReader> liveSSTables =
cfs.getTracker().getView().liveSSTables();
long liveDiskSpaceUsed = cfs.metric.liveDiskSpaceUsed.getCount();
- long actual = 0;
- for (SSTableReader sstable : cfs.getTracker().getView().liveSSTables())
- actual += sstable.bytesOnDisk();
+ long actual =
liveSSTables.stream().mapToLong(SSTableReader::bytesOnDisk).sum();
+ long uncompressedLiveDiskSpaceUsed =
cfs.metric.uncompressedLiveDiskSpaceUsed.getCount();
+ long actualUncompressed =
liveSSTables.stream().mapToLong(SSTableReader::logicalBytesOnDisk).sum();
+
+ assertEquals("bytes on disk does not match current metric
LiveDiskSpaceUsed", actual, liveDiskSpaceUsed);
+ assertEquals("bytes on disk does not match current metric
UncompressedLiveDiskSpaceUsed", actualUncompressed,
uncompressedLiveDiskSpaceUsed);
+
+ // Keyspace-level metrics should be equivalent to table-level metrics,
as there is only one table.
+
assertEquals(cfs.keyspace.metric.liveDiskSpaceUsed.getValue().longValue(),
liveDiskSpaceUsed);
+
assertEquals(cfs.keyspace.metric.uncompressedLiveDiskSpaceUsed.getValue().longValue(),
uncompressedLiveDiskSpaceUsed);
+
assertEquals(cfs.keyspace.metric.unreplicatedLiveDiskSpaceUsed.getValue().longValue(),
liveDiskSpaceUsed);
+
assertEquals(cfs.keyspace.metric.unreplicatedUncompressedLiveDiskSpaceUsed.getValue().longValue(),
uncompressedLiveDiskSpaceUsed);
+
+ // Global load metrics should be internally consistent, given there is
no replication, but slightly greater
+ // than table and keyspace-level metrics, given the global versions
account for non-user tables.
+ long globalLoad = StorageMetrics.load.getCount();
+ assertEquals(globalLoad,
StorageMetrics.unreplicatedLoad.getValue().longValue());
+ assertThat(globalLoad).isGreaterThan(liveDiskSpaceUsed);
- Assert.assertEquals("bytes on disk does not match current metric
liveDiskSpaceUsed", actual, liveDiskSpaceUsed);
+ long globalUncompressedLoad =
StorageMetrics.uncompressedLoad.getCount();
+ assertEquals(globalUncompressedLoad,
StorageMetrics.unreplicatedUncompressedLoad.getValue().longValue());
+
assertThat(globalUncompressedLoad).isGreaterThan(uncompressedLiveDiskSpaceUsed);
// totalDiskSpaceUsed is based off SStable delete, which is async:
LogTransaction's tidy enqueues in ScheduledExecutors.nonPeriodicTasks
// wait for there to be no more pending sstable releases
LifecycleTransaction.waitForDeletions();
long totalDiskSpaceUsed = cfs.metric.totalDiskSpaceUsed.getCount();
- Assert.assertEquals("bytes on disk does not match current metric
totalDiskSpaceUsed", actual, totalDiskSpaceUsed);
+ assertEquals("bytes on disk does not match current metric
totalDiskSpaceUsed", actual, totalDiskSpaceUsed);
}
private static void indexDownsampleCancelLastSSTable(ColumnFamilyStore cfs)
diff --git
a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryRedistributionTest.java
b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryRedistributionTest.java
index 57e4d4e4bd..9c121bf7bf 100644
---
a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryRedistributionTest.java
+++
b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryRedistributionTest.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.metrics.RestorableMeter;
@@ -68,8 +69,12 @@ public class IndexSummaryRedistributionTest
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
int numSSTables = 1;
int numRows = 1024 * 10;
+
long load = StorageMetrics.load.getCount();
StorageMetrics.load.dec(load); // reset the load metric
+ long uncompressedLoad = StorageMetrics.uncompressedLoad.getCount();
+ StorageMetrics.uncompressedLoad.dec(uncompressedLoad); // reset the
uncompressed load metric
+
createSSTables(ksname, cfname, numSSTables, numRows);
List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
@@ -77,32 +82,45 @@ public class IndexSummaryRedistributionTest
sstable.overrideReadMeter(new RestorableMeter(100.0, 100.0));
long oldSize = 0;
+ long oldSizeUncompressed = 0;
+
for (SSTableReader sstable : sstables)
{
assertEquals(cfs.metadata().params.minIndexInterval,
sstable.getEffectiveIndexInterval(), 0.001);
oldSize += sstable.bytesOnDisk();
+ oldSizeUncompressed += sstable.logicalBytesOnDisk();
}
load = StorageMetrics.load.getCount();
long others = load - oldSize; // Other SSTables size, e.g. schema and
other system SSTables
+ uncompressedLoad = StorageMetrics.uncompressedLoad.getCount();
+ long othersUncompressed = uncompressedLoad - oldSizeUncompressed;
+
int originalMinIndexInterval = cfs.metadata().params.minIndexInterval;
// double the min_index_interval
SchemaTestUtil.announceTableUpdate(cfs.metadata().unbuild().minIndexInterval(originalMinIndexInterval
* 2).build());
IndexSummaryManager.instance.redistributeSummaries();
long newSize = 0;
+ long newSizeUncompressed = 0;
+
for (SSTableReader sstable : cfs.getLiveSSTables())
{
assertEquals(cfs.metadata().params.minIndexInterval,
sstable.getEffectiveIndexInterval(), 0.001);
assertEquals(numRows / cfs.metadata().params.minIndexInterval,
sstable.getIndexSummarySize());
newSize += sstable.bytesOnDisk();
+ newSizeUncompressed += sstable.logicalBytesOnDisk();
}
+
newSize += others;
load = StorageMetrics.load.getCount();
-
// new size we calculate should be almost the same as the load in
metrics
- assertEquals(newSize, load, newSize / 10);
+ assertEquals(newSize, load, newSize / 10.0);
+
+ newSizeUncompressed += othersUncompressed;
+ uncompressedLoad = StorageMetrics.uncompressedLoad.getCount();
+ assertEquals(newSizeUncompressed, uncompressedLoad,
newSizeUncompressed / 10.0);
}
private void createSSTables(String ksname, String cfname, int numSSTables,
int numRows)
@@ -112,7 +130,7 @@ public class IndexSummaryRedistributionTest
cfs.truncateBlocking();
cfs.disableAutoCompaction();
- ArrayList<Future> futures = new ArrayList<>(numSSTables);
+ ArrayList<Future<CommitLogPosition>> futures = new
ArrayList<>(numSSTables);
ByteBuffer value = ByteBuffer.wrap(new byte[100]);
for (int sstable = 0; sstable < numSSTables; sstable++)
{
@@ -127,7 +145,7 @@ public class IndexSummaryRedistributionTest
}
futures.add(cfs.forceFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS));
}
- for (Future future : futures)
+ for (Future<CommitLogPosition> future : futures)
{
try
{
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index c88e0b09f7..363a000186 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -205,7 +205,9 @@ public class SSTableRewriterTest extends
SSTableWriterTestBase
SSTableReader s = writeFile(cfs, 1000);
cfs.addSSTable(s);
long startStorageMetricsLoad = StorageMetrics.load.getCount();
+ long startUncompressedLoad =
StorageMetrics.uncompressedLoad.getCount();
long sBytesOnDisk = s.bytesOnDisk();
+ long sBytesOnDiskUncompressed = s.logicalBytesOnDisk();
Set<SSTableReader> compacting = Sets.newHashSet(s);
List<SSTableReader> sstables;
@@ -236,11 +238,15 @@ public class SSTableRewriterTest extends
SSTableWriterTestBase
LifecycleTransaction.waitForDeletions();
- long sum = 0;
- for (SSTableReader x : cfs.getLiveSSTables())
- sum += x.bytesOnDisk();
+ long sum =
cfs.getLiveSSTables().stream().mapToLong(SSTableReader::bytesOnDisk).sum();
assertEquals(sum, cfs.metric.liveDiskSpaceUsed.getCount());
- assertEquals(startStorageMetricsLoad - sBytesOnDisk + sum,
StorageMetrics.load.getCount());
+ long endLoad = StorageMetrics.load.getCount();
+ assertEquals(startStorageMetricsLoad - sBytesOnDisk + sum, endLoad);
+
+ long uncompressedSum = cfs.getLiveSSTables().stream().mapToLong(t ->
t.logicalBytesOnDisk()).sum();
+ long endUncompressedLoad = StorageMetrics.uncompressedLoad.getCount();
+ assertEquals(startUncompressedLoad - sBytesOnDiskUncompressed +
uncompressedSum, endUncompressedLoad);
+
assertEquals(files, sstables.size());
assertEquals(files, cfs.getLiveSSTables().size());
LifecycleTransaction.waitForDeletions();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]