This is an automated email from the ASF dual-hosted git repository. smiklosovic pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 26ff589f3d Expose current compaction throughput in nodetool 26ff589f3d is described below commit 26ff589f3da0a66c10c5ca16451a1c49fbb57ade Author: maoling <maol...@apache.org> AuthorDate: Wed Jun 12 23:14:00 2024 +0800 Expose current compaction throughput in nodetool patch by Ling Mao; reviewed by Jon Haddad, Stefan Miklosovic for CASSANDRA-13890 Co-authored-by: Jon Haddad <j...@jonhaddad.com> --- CHANGES.txt | 1 + .../cassandra/db/compaction/CompactionManager.java | 11 ++++++++--- .../apache/cassandra/db/compaction/CompactionTask.java | 2 +- .../org/apache/cassandra/metrics/CompactionMetrics.java | 3 +++ .../org/apache/cassandra/service/StorageService.java | 17 +++++++++++++++++ .../apache/cassandra/service/StorageServiceMBean.java | 1 + src/java/org/apache/cassandra/tools/NodeProbe.java | 5 +++++ .../cassandra/tools/nodetool/CompactionStats.java | 4 ++++ .../tools/nodetool/GetCompactionThroughput.java | 9 ++++++++- .../cassandra/tools/nodetool/CompactionStatsTest.java | 3 +++ .../tools/nodetool/SetGetCompactionThroughputTest.java | 15 +++++++++++++-- 11 files changed, 64 insertions(+), 7 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index e56329be91..87987f8046 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Expose current compaction throughput in nodetool (CASSANDRA-13890) * CEP-24 Password validation / generation (CASSANDRA-17457) * Reconfigure CMS after replacement, bootstrap and move operations (CASSANDRA-19705) * Support querying LocalStrategy tables with any partitioner (CASSANDRA-19692) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 09dbd872fc..ee20b28c50 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -57,7 +57,7 @@ import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import com.codahale.metrics.Meter; import net.openhft.chronicle.core.util.ThrowingSupplier; import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.concurrent.ExecutorFactory; @@ -120,7 +120,6 @@ import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.ImmediateFuture; import org.apache.cassandra.utils.concurrent.Refs; - import static java.util.Collections.singleton; import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; import static org.apache.cassandra.concurrent.FutureTask.callable; @@ -224,6 +223,11 @@ public class CompactionManager implements CompactionManagerMBean, ICompactionMan compactionRateLimiter.setRate(throughput); } + public Meter getCompactionThroughput() + { + return metrics.bytesCompactedThroughput; + } + /** * Call this whenever a compaction might be needed on the given columnfamily. * It's okay to over-call (within reason) if a call is unnecessary, it will @@ -1510,9 +1514,10 @@ public class CompactionManager implements CompactionManagerMBean, ICompactionMan } - static void compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio) + protected void compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio) { long lengthRead = (long) ((bytesScanned - lastBytesScanned) * compressionRatio) + 1; + metrics.bytesCompactedThroughput.mark(lengthRead); while (lengthRead >= Integer.MAX_VALUE) { limiter.acquire(Integer.MAX_VALUE); diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 4ca0e0f53f..79368f68d3 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -212,7 +212,7 @@ public class CompactionTask extends AbstractCompactionTask long bytesScanned = scanners.getTotalBytesScanned(); // Rate limit the scanners, and account for compression - CompactionManager.compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio); + CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio); lastBytesScanned = bytesScanned; diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java index 8bd48e520e..3bdd14a0bb 100644 --- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java @@ -53,6 +53,8 @@ public class CompactionMetrics public final Meter totalCompactionsCompleted; /** Total number of bytes compacted since server [re]start */ public final Counter bytesCompacted; + /** Recent/current throughput of compactions take */ + public final Meter bytesCompactedThroughput; /** Time spent redistributing index summaries */ public final Timer indexSummaryRedistributionTime; @@ -147,6 +149,7 @@ public class CompactionMetrics }); totalCompactionsCompleted = Metrics.meter(factory.createMetricName("TotalCompactionsCompleted")); bytesCompacted = Metrics.counter(factory.createMetricName("BytesCompacted")); + bytesCompactedThroughput = Metrics.meter(factory.createMetricName("BytesCompactedThroughput")); // compaction failure metrics compactionsReduced = Metrics.counter(factory.createMetricName("CompactionsReduced")); diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index d4767bc951..a652e39fc1 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -78,6 +78,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.codahale.metrics.Meter; import org.apache.cassandra.audit.AuditLogManager; import org.apache.cassandra.audit.AuditLogOptions; import org.apache.cassandra.auth.AuthCacheService; @@ -245,6 +246,7 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.REPLACE_AD import static org.apache.cassandra.config.CassandraRelevantProperties.TEST_WRITE_SURVEY; import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName; import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily; +import static org.apache.cassandra.io.util.FileUtils.ONE_MIB; import static org.apache.cassandra.schema.SchemaConstants.isLocalSystemKeyspace; import static org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus; import static org.apache.cassandra.service.ActiveRepairService.repairCommandExecutor; @@ -1415,6 +1417,21 @@ public class StorageService extends NotificationBroadcasterSupport implements IE value, oldValue); } + /** + * Get the Current Compaction Throughput + * key is 1/5/15minute time dimension for statistics + * value is the metric double string (unit is:mib/s) + */ + public Map<String, String> getCurrentCompactionThroughputMebibytesPerSec() + { + HashMap<String, String> result = new LinkedHashMap<>(); + Meter rate = CompactionManager.instance.getCompactionThroughput(); + result.put("1minute", String.format("%.3f", rate.getOneMinuteRate() / ONE_MIB)); + result.put("5minute", String.format("%.3f", rate.getFiveMinuteRate() / ONE_MIB)); + result.put("15minute", String.format("%.3f", rate.getFifteenMinuteRate() / ONE_MIB)); + return result; + } + public int getBatchlogReplayThrottleInKB() { return DatabaseDescriptor.getBatchlogReplayThrottleInKiB(); diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 6fb34b2f20..da4206416f 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -807,6 +807,7 @@ public interface StorageServiceMBean extends NotificationEmitter @Deprecated(since = "4.1") public int getCompactionThroughputMbPerSec(); public void setCompactionThroughputMbPerSec(int value); + Map<String, String> getCurrentCompactionThroughputMebibytesPerSec(); public int getBatchlogReplayThrottleInKB(); public void setBatchlogReplayThrottleInKB(int value); diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index b121cb3100..047448c208 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1426,6 +1426,11 @@ public class NodeProbe implements AutoCloseable return ssProxy.getCompactionThroughtputBytesPerSec(); } + public Map<String, String> getCurrentCompactionThroughputMiBPerSec() + { + return ssProxy.getCurrentCompactionThroughputMebibytesPerSec(); + } + public void setBatchlogReplayThrottle(int value) { ssProxy.setBatchlogReplayThrottleInKB(value); diff --git a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java index c80de91d97..f76d4d0191 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java +++ b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java @@ -111,6 +111,10 @@ public class CompactionStats extends NodeToolCmd double configured = probe.getStorageService().getCompactionThroughtputMibPerSecAsDouble(); tableBuilder.add("compaction throughput (MiB/s)", configured == 0 ? "throttling disabled (0)" : Double.toString(configured)); + Map<String, String> currentCompactionThroughputMetricsMap = probe.getCurrentCompactionThroughputMiBPerSec(); + tableBuilder.add("current compaction throughput (1 minute)", currentCompactionThroughputMetricsMap.get("1minute") + " MiB/s"); + tableBuilder.add("current compaction throughput (5 minute)", currentCompactionThroughputMetricsMap.get("5minute") + " MiB/s"); + tableBuilder.add("current compaction throughput (15 minute)", currentCompactionThroughputMetricsMap.get("15minute") + " MiB/s"); } public static void reportCompactionTable(List<Map<String,String>> compactions, long compactionThroughputInBytes, boolean humanReadable, PrintStream out, TableBuilder table) diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java index e71fe0adef..8296511c0a 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java +++ b/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.tools.nodetool; +import java.util.Map; + import com.google.common.math.DoubleMath; import io.airlift.airline.Command; @@ -44,7 +46,12 @@ public class GetCompactionThroughput extends NodeToolCmd if (!DoubleMath.isMathematicalInteger(throughput)) throw new RuntimeException("Use the -d flag to quiet this error and get the exact throughput in MiB/s"); - probe.output().out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MB/s"); + probe.output().out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MiB/s"); } + + Map<String, String> currentCompactionThroughputMetricsMap = probe.getCurrentCompactionThroughputMiBPerSec(); + probe.output().out.println("Current compaction throughput (1 minute): " + currentCompactionThroughputMetricsMap.get("1minute") + " MiB/s"); + probe.output().out.println("Current compaction throughput (5 minute): " + currentCompactionThroughputMetricsMap.get("5minute") + " MiB/s"); + probe.output().out.println("Current compaction throughput (15 minute): " + currentCompactionThroughputMetricsMap.get("15minute") + " MiB/s"); } } diff --git a/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java b/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java index 8758c3f8fd..65cc638531 100644 --- a/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java +++ b/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java @@ -140,6 +140,9 @@ public class CompactionStatsTest extends CQLTester assertThat(stdout).containsPattern("15 minute rate\\s+[0-9]*.[0-9]*[0-9]*/minute"); assertThat(stdout).containsPattern("mean rate\\s+[0-9]*.[0-9]*[0-9]*/hour"); assertThat(stdout).containsPattern("compaction throughput \\(MiB/s\\)\\s+throttling disabled \\(0\\)"); + assertThat(stdout).containsPattern("current compaction throughput \\(1 minute\\)\\s+[0-9]*.[0-9]*[0-9]* MiB/s"); + assertThat(stdout).containsPattern("current compaction throughput \\(5 minute\\)\\s+[0-9]*.[0-9]*[0-9]* MiB/s"); + assertThat(stdout).containsPattern("current compaction throughput \\(15 minute\\)\\s+[0-9]*.[0-9]*[0-9]* MiB/s"); CompactionManager.instance.active.finishCompaction(compactionHolder); waitForNumberOfPendingTasks(0, "compactionstats"); diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java index 24ee9e5797..2771881dc2 100644 --- a/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java +++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java @@ -81,6 +81,17 @@ public class SetGetCompactionThroughputTest extends CQLTester assertPreciseMibFlagNeeded(); } + @Test + public void testCurrentCompactionThroughput() + { + ToolResult tool = invokeNodetool("getcompactionthroughput"); + tool.assertOnCleanExit(); + + assertThat(tool.getStdout()).containsPattern("Current compaction throughput \\(1 minute\\): \\d+\\.\\d+ MiB/s"); + assertThat(tool.getStdout()).containsPattern("Current compaction throughput \\(5 minute\\): \\d+\\.\\d+ MiB/s"); + assertThat(tool.getStdout()).containsPattern("Current compaction throughput \\(15 minute\\): \\d+\\.\\d+ MiB/s"); + } + private static void assertSetGetValidThroughput(int throughput) { ToolResult tool = invokeNodetool("setcompactionthroughput", String.valueOf(throughput)); @@ -129,9 +140,9 @@ public class SetGetCompactionThroughputTest extends CQLTester tool.assertOnCleanExit(); if (expected > 0) - assertThat(tool.getStdout()).contains("Current compaction throughput: " + expected + " MB/s"); + assertThat(tool.getStdout()).contains("Current compaction throughput: " + expected + " MiB/s"); else - assertThat(tool.getStdout()).contains("Current compaction throughput: 0 MB/s"); + assertThat(tool.getStdout()).contains("Current compaction throughput: 0 MiB/s"); } private static void assertGetThroughputDouble(double expected) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org