Author: slebresne
Date: Mon Apr 11 08:45:07 2011
New Revision: 1090979

URL: http://svn.apache.org/viewvc?rev=1090979&view=rev
Log:
Compaction throttling patch by stuhood; reviewed by slebresne for CASSANDRA-2156

Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/conf/cassandra.yaml cassandra/trunk/src/java/org/apache/cassandra/config/Config.java cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1090979&r1=1090978&r2=1090979&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Mon Apr 11 08:45:07 2011 @@ -20,7 +20,7 @@ * push replication_factor into strategy_options (CASSANDRA-1263) * give snapshots the same name on each node (CASSANDRA-1791) * multithreaded compaction (CASSANDRA-2191) - + * compaction throttling (CASSANDRA-2156) 0.7.5 * Avoid seeking when sstable2json exports the entire file (CASSANDRA-2318) Modified: cassandra/trunk/conf/cassandra.yaml URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1090979&r1=1090978&r2=1090979&view=diff ============================================================================== --- cassandra/trunk/conf/cassandra.yaml (original) +++ cassandra/trunk/conf/cassandra.yaml Mon Apr 11 08:45:07 2011 @@ -250,9 +250,17 @@ column_index_size_in_kb: 64 in_memory_compaction_limit_in_mb: 64 # Enables multiple compactions to execute at once. This is highly recommended -# for preserving read performance in a mixed read/write workload. 
+# for preserving read performance in a mixed read/write workload as this +# avoids sstables from accumulating during long running compactions. compaction_multithreading: true +# Throttles compaction to the given total throughput across the entire +# system. The faster you insert data, the faster you need to compact in +# order to keep the sstable count down, but in general, setting this to +# 16 to 32 times the rate you are inserting data is more than sufficient. +# Setting this to 0 disables throttling. +compaction_throughput_mb_per_sec: 16 + # Track cached row keys during compaction, and re-cache their new # positions in the compacted sstable. Disable if you use really large # key caches. Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1090979&r1=1090978&r2=1090979&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Mon Apr 11 08:45:07 2011 @@ -83,6 +83,7 @@ public class Config public Integer column_index_size_in_kb = 64; public Integer in_memory_compaction_limit_in_mb = 256; public Boolean compaction_multithreading = true; + public Integer compaction_throughput_mb_per_sec = 16; public String[] data_file_directories; Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1090979&r1=1090978&r2=1090979&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Mon Apr 11 08:45:07 2011 @@ -344,6 +344,9 @@ public 
class DatabaseDescriptor if (conf.compaction_multithreading == null) conf.compaction_multithreading = true; + if (conf.compaction_throughput_mb_per_sec == null) + conf.compaction_throughput_mb_per_sec = 16; + /* data file and commit log directories. they get created later, when they're needed. */ if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null) { @@ -731,6 +734,16 @@ public class DatabaseDescriptor return conf.compaction_multithreading; } + public static int getCompactionThroughputMbPerSec() + { + return conf.compaction_throughput_mb_per_sec; + } + + public static void setCompactionThroughputMbPerSec(int value) + { + conf.compaction_throughput_mb_per_sec = value; + } + public static String[] getAllDataFileLocations() { return conf.data_file_directories; Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1090979&r1=1090978&r2=1090979&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Mon Apr 11 08:45:07 2011 @@ -1184,6 +1184,11 @@ public class CompactionManager implement } } + public int getActiveCompactions() + { + return executor.getActiveCount(); + } + private static class CompactionExecutor extends DebuggableThreadPoolExecutor { // a synchronized identity set of running tasks to their compaction info Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1090979&r1=1090978&r2=1090979&view=diff ============================================================================== --- 
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Mon Apr 11 08:45:07 2011 @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.CompactionManager; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableScanner; @@ -59,6 +60,14 @@ implements Closeable, CompactionInfo.Hol private long bytesRead; private long row; + // the bytes that had been compacted the last time we delayed to throttle, + // and the time in milliseconds when we last throttled + private long bytesAtLastDelay; + private long timeAtLastDelay; + + // current target bytes to compact per millisecond + private int targetBytesPerMS = -1; + public CompactionIterator(String type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException { this(type, getCollatingIterator(sstables), controller); @@ -140,6 +149,7 @@ implements Closeable, CompactionInfo.Hol { bytesRead += scanner.getFilePointer(); } + throttle(); } } } @@ -161,6 +171,42 @@ implements Closeable, CompactionInfo.Hol return new PrecompactedRow(controller, rows); } + private void throttle() + { + if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1) + // throttling disabled + return; + int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000; + + // bytes compacted and time passed since last delay + long bytesSinceLast = bytesRead - bytesAtLastDelay; + long msSinceLast = System.currentTimeMillis() - timeAtLastDelay; + + // determine the current target + int newTarget = totalBytesPerMS / + Math.max(1, CompactionManager.instance.getActiveCompactions()); + if (newTarget != 
targetBytesPerMS) + logger.info(String.format("%s now compacting at %d bytes/ms.", + this, + newTarget)); + targetBytesPerMS = newTarget; + + // the excess bytes that were compacted in this period + long excessBytes = bytesSinceLast - msSinceLast * targetBytesPerMS; + + // the time to delay to recap the deficit + long timeToDelay = excessBytes / Math.max(1, targetBytesPerMS); + if (timeToDelay > 0) + { + if (logger.isTraceEnabled()) + logger.trace(String.format("Compacted %d bytes in %d ms: throttling for %d ms", + bytesSinceLast, msSinceLast, timeToDelay)); + try { Thread.sleep(timeToDelay); } catch (InterruptedException e) { throw new AssertionError(e); } + } + bytesAtLastDelay = bytesRead; + timeAtLastDelay = System.currentTimeMillis(); + } + public void close() throws IOException { FileUtils.close(getScanners()); Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1090979&r1=1090978&r2=1090979&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Apr 11 08:45:07 2011 @@ -506,6 +506,10 @@ public class StorageService implements I return joined; } + public void setCompactionThroughputMbPerSec(int value) { + DatabaseDescriptor.setCompactionThroughputMbPerSec(value); + } + private void setMode(String m, boolean log) { operationMode = m; Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1090979&r1=1090978&r2=1090979&view=diff ============================================================================== --- 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Mon Apr 11 08:45:07 2011 @@ -295,4 +295,6 @@ public interface StorageServiceMBean // allows a node that have been started without joining the ring to join it public void joinRing() throws IOException, org.apache.cassandra.config.ConfigurationException; public boolean isJoined(); + + public void setCompactionThroughputMbPerSec(int value); } Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1090979&r1=1090978&r2=1090979&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Mon Apr 11 08:45:07 2011 @@ -79,7 +79,7 @@ public class NodeCmd DECOMMISSION, MOVE, LOADBALANCE, REMOVETOKEN, REPAIR, CLEANUP, COMPACT, SCRUB, SETCACHECAPACITY, GETCOMPACTIONTHRESHOLD, SETCOMPACTIONTHRESHOLD, NETSTATS, CFHISTOGRAMS, COMPACTIONSTATS, DISABLEGOSSIP, ENABLEGOSSIP, INVALIDATEKEYCACHE, INVALIDATEROWCACHE, - DISABLETHRIFT, ENABLETHRIFT, JOIN + DISABLETHRIFT, ENABLETHRIFT, JOIN, SETCOMPACTIONTHROUGHPUT } @@ -111,6 +111,7 @@ public class NodeCmd addCmdHelp(header, "netstats [host]", "Print network information on provided host (connecting node by default)"); addCmdHelp(header, "move <new token>", "Move node on the token ring to a new token"); addCmdHelp(header, "removetoken status|force|<token>", "Show status of current token removal, force completion of pending removal or remove providen token"); + addCmdHelp(header, "setcompactionthroughput <value_in_mb>", "Set the MB/s throughput cap for compaction in the system, or 0 to disable throttling."); // Two args addCmdHelp(header, "snapshot [keyspaces...] 
-t [snapshotName]", "Take a snapshot of the specified keyspaces using optional name snapshotName"); @@ -592,6 +593,11 @@ public class NodeCmd probe.joinRing(); break; + case SETCOMPACTIONTHROUGHPUT : + if (arguments.length != 2) { badUse("Missing value argument."); } + probe.setCompactionThroughput(Integer.valueOf(arguments[1])); + break; + case REMOVETOKEN : if (arguments.length != 1) { badUse("Missing an argument for removetoken (either status, force, or a token)"); } else if (arguments[0].equals("status")) { nodeCmd.printRemovalStatus(System.out); } Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1090979&r1=1090978&r2=1090979&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Mon Apr 11 08:45:07 2011 @@ -550,6 +550,11 @@ public class NodeProbe { return ssProxy.isInitialized(); } + + public void setCompactionThroughput(int value) + { + ssProxy.setCompactionThroughputMbPerSec(value); + } } class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>