Fix compaction throttling bursty-ness. Patch by yukim and jbellis for CASSANDRA-4316.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2b0797b2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2b0797b2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2b0797b2 Branch: refs/heads/trunk Commit: 2b0797b24e2d4a433c0e17506a0d8bb812f8f2dd Parents: 927c4a4 Author: Jonathan Ellis <jbel...@apache.org> Authored: Tue Apr 30 14:09:25 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Tue Apr 30 15:36:57 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/AbstractCompactionStrategy.java | 4 +- .../db/compaction/CompactionController.java | 23 ++------- .../db/compaction/CompactionIterable.java | 12 ++--- .../cassandra/db/compaction/CompactionManager.java | 25 ++++++++- .../db/compaction/LeveledCompactionStrategy.java | 7 ++- .../db/compaction/ParallelCompactionIterable.java | 13 ++---- .../apache/cassandra/db/compaction/Scrubber.java | 8 +-- .../io/compress/CompressedRandomAccessReader.java | 2 +- .../io/compress/CompressedThrottledReader.java | 38 +++++++++++++++ .../io/sstable/SSTableBoundedScanner.java | 6 ++- .../apache/cassandra/io/sstable/SSTableReader.java | 25 +++++++--- .../cassandra/io/sstable/SSTableScanner.java | 7 ++- .../apache/cassandra/io/util/ThrottledReader.java | 35 +++++++++++++ .../org/apache/cassandra/tools/SSTableExport.java | 5 +- .../apache/cassandra/io/sstable/SSTableUtils.java | 4 +- 16 files changed, 151 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 93198f0..bfece4f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2.5 + * fix compaction throttling bursty-ness (CASSANDRA-4316) * reduce memory 
consumption of IndexSummary (CASSANDRA-5506) * remove per-row column name bloom filters (CASSANDRA-5492) * Include fatal errors in trace events (CASSANDRA-5447) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index a588216..636cb0d 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction; import java.util.*; +import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -150,9 +151,10 @@ public abstract class AbstractCompactionStrategy */ public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range) { + RateLimiter limiter = CompactionManager.instance.getRateLimiter(); ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(); for (SSTableReader sstable : sstables) - scanners.add(sstable.getDirectScanner(range)); + scanners.add(sstable.getDirectScanner(range, limiter)); return scanners; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index f3198ff..f91c7a5 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -22,6 +22,8 @@ import 
java.util.Collections; import java.util.List; import java.util.Set; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +38,8 @@ import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.AlwaysPresentFilter; import org.apache.cassandra.utils.Throttle; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.Throttle; /** * Manage compaction options. @@ -50,20 +54,6 @@ public class CompactionController public final int gcBefore; public final int mergeShardBefore; - private final Throttle throttle = new Throttle("Cassandra_Throttle", new Throttle.ThroughputFunction() - { - /** @return Instantaneous throughput target in bytes per millisecond. */ - public int targetThroughput() - { - if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode()) - // throttling disabled - return 0; - // total throughput - int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000; - // per stream throughput (target bytes per MS) - return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions()); - } - }); public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore) { @@ -176,11 +166,6 @@ public class CompactionController return getCompactedRow(Collections.singletonList(row)); } - public void mayThrottle(long currentBytes) - { - throttle.throttle(currentBytes); - } - public void close() { SSTableReader.releaseReferences(overlappingSSTables); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java index 32b4942..3614ed1 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java @@ -78,14 +78,10 @@ public class CompactionIterable extends AbstractCompactionIterable finally { rows.clear(); - if ((row++ % 1000) == 0) - { - long n = 0; - for (ICompactionScanner scanner : scanners) - n += scanner.getCurrentPosition(); - bytesRead = n; - controller.mayThrottle(bytesRead); - } + long n = 0; + for (ICompactionScanner scanner : scanners) + n += scanner.getCurrentPosition(); + bytesRead = n; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 1d273b6..96c3011 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -31,6 +31,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.ConcurrentHashMultiset; import com.google.common.collect.Multiset; import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,6 +119,26 @@ public class CompactionManager implements CompactionManagerMBean private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor); private final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create(); + private final RateLimiter compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE); + + /** + * Gets compaction rate limiter. 
When compaction_throughput_mb_per_sec is 0 or node is bootstrapping, + * this returns rate limiter with the rate of Double.MAX_VALUE bytes per second. + * Rate unit is bytes per sec. + * + * @return RateLimiter with rate limit set + */ + public RateLimiter getRateLimiter() + { + double currentThroughput = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024; + // if throughput is set to 0, throttling is disabled + if (currentThroughput == 0 || StorageService.instance.isBootstrapMode()) + currentThroughput = Double.MAX_VALUE; + if (compactionRateLimiter.getRate() != currentThroughput) + compactionRateLimiter.setRate(currentThroughput); + return compactionRateLimiter; + } + /** * @return A lock, for which acquisition means no compactions can run. */ @@ -568,7 +589,7 @@ public class CompactionManager implements CompactionManagerMBean if (compactionFileLocation == null) throw new IOException("disk full"); - SSTableScanner scanner = sstable.getDirectScanner(); + SSTableScanner scanner = sstable.getDirectScanner(getRateLimiter()); long rowsRead = 0; List<IColumn> indexedColumnsInRow = null; @@ -628,8 +649,6 @@ public class CompactionManager implements CompactionManagerMBean } } } - if ((rowsRead++ % 1000) == 0) - controller.mayThrottle(scanner.getCurrentPosition()); } if (writer != null) newSstable = writer.closeAndOpenReader(sstable.maxDataAge); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 9a73299..f964297 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -24,6 +24,7 @@ import 
com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.collect.*; import com.google.common.primitives.Doubles; +import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -178,7 +179,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem { // L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each for (SSTableReader sstable : byLevel.get(level)) - scanners.add(sstable.getDirectScanner(range)); + scanners.add(sstable.getDirectScanner(range, CompactionManager.instance.getRateLimiter())); } else { @@ -208,7 +209,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem this.sstables = new ArrayList<SSTableReader>(sstables); Collections.sort(this.sstables, SSTable.sstableComparator); sstableIterator = this.sstables.iterator(); - currentScanner = sstableIterator.next().getDirectScanner(range); + currentScanner = sstableIterator.next().getDirectScanner(range, CompactionManager.instance.getRateLimiter()); long length = 0; for (SSTableReader sstable : sstables) @@ -233,7 +234,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem currentScanner = null; return endOfData(); } - currentScanner = sstableIterator.next().getDirectScanner(range); + currentScanner = sstableIterator.next().getDirectScanner(range, CompactionManager.instance.getRateLimiter()); } } catch (IOException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java index e91846d..0f9407f 100644 --- 
a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java @@ -117,7 +117,6 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable private class Reducer extends MergeIterator.Reducer<RowContainer, CompactedRowContainer> { private final List<RowContainer> rows = new ArrayList<RowContainer>(); - private int row = 0; private final ThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(FBUtilities.getAvailableProcessors(), Integer.MAX_VALUE, @@ -137,14 +136,10 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable ParallelCompactionIterable.this.updateCounterFor(rows.size()); CompactedRowContainer compacted = getCompactedRow(rows); rows.clear(); - if ((row++ % 1000) == 0) - { - long n = 0; - for (ICompactionScanner scanner : scanners) - n += scanner.getCurrentPosition(); - bytesRead = n; - controller.mayThrottle(bytesRead); - } + long n = 0; + for (ICompactionScanner scanner : scanners) + n += scanner.getCurrentPosition(); + bytesRead = n; return compacted; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index 0601857..cb529cb 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -45,8 +45,6 @@ public class Scrubber implements Closeable private final RandomAccessReader indexFile; private final ScrubInfo scrubInfo; - private long rowsRead; - private SSTableWriter writer; private SSTableReader newSstable; private SSTableReader newInOrderSstable; @@ -94,7 +92,9 @@ public class Scrubber implements Closeable // we'll also loop through the index at the same time, using 
the position from the index to recover if the // row header (key or data size) is corrupt. (This means our position in the index file will be one row // "ahead" of the data file.) - this.dataFile = sstable.openDataReader(true); + this.dataFile = isOffline + ? sstable.openDataReader(true) + : sstable.openDataReader(CompactionManager.instance.getRateLimiter()); this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true); this.scrubInfo = new ScrubInfo(dataFile, sstable); } @@ -249,8 +249,6 @@ public class Scrubber implements Closeable badRows++; } } - if ((rowsRead++ % 1000) == 0) - controller.mayThrottle(dataFile.getFilePointer()); } if (writer.getFilePointer() > 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java index f245851..9da1c97 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java @@ -70,7 +70,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader // raw checksum bytes private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]); - private CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache, PoolingSegmentedFile owner) throws FileNotFoundException + protected CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache, PoolingSegmentedFile owner) throws FileNotFoundException { super(new File(dataFilePath), metadata.chunkLength(), skipIOCache, owner); this.metadata = metadata; 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java new file mode 100644 index 0000000..1b7b7a4 --- /dev/null +++ b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java @@ -0,0 +1,38 @@ +package org.apache.cassandra.io.compress; + +import java.io.File; +import java.io.FileNotFoundException; + +import com.google.common.util.concurrent.RateLimiter; + +import org.apache.cassandra.io.util.PoolingSegmentedFile; +import org.apache.cassandra.io.util.RandomAccessReader; + +public class CompressedThrottledReader extends CompressedRandomAccessReader +{ + private final RateLimiter limiter; + + public CompressedThrottledReader(String file, CompressionMetadata metadata, RateLimiter limiter) throws FileNotFoundException + { + super(file, metadata, true, null); + this.limiter = limiter; + } + + protected void reBuffer() + { + limiter.acquire(buffer.length); + super.reBuffer(); + } + + public static CompressedThrottledReader open(String file, CompressionMetadata metadata, RateLimiter limiter) + { + try + { + return new CompressedThrottledReader(file, metadata, limiter); + } + catch (FileNotFoundException e) + { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java index 56be212..a3c6bbb 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java +++ 
b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java @@ -20,6 +20,8 @@ package org.apache.cassandra.io.sstable; import java.util.Arrays; import java.util.Iterator; +import com.google.common.util.concurrent.RateLimiter; + import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.utils.Pair; @@ -32,9 +34,9 @@ public class SSTableBoundedScanner extends SSTableScanner private final Iterator<Pair<Long, Long>> rangeIterator; private Pair<Long, Long> currentRange; - SSTableBoundedScanner(SSTableReader sstable, boolean skipCache, Iterator<Pair<Long, Long>> rangeIterator) + SSTableBoundedScanner(SSTableReader sstable, boolean skipCache, Iterator<Pair<Long, Long>> rangeIterator, RateLimiter limiter) { - super(sstable, skipCache); + super(sstable, skipCache, limiter); this.rangeIterator = rangeIterator; assert rangeIterator.hasNext(); // use EmptyCompactionScanner otherwise currentRange = rangeIterator.next(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index e4a2fe1..6b71223 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.*; +import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +48,7 @@ import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import 
org.apache.cassandra.io.compress.CompressedRandomAccessReader; +import org.apache.cassandra.io.compress.CompressedThrottledReader; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.util.*; import org.apache.cassandra.service.CacheService; @@ -946,10 +949,10 @@ public class SSTableReader extends SSTable * Direct I/O SSTableScanner * @return A Scanner for seeking over the rows of the SSTable. */ - public SSTableScanner getDirectScanner() - { - return new SSTableScanner(this, true); - } + public SSTableScanner getDirectScanner(RateLimiter limiter) + { + return new SSTableScanner(this, true, limiter); + } /** * Direct I/O SSTableScanner over a defined range of tokens. @@ -957,14 +960,14 @@ public class SSTableReader extends SSTable * @param range the range of keys to cover * @return A Scanner for seeking over the rows of the SSTable. */ - public ICompactionScanner getDirectScanner(Range<Token> range) + public ICompactionScanner getDirectScanner(Range<Token> range, RateLimiter limiter) { if (range == null) - return getDirectScanner(); + return getDirectScanner(limiter); Iterator<Pair<Long, Long>> rangeIterator = getPositionsForRanges(Collections.singletonList(range)).iterator(); return rangeIterator.hasNext() - ? new SSTableBoundedScanner(this, true, rangeIterator) + ? new SSTableBoundedScanner(this, true, rangeIterator, limiter) : new EmptyCompactionScanner(getFilename()); } @@ -1117,6 +1120,14 @@ public class SSTableReader extends SSTable return sstableMetadata.ancestors; } + public RandomAccessReader openDataReader(RateLimiter limiter) + { + assert limiter != null; + return compression + ? 
CompressedThrottledReader.open(getFilename(), getCompressionMetadata(), limiter) + : ThrottledReader.open(new File(getFilename()), limiter); + } + public RandomAccessReader openDataReader(boolean skipIOCache) { return compression http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java index 22ac485..1df5842 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.Arrays; import java.util.Iterator; +import com.google.common.util.concurrent.RateLimiter; + import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.RowPosition; @@ -45,10 +47,11 @@ public class SSTableScanner implements ICompactionScanner /** * @param sstable SSTable to scan. + * @param limiter */ - SSTableScanner(SSTableReader sstable, boolean skipCache) + SSTableScanner(SSTableReader sstable, boolean skipCache, RateLimiter limiter) { - this.dfile = sstable.openDataReader(skipCache); + this.dfile = limiter == null ? 
sstable.openDataReader(skipCache) : sstable.openDataReader(limiter); this.ifile = sstable.openIndexReader(skipCache); this.sstable = sstable; this.filter = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/util/ThrottledReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ThrottledReader.java b/src/java/org/apache/cassandra/io/util/ThrottledReader.java new file mode 100644 index 0000000..d67550a --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/ThrottledReader.java @@ -0,0 +1,35 @@ +package org.apache.cassandra.io.util; + +import java.io.File; +import java.io.FileNotFoundException; + +import com.google.common.util.concurrent.RateLimiter; + +public class ThrottledReader extends RandomAccessReader +{ + private final RateLimiter limiter; + + protected ThrottledReader(File file, RateLimiter limiter) throws FileNotFoundException + { + super(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, true, null); + this.limiter = limiter; + } + + protected void reBuffer() + { + limiter.acquire(buffer.length); + super.reBuffer(); + } + + public static ThrottledReader open(File file, RateLimiter limiter) + { + try + { + return new ThrottledReader(file, limiter); + } + catch (FileNotFoundException e) + { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/tools/SSTableExport.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java index 51cdc72..90274d1 100644 --- a/src/java/org/apache/cassandra/tools/SSTableExport.java +++ b/src/java/org/apache/cassandra/tools/SSTableExport.java @@ -32,6 +32,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import com.google.common.util.concurrent.RateLimiter; 
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Option; @@ -349,7 +350,7 @@ public class SSTableExport public static void export(Descriptor desc, PrintStream outs, Collection<String> toExport, String[] excludes) throws IOException { SSTableReader reader = SSTableReader.open(desc); - SSTableScanner scanner = reader.getDirectScanner(); + SSTableScanner scanner = reader.getDirectScanner(null); IPartitioner<?> partitioner = reader.partitioner; @@ -406,7 +407,7 @@ public class SSTableExport SSTableIdentityIterator row; - SSTableScanner scanner = reader.getDirectScanner(); + SSTableScanner scanner = reader.getDirectScanner(null); outs.println("["); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java index 2b0a13a..0b8fd25 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java @@ -72,8 +72,8 @@ public class SSTableUtils public static void assertContentEquals(SSTableReader lhs, SSTableReader rhs) throws IOException { - SSTableScanner slhs = lhs.getDirectScanner(); - SSTableScanner srhs = rhs.getDirectScanner(); + SSTableScanner slhs = lhs.getDirectScanner(null); + SSTableScanner srhs = rhs.getDirectScanner(null); while (slhs.hasNext()) { OnDiskAtomIterator ilhs = slhs.next();