fix compaction throttling bursty-ness
patch by yukim and jbellis for CASSANDRA-4316


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2b0797b2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2b0797b2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2b0797b2

Branch: refs/heads/trunk
Commit: 2b0797b24e2d4a433c0e17506a0d8bb812f8f2dd
Parents: 927c4a4
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Tue Apr 30 14:09:25 2013 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Tue Apr 30 15:36:57 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../db/compaction/AbstractCompactionStrategy.java  |    4 +-
 .../db/compaction/CompactionController.java        |   23 ++-------
 .../db/compaction/CompactionIterable.java          |   12 ++---
 .../cassandra/db/compaction/CompactionManager.java |   25 ++++++++-
 .../db/compaction/LeveledCompactionStrategy.java   |    7 ++-
 .../db/compaction/ParallelCompactionIterable.java  |   13 ++----
 .../apache/cassandra/db/compaction/Scrubber.java   |    8 +--
 .../io/compress/CompressedRandomAccessReader.java  |    2 +-
 .../io/compress/CompressedThrottledReader.java     |   38 +++++++++++++++
 .../io/sstable/SSTableBoundedScanner.java          |    6 ++-
 .../apache/cassandra/io/sstable/SSTableReader.java |   25 +++++++---
 .../cassandra/io/sstable/SSTableScanner.java       |    7 ++-
 .../apache/cassandra/io/util/ThrottledReader.java  |   35 +++++++++++++
 .../org/apache/cassandra/tools/SSTableExport.java  |    5 +-
 .../apache/cassandra/io/sstable/SSTableUtils.java  |    4 +-
 16 files changed, 151 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 93198f0..bfece4f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.5
+ * fix compaction throttling bursty-ness (CASSANDRA-4316)
  * reduce memory consumption of IndexSummary (CASSANDRA-5506)
  * remove per-row column name bloom filters (CASSANDRA-5492)
  * Include fatal errors in trace events (CASSANDRA-5447)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index a588216..636cb0d 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 
+import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -150,9 +151,10 @@ public abstract class AbstractCompactionStrategy
      */
     public List<ICompactionScanner> getScanners(Collection<SSTableReader> 
sstables, Range<Token> range)
     {
+        RateLimiter limiter = CompactionManager.instance.getRateLimiter();
         ArrayList<ICompactionScanner> scanners = new 
ArrayList<ICompactionScanner>();
         for (SSTableReader sstable : sstables)
-            scanners.add(sstable.getDirectScanner(range));
+            scanners.add(sstable.getDirectScanner(range, limiter));
         return scanners;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionController.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index f3198ff..f91c7a5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -22,6 +22,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +38,8 @@ import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 import org.apache.cassandra.utils.Throttle;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Throttle;
 
 /**
  * Manage compaction options.
@@ -50,20 +54,6 @@ public class CompactionController
 
     public final int gcBefore;
     public final int mergeShardBefore;
-    private final Throttle throttle = new Throttle("Cassandra_Throttle", new 
Throttle.ThroughputFunction()
-    {
-        /** @return Instantaneous throughput target in bytes per millisecond. 
*/
-        public int targetThroughput()
-        {
-            if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || 
StorageService.instance.isBootstrapMode())
-                // throttling disabled
-                return 0;
-            // total throughput
-            int totalBytesPerMS = 
DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
-            // per stream throughput (target bytes per MS)
-            return totalBytesPerMS / Math.max(1, 
CompactionManager.instance.getActiveCompactions());
-        }
-    });
 
     public CompactionController(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, int gcBefore)
     {
@@ -176,11 +166,6 @@ public class CompactionController
         return getCompactedRow(Collections.singletonList(row));
     }
 
-    public void mayThrottle(long currentBytes)
-    {
-        throttle.throttle(currentBytes);
-    }
-
     public void close()
     {
         SSTableReader.releaseReferences(overlappingSSTables);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index 32b4942..3614ed1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -78,14 +78,10 @@ public class CompactionIterable extends 
AbstractCompactionIterable
             finally
             {
                 rows.clear();
-                if ((row++ % 1000) == 0)
-                {
-                    long n = 0;
-                    for (ICompactionScanner scanner : scanners)
-                        n += scanner.getCurrentPosition();
-                    bytesRead = n;
-                    controller.mayThrottle(bytesRead);
-                }
+                long n = 0;
+                for (ICompactionScanner scanner : scanners)
+                    n += scanner.getCurrentPosition();
+                bytesRead = n;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 1d273b6..96c3011 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -31,6 +31,7 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ConcurrentHashMultiset;
 import com.google.common.collect.Multiset;
 import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -118,6 +119,26 @@ public class CompactionManager implements 
CompactionManagerMBean
     private final CompactionMetrics metrics = new CompactionMetrics(executor, 
validationExecutor);
     private final Multiset<ColumnFamilyStore> compactingCF = 
ConcurrentHashMultiset.create();
 
+    private final RateLimiter compactionRateLimiter = 
RateLimiter.create(Double.MAX_VALUE);
+
+    /**
+     * Gets compaction rate limiter. When compaction_throughput_mb_per_sec is 
0 or node is bootstrapping,
+     * this returns rate limiter with the rate of Double.MAX_VALUE bytes per 
second.
+     * Rate unit is bytes per sec.
+     *
+     * @return RateLimiter with rate limit set
+     */
+    public RateLimiter getRateLimiter()
+    {
+        double currentThroughput = 
DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024;
+        // if throughput is set to 0, throttling is disabled
+        if (currentThroughput == 0 || 
StorageService.instance.isBootstrapMode())
+            currentThroughput = Double.MAX_VALUE;
+        if (compactionRateLimiter.getRate() != currentThroughput)
+            compactionRateLimiter.setRate(currentThroughput);
+        return compactionRateLimiter;
+    }
+
     /**
      * @return A lock, for which acquisition means no compactions can run.
      */
@@ -568,7 +589,7 @@ public class CompactionManager implements 
CompactionManagerMBean
             if (compactionFileLocation == null)
                 throw new IOException("disk full");
 
-            SSTableScanner scanner = sstable.getDirectScanner();
+            SSTableScanner scanner = 
sstable.getDirectScanner(getRateLimiter());
             long rowsRead = 0;
             List<IColumn> indexedColumnsInRow = null;
 
@@ -628,8 +649,6 @@ public class CompactionManager implements 
CompactionManagerMBean
                             }
                         }
                     }
-                    if ((rowsRead++ % 1000) == 0)
-                        controller.mayThrottle(scanner.getCurrentPosition());
                 }
                 if (writer != null)
                     newSstable = writer.closeAndOpenReader(sstable.maxDataAge);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 9a73299..f964297 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.collect.*;
 import com.google.common.primitives.Doubles;
+import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -178,7 +179,7 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy implem
             {
                 // L0 makes no guarantees about overlapping-ness.  Just create 
a direct scanner for each
                 for (SSTableReader sstable : byLevel.get(level))
-                    scanners.add(sstable.getDirectScanner(range));
+                    scanners.add(sstable.getDirectScanner(range, 
CompactionManager.instance.getRateLimiter()));
             }
             else
             {
@@ -208,7 +209,7 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy implem
             this.sstables = new ArrayList<SSTableReader>(sstables);
             Collections.sort(this.sstables, SSTable.sstableComparator);
             sstableIterator = this.sstables.iterator();
-            currentScanner = sstableIterator.next().getDirectScanner(range);
+            currentScanner = sstableIterator.next().getDirectScanner(range, 
CompactionManager.instance.getRateLimiter());
 
             long length = 0;
             for (SSTableReader sstable : sstables)
@@ -233,7 +234,7 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy implem
                         currentScanner = null;
                         return endOfData();
                     }
-                    currentScanner = 
sstableIterator.next().getDirectScanner(range);
+                    currentScanner = 
sstableIterator.next().getDirectScanner(range, 
CompactionManager.instance.getRateLimiter());
                 }
             }
             catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java 
b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index e91846d..0f9407f 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -117,7 +117,6 @@ public class ParallelCompactionIterable extends 
AbstractCompactionIterable
     private class Reducer extends MergeIterator.Reducer<RowContainer, 
CompactedRowContainer>
     {
         private final List<RowContainer> rows = new ArrayList<RowContainer>();
-        private int row = 0;
 
         private final ThreadPoolExecutor executor = new 
DebuggableThreadPoolExecutor(FBUtilities.getAvailableProcessors(),
                                                                                
      Integer.MAX_VALUE,
@@ -137,14 +136,10 @@ public class ParallelCompactionIterable extends 
AbstractCompactionIterable
             ParallelCompactionIterable.this.updateCounterFor(rows.size());
             CompactedRowContainer compacted = getCompactedRow(rows);
             rows.clear();
-            if ((row++ % 1000) == 0)
-            {
-                long n = 0;
-                for (ICompactionScanner scanner : scanners)
-                    n += scanner.getCurrentPosition();
-                bytesRead = n;
-                controller.mayThrottle(bytesRead);
-            }
+            long n = 0;
+            for (ICompactionScanner scanner : scanners)
+                n += scanner.getCurrentPosition();
+            bytesRead = n;
             return compacted;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java 
b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 0601857..cb529cb 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -45,8 +45,6 @@ public class Scrubber implements Closeable
     private final RandomAccessReader indexFile;
     private final ScrubInfo scrubInfo;
 
-    private long rowsRead;
-
     private SSTableWriter writer;
     private SSTableReader newSstable;
     private SSTableReader newInOrderSstable;
@@ -94,7 +92,9 @@ public class Scrubber implements Closeable
         // we'll also loop through the index at the same time, using the 
position from the index to recover if the
         // row header (key or data size) is corrupt. (This means our position 
in the index file will be one row
         // "ahead" of the data file.)
-        this.dataFile = sstable.openDataReader(true);
+        this.dataFile = isOffline
+                        ? sstable.openDataReader(true)
+                        : 
sstable.openDataReader(CompactionManager.instance.getRateLimiter());
         this.indexFile = RandomAccessReader.open(new 
File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
         this.scrubInfo = new ScrubInfo(dataFile, sstable);
     }
@@ -249,8 +249,6 @@ public class Scrubber implements Closeable
                         badRows++;
                     }
                 }
-                if ((rowsRead++ % 1000) == 0)
-                    controller.mayThrottle(dataFile.getFilePointer());
             }
 
             if (writer.getFilePointer() > 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java 
b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index f245851..9da1c97 100644
--- 
a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ 
b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -70,7 +70,7 @@ public class CompressedRandomAccessReader extends 
RandomAccessReader
     // raw checksum bytes
     private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]);
 
-    private CompressedRandomAccessReader(String dataFilePath, 
CompressionMetadata metadata, boolean skipIOCache, PoolingSegmentedFile owner) 
throws FileNotFoundException
+    protected CompressedRandomAccessReader(String dataFilePath, 
CompressionMetadata metadata, boolean skipIOCache, PoolingSegmentedFile owner) 
throws FileNotFoundException
     {
         super(new File(dataFilePath), metadata.chunkLength(), skipIOCache, 
owner);
         this.metadata = metadata;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java 
b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
new file mode 100644
index 0000000..1b7b7a4
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
@@ -0,0 +1,38 @@
+package org.apache.cassandra.io.compress;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.io.util.PoolingSegmentedFile;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+public class CompressedThrottledReader extends CompressedRandomAccessReader
+{
+    private final RateLimiter limiter;
+
+    public CompressedThrottledReader(String file, CompressionMetadata 
metadata, RateLimiter limiter) throws FileNotFoundException
+    {
+        super(file, metadata, true, null);
+        this.limiter = limiter;
+    }
+
+    protected void reBuffer()
+    {
+        limiter.acquire(buffer.length);
+        super.reBuffer();
+    }
+
+    public static CompressedThrottledReader open(String file, 
CompressionMetadata metadata, RateLimiter limiter)
+    {
+        try
+        {
+            return new CompressedThrottledReader(file, metadata, limiter);
+        }
+        catch (FileNotFoundException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
index 56be212..a3c6bbb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.io.sstable;
 import java.util.Arrays;
 import java.util.Iterator;
 
+import com.google.common.util.concurrent.RateLimiter;
+
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.utils.Pair;
@@ -32,9 +34,9 @@ public class SSTableBoundedScanner extends SSTableScanner
     private final Iterator<Pair<Long, Long>> rangeIterator;
     private Pair<Long, Long> currentRange;
 
-    SSTableBoundedScanner(SSTableReader sstable, boolean skipCache, 
Iterator<Pair<Long, Long>> rangeIterator)
+    SSTableBoundedScanner(SSTableReader sstable, boolean skipCache, 
Iterator<Pair<Long, Long>> rangeIterator, RateLimiter limiter)
     {
-        super(sstable, skipCache);
+        super(sstable, skipCache, limiter);
         this.rangeIterator = rangeIterator;
         assert rangeIterator.hasNext(); // use EmptyCompactionScanner otherwise
         currentRange = rangeIterator.next();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index e4a2fe1..6b71223 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.*;
 
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +48,7 @@ import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
+import org.apache.cassandra.io.compress.CompressedThrottledReader;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.CacheService;
@@ -946,10 +949,10 @@ public class SSTableReader extends SSTable
     * Direct I/O SSTableScanner
     * @return A Scanner for seeking over the rows of the SSTable.
     */
-    public SSTableScanner getDirectScanner()
-    {
-        return new SSTableScanner(this, true);
-    }
+   public SSTableScanner getDirectScanner(RateLimiter limiter)
+   {
+       return new SSTableScanner(this, true, limiter);
+   }
 
    /**
     * Direct I/O SSTableScanner over a defined range of tokens.
@@ -957,14 +960,14 @@ public class SSTableReader extends SSTable
     * @param range the range of keys to cover
     * @return A Scanner for seeking over the rows of the SSTable.
     */
-    public ICompactionScanner getDirectScanner(Range<Token> range)
+    public ICompactionScanner getDirectScanner(Range<Token> range, RateLimiter 
limiter)
     {
         if (range == null)
-            return getDirectScanner();
+            return getDirectScanner(limiter);
 
         Iterator<Pair<Long, Long>> rangeIterator = 
getPositionsForRanges(Collections.singletonList(range)).iterator();
         return rangeIterator.hasNext()
-               ? new SSTableBoundedScanner(this, true, rangeIterator)
+               ? new SSTableBoundedScanner(this, true, rangeIterator, limiter)
                : new EmptyCompactionScanner(getFilename());
     }
 
@@ -1117,6 +1120,14 @@ public class SSTableReader extends SSTable
         return sstableMetadata.ancestors;
     }
 
+    public RandomAccessReader openDataReader(RateLimiter limiter)
+    {
+        assert limiter != null;
+        return compression
+               ? CompressedThrottledReader.open(getFilename(), 
getCompressionMetadata(), limiter)
+               : ThrottledReader.open(new File(getFilename()), limiter);
+    }
+
     public RandomAccessReader openDataReader(boolean skipIOCache)
     {
         return compression

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 22ac485..1df5842 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
 
+import com.google.common.util.concurrent.RateLimiter;
+
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.RowPosition;
@@ -45,10 +47,11 @@ public class SSTableScanner implements ICompactionScanner
 
     /**
      * @param sstable SSTable to scan.
+     * @param limiter
      */
-    SSTableScanner(SSTableReader sstable, boolean skipCache)
+    SSTableScanner(SSTableReader sstable, boolean skipCache, RateLimiter 
limiter)
     {
-        this.dfile = sstable.openDataReader(skipCache);
+        this.dfile = limiter == null ? sstable.openDataReader(skipCache) : 
sstable.openDataReader(limiter);
         this.ifile = sstable.openIndexReader(skipCache);
         this.sstable = sstable;
         this.filter = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/util/ThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ThrottledReader.java 
b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
new file mode 100644
index 0000000..d67550a
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
@@ -0,0 +1,35 @@
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+public class ThrottledReader extends RandomAccessReader
+{
+    private final RateLimiter limiter;
+
+    protected ThrottledReader(File file, RateLimiter limiter) throws 
FileNotFoundException
+    {
+        super(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, true, null);
+        this.limiter = limiter;
+    }
+
+    protected void reBuffer()
+    {
+        limiter.acquire(buffer.length);
+        super.reBuffer();
+    }
+
+    public static ThrottledReader open(File file, RateLimiter limiter)
+    {
+        try
+        {
+            return new ThrottledReader(file, limiter);
+        }
+        catch (FileNotFoundException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java 
b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 51cdc72..90274d1 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -32,6 +32,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.util.concurrent.RateLimiter;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Option;
@@ -349,7 +350,7 @@ public class SSTableExport
     public static void export(Descriptor desc, PrintStream outs, 
Collection<String> toExport, String[] excludes) throws IOException
     {
         SSTableReader reader = SSTableReader.open(desc);
-        SSTableScanner scanner = reader.getDirectScanner();
+        SSTableScanner scanner = reader.getDirectScanner(null);
 
         IPartitioner<?> partitioner = reader.partitioner;
 
@@ -406,7 +407,7 @@ public class SSTableExport
 
 
         SSTableIdentityIterator row;
-        SSTableScanner scanner = reader.getDirectScanner();
+        SSTableScanner scanner = reader.getDirectScanner(null);
 
         outs.println("[");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 2b0a13a..0b8fd25 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -72,8 +72,8 @@ public class SSTableUtils
 
     public static void assertContentEquals(SSTableReader lhs, SSTableReader 
rhs) throws IOException
     {
-        SSTableScanner slhs = lhs.getDirectScanner();
-        SSTableScanner srhs = rhs.getDirectScanner();
+        SSTableScanner slhs = lhs.getDirectScanner(null);
+        SSTableScanner srhs = rhs.getDirectScanner(null);
         while (slhs.hasNext())
         {
             OnDiskAtomIterator ilhs = slhs.next();

Reply via email to