This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 26ff589f3d Expose current compaction throughput in nodetool
26ff589f3d is described below

commit 26ff589f3da0a66c10c5ca16451a1c49fbb57ade
Author: maoling <maol...@apache.org>
AuthorDate: Wed Jun 12 23:14:00 2024 +0800

    Expose current compaction throughput in nodetool
    
    patch by Ling Mao; reviewed by Jon Haddad, Stefan Miklosovic for 
CASSANDRA-13890
    
    Co-authored-by: Jon Haddad <j...@jonhaddad.com>
---
 CHANGES.txt                                             |  1 +
 .../cassandra/db/compaction/CompactionManager.java      | 11 ++++++++---
 .../apache/cassandra/db/compaction/CompactionTask.java  |  2 +-
 .../org/apache/cassandra/metrics/CompactionMetrics.java |  3 +++
 .../org/apache/cassandra/service/StorageService.java    | 17 +++++++++++++++++
 .../apache/cassandra/service/StorageServiceMBean.java   |  1 +
 src/java/org/apache/cassandra/tools/NodeProbe.java      |  5 +++++
 .../cassandra/tools/nodetool/CompactionStats.java       |  4 ++++
 .../tools/nodetool/GetCompactionThroughput.java         |  9 ++++++++-
 .../cassandra/tools/nodetool/CompactionStatsTest.java   |  3 +++
 .../tools/nodetool/SetGetCompactionThroughputTest.java  | 15 +++++++++++++--
 11 files changed, 64 insertions(+), 7 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e56329be91..87987f8046 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Expose current compaction throughput in nodetool (CASSANDRA-13890)
  * CEP-24 Password validation / generation (CASSANDRA-17457)
  * Reconfigure CMS after replacement, bootstrap and move operations 
(CASSANDRA-19705)
  * Support querying LocalStrategy tables with any partitioner (CASSANDRA-19692)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 09dbd872fc..ee20b28c50 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -57,7 +57,7 @@ import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import com.codahale.metrics.Meter;
 import net.openhft.chronicle.core.util.ThrowingSupplier;
 import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.concurrent.ExecutorFactory;
@@ -120,7 +120,6 @@ import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 import org.apache.cassandra.utils.concurrent.Refs;
 
-
 import static java.util.Collections.singleton;
 import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 import static org.apache.cassandra.concurrent.FutureTask.callable;
@@ -224,6 +223,11 @@ public class CompactionManager implements 
CompactionManagerMBean, ICompactionMan
             compactionRateLimiter.setRate(throughput);
     }
 
+    public Meter getCompactionThroughput()
+    {
+        return metrics.bytesCompactedThroughput;
+    }
+
     /**
      * Call this whenever a compaction might be needed on the given 
columnfamily.
      * It's okay to over-call (within reason) if a call is unnecessary, it will
@@ -1510,9 +1514,10 @@ public class CompactionManager implements 
CompactionManagerMBean, ICompactionMan
 
     }
 
-    static void compactionRateLimiterAcquire(RateLimiter limiter, long 
bytesScanned, long lastBytesScanned, double compressionRatio)
+    protected void compactionRateLimiterAcquire(RateLimiter limiter, long 
bytesScanned, long lastBytesScanned, double compressionRatio)
     {
         long lengthRead = (long) ((bytesScanned - lastBytesScanned) * 
compressionRatio) + 1;
+        metrics.bytesCompactedThroughput.mark(lengthRead);
         while (lengthRead >= Integer.MAX_VALUE)
         {
             limiter.acquire(Integer.MAX_VALUE);
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4ca0e0f53f..79368f68d3 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -212,7 +212,7 @@ public class CompactionTask extends AbstractCompactionTask
                         long bytesScanned = scanners.getTotalBytesScanned();
 
                         // Rate limit the scanners, and account for compression
-                        
CompactionManager.compactionRateLimiterAcquire(limiter, bytesScanned, 
lastBytesScanned, compressionRatio);
+                        
CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned, 
lastBytesScanned, compressionRatio);
 
                         lastBytesScanned = bytesScanned;
 
diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java 
b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
index 8bd48e520e..3bdd14a0bb 100644
--- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
@@ -53,6 +53,8 @@ public class CompactionMetrics
     public final Meter totalCompactionsCompleted;
     /** Total number of bytes compacted since server [re]start */
     public final Counter bytesCompacted;
+    /** Recent/current throughput of compactions, measured in bytes */
+    public final Meter bytesCompactedThroughput;
     /** Time spent redistributing index summaries */
     public final Timer indexSummaryRedistributionTime;
 
@@ -147,6 +149,7 @@ public class CompactionMetrics
         });
         totalCompactionsCompleted = 
Metrics.meter(factory.createMetricName("TotalCompactionsCompleted"));
         bytesCompacted = 
Metrics.counter(factory.createMetricName("BytesCompacted"));
+        bytesCompactedThroughput = 
Metrics.meter(factory.createMetricName("BytesCompactedThroughput"));
 
         // compaction failure metrics
         compactionsReduced = 
Metrics.counter(factory.createMetricName("CompactionsReduced"));
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index d4767bc951..a652e39fc1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -78,6 +78,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.codahale.metrics.Meter;
 import org.apache.cassandra.audit.AuditLogManager;
 import org.apache.cassandra.audit.AuditLogOptions;
 import org.apache.cassandra.auth.AuthCacheService;
@@ -245,6 +246,7 @@ import static 
org.apache.cassandra.config.CassandraRelevantProperties.REPLACE_AD
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.TEST_WRITE_SURVEY;
 import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName;
 import static 
org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily;
+import static org.apache.cassandra.io.util.FileUtils.ONE_MIB;
 import static 
org.apache.cassandra.schema.SchemaConstants.isLocalSystemKeyspace;
 import static 
org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
 import static 
org.apache.cassandra.service.ActiveRepairService.repairCommandExecutor;
@@ -1415,6 +1417,21 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                     value, oldValue);
     }
 
+    /**
+     * Get the current compaction throughput.
+     * Keys are the "1minute", "5minute" and "15minute" averaging windows;
+     * values are the matching rates as three-decimal strings (unit: MiB/s).
+     */
+    public Map<String, String> getCurrentCompactionThroughputMebibytesPerSec()
+    {
+        HashMap<String, String> result = new LinkedHashMap<>();
+        Meter rate = CompactionManager.instance.getCompactionThroughput();
+        result.put("1minute", String.format("%.3f", rate.getOneMinuteRate() / 
ONE_MIB));
+        result.put("5minute", String.format("%.3f", rate.getFiveMinuteRate() / 
ONE_MIB));
+        result.put("15minute", String.format("%.3f", 
rate.getFifteenMinuteRate() / ONE_MIB));
+        return result;
+    }
+
     public int getBatchlogReplayThrottleInKB()
     {
         return DatabaseDescriptor.getBatchlogReplayThrottleInKiB();
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 6fb34b2f20..da4206416f 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -807,6 +807,7 @@ public interface StorageServiceMBean extends 
NotificationEmitter
     @Deprecated(since = "4.1")
     public int getCompactionThroughputMbPerSec();
     public void setCompactionThroughputMbPerSec(int value);
+    Map<String, String> getCurrentCompactionThroughputMebibytesPerSec();
 
     public int getBatchlogReplayThrottleInKB();
     public void setBatchlogReplayThrottleInKB(int value);
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index b121cb3100..047448c208 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1426,6 +1426,11 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.getCompactionThroughtputBytesPerSec();
     }
 
+    public Map<String, String> getCurrentCompactionThroughputMiBPerSec()
+    {
+        return ssProxy.getCurrentCompactionThroughputMebibytesPerSec();
+    }
+
     public void setBatchlogReplayThrottle(int value)
     {
         ssProxy.setBatchlogReplayThrottleInKB(value);
diff --git a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java 
b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
index c80de91d97..f76d4d0191 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
@@ -111,6 +111,10 @@ public class CompactionStats extends NodeToolCmd
 
         double configured = 
probe.getStorageService().getCompactionThroughtputMibPerSecAsDouble();
         tableBuilder.add("compaction throughput (MiB/s)", configured == 0 ? 
"throttling disabled (0)" : Double.toString(configured));
+        Map<String, String> currentCompactionThroughputMetricsMap = 
probe.getCurrentCompactionThroughputMiBPerSec();
+        tableBuilder.add("current compaction throughput (1 minute)", 
currentCompactionThroughputMetricsMap.get("1minute") + " MiB/s");
+        tableBuilder.add("current compaction throughput (5 minute)", 
currentCompactionThroughputMetricsMap.get("5minute") + " MiB/s");
+        tableBuilder.add("current compaction throughput (15 minute)", 
currentCompactionThroughputMetricsMap.get("15minute") + " MiB/s");
     }
 
     public static void reportCompactionTable(List<Map<String,String>> 
compactions, long compactionThroughputInBytes, boolean humanReadable, 
PrintStream out, TableBuilder table)
diff --git 
a/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java 
b/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java
index e71fe0adef..8296511c0a 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.tools.nodetool;
 
+import java.util.Map;
+
 import com.google.common.math.DoubleMath;
 
 import io.airlift.airline.Command;
@@ -44,7 +46,12 @@ public class GetCompactionThroughput extends NodeToolCmd
             if (!DoubleMath.isMathematicalInteger(throughput))
                 throw new RuntimeException("Use the -d flag to quiet this 
error and get the exact throughput in MiB/s");
 
-            probe.output().out.println("Current compaction throughput: " + 
probe.getCompactionThroughput() + " MB/s");
+            probe.output().out.println("Current compaction throughput: " + 
probe.getCompactionThroughput() + " MiB/s");
         }
+
+        Map<String, String> currentCompactionThroughputMetricsMap = 
probe.getCurrentCompactionThroughputMiBPerSec();
+        probe.output().out.println("Current compaction throughput (1 minute): 
" + currentCompactionThroughputMetricsMap.get("1minute") + " MiB/s");
+        probe.output().out.println("Current compaction throughput (5 minute): 
" + currentCompactionThroughputMetricsMap.get("5minute") + " MiB/s");
+        probe.output().out.println("Current compaction throughput (15 minute): 
" + currentCompactionThroughputMetricsMap.get("15minute") + " MiB/s");
     }
 }
diff --git 
a/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java 
b/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java
index 8758c3f8fd..65cc638531 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java
@@ -140,6 +140,9 @@ public class CompactionStatsTest extends CQLTester
         assertThat(stdout).containsPattern("15 minute 
rate\\s+[0-9]*.[0-9]*[0-9]*/minute");
         assertThat(stdout).containsPattern("mean 
rate\\s+[0-9]*.[0-9]*[0-9]*/hour");
         assertThat(stdout).containsPattern("compaction throughput 
\\(MiB/s\\)\\s+throttling disabled \\(0\\)");
+        assertThat(stdout).containsPattern("current compaction throughput \\(1 
minute\\)\\s+[0-9]*.[0-9]*[0-9]* MiB/s");
+        assertThat(stdout).containsPattern("current compaction throughput \\(5 
minute\\)\\s+[0-9]*.[0-9]*[0-9]* MiB/s");
+        assertThat(stdout).containsPattern("current compaction throughput 
\\(15 minute\\)\\s+[0-9]*.[0-9]*[0-9]* MiB/s");
 
         CompactionManager.instance.active.finishCompaction(compactionHolder);
         waitForNumberOfPendingTasks(0, "compactionstats");
diff --git 
a/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java
 
b/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java
index 24ee9e5797..2771881dc2 100644
--- 
a/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java
+++ 
b/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java
@@ -81,6 +81,17 @@ public class SetGetCompactionThroughputTest extends CQLTester
         assertPreciseMibFlagNeeded();
     }
 
+    @Test
+    public void testCurrentCompactionThroughput()
+    {
+        ToolResult tool = invokeNodetool("getcompactionthroughput");
+        tool.assertOnCleanExit();
+
+        assertThat(tool.getStdout()).containsPattern("Current compaction 
throughput \\(1 minute\\): \\d+\\.\\d+ MiB/s");
+        assertThat(tool.getStdout()).containsPattern("Current compaction 
throughput \\(5 minute\\): \\d+\\.\\d+ MiB/s");
+        assertThat(tool.getStdout()).containsPattern("Current compaction 
throughput \\(15 minute\\): \\d+\\.\\d+ MiB/s");
+    }
+
     private static void assertSetGetValidThroughput(int throughput)
     {
         ToolResult tool = invokeNodetool("setcompactionthroughput", 
String.valueOf(throughput));
@@ -129,9 +140,9 @@ public class SetGetCompactionThroughputTest extends 
CQLTester
         tool.assertOnCleanExit();
 
         if (expected > 0)
-            assertThat(tool.getStdout()).contains("Current compaction 
throughput: " + expected + " MB/s");
+            assertThat(tool.getStdout()).contains("Current compaction 
throughput: " + expected + " MiB/s");
         else
-            assertThat(tool.getStdout()).contains("Current compaction 
throughput: 0 MB/s");
+            assertThat(tool.getStdout()).contains("Current compaction 
throughput: 0 MiB/s");
     }
 
     private static void assertGetThroughputDouble(double expected)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to