Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 0e96d3e52 -> 9b48a0bf4
  refs/heads/trunk c83729f41 -> 0541597e7
Only open one sstable scanner per sstable Patch by marcuse; reviewed by Paulo Motta for CASSANDRA-11412 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9b48a0bf Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9b48a0bf Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9b48a0bf Branch: refs/heads/cassandra-3.0 Commit: 9b48a0bf430b995332e1a4dde20ba7482175ef99 Parents: 0e96d3e Author: Marcus Eriksson <marc...@apache.org> Authored: Thu Mar 31 16:32:11 2016 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Wed Apr 20 06:28:55 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../compaction/AbstractCompactionStrategy.java | 11 ++++-- .../compaction/CompactionStrategyManager.java | 21 +++------- .../compaction/LeveledCompactionStrategy.java | 41 ++++++++++++-------- .../io/sstable/format/big/BigTableReader.java | 5 ++- 5 files changed, 43 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b48a0bf/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6586299..cc50a23 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.6 + * Only open one sstable scanner per sstable (CASSANDRA-11412) * Option to specify ProtocolVersion in cassandra-stress (CASSANDRA-11410) * ArithmeticException in avgFunctionForDecimal (CASSANDRA-11485) * LogAwareFileLister should only use OLD sstable files in current folder to determine disk consistency (CASSANDRA-11470) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b48a0bf/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index ae8839e..c205d5c 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -263,6 +263,11 @@ public abstract class AbstractCompactionStrategy }); } + + public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range) + { + return range == null ? getScanners(sstables, (Collection<Range<Token>>)null) : getScanners(sstables, Collections.singleton(range)); + } /** * Returns a list of KeyScanners given sstables and a range on which to scan. * The default implementation simply grab one SSTableScanner per-sstable, but overriding this method @@ -270,14 +275,14 @@ public abstract class AbstractCompactionStrategy * LeveledCompactionStrategy for instance). */ @SuppressWarnings("resource") - public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range) + public ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges) { RateLimiter limiter = CompactionManager.instance.getRateLimiter(); ArrayList<ISSTableScanner> scanners = new ArrayList<ISSTableScanner>(); try { for (SSTableReader sstable : sstables) - scanners.add(sstable.getScanner(range, limiter)); + scanners.add(sstable.getScanner(ranges, limiter)); } catch (Throwable t) { @@ -349,7 +354,7 @@ public abstract class AbstractCompactionStrategy public ScannerList getScanners(Collection<SSTableReader> toCompact) { - return getScanners(toCompact, null); + return getScanners(toCompact, (Collection<Range<Token>>)null); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b48a0bf/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index bd72c64..82fd872 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -353,7 +353,7 @@ public class CompactionStrategyManager implements INotificationConsumer * * Delegates the call to the compaction strategies to allow LCS to create a scanner * @param sstables - * @param range + * @param ranges * @return */ @SuppressWarnings("resource") @@ -370,25 +370,16 @@ public class CompactionStrategyManager implements INotificationConsumer } Set<ISSTableScanner> scanners = new HashSet<>(sstables.size()); - - for (Range<Token> range : ranges) - { - AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range); - AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range); - - for (ISSTableScanner scanner : Iterables.concat(repairedScanners.scanners, unrepairedScanners.scanners)) - { - if (!scanners.add(scanner)) - scanner.close(); - } - } - + AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, ranges); + AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, ranges); + scanners.addAll(repairedScanners.scanners); + scanners.addAll(unrepairedScanners.scanners); return new AbstractCompactionStrategy.ScannerList(new ArrayList<>(scanners)); } public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables) { - return getScanners(sstables, Collections.singleton(null)); + return getScanners(sstables, null); } public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b48a0bf/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 953971a..5a5cbef 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -216,7 +216,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy return maxSSTableSizeInMB * 1024L * 1024L; } - public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range) + public ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges) { Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create(); for (SSTableReader sstable : sstables) @@ -235,16 +235,16 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy { // L0 makes no guarantees about overlapping-ness. 
Just create a direct scanner for each for (SSTableReader sstable : byLevel.get(level)) - scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter())); + scanners.add(sstable.getScanner(ranges, CompactionManager.instance.getRateLimiter())); } else { // Create a LeveledScanner that only opens one sstable at a time, in sorted order - List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range); + Collection<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), ranges); if (!intersecting.isEmpty()) { @SuppressWarnings("resource") // The ScannerList will be in charge of closing (and we close properly on errors) - ISSTableScanner scanner = new LeveledScanner(intersecting, range); + ISSTableScanner scanner = new LeveledScanner(intersecting, ranges); scanners.add(scanner); } } @@ -288,7 +288,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy // same level (e.g. non overlapping) - see #4142 private static class LeveledScanner extends AbstractIterator<UnfilteredRowIterator> implements ISSTableScanner { - private final Range<Token> range; + private final Collection<Range<Token>> ranges; private final List<SSTableReader> sstables; private final Iterator<SSTableReader> sstableIterator; private final long totalLength; @@ -296,9 +296,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy private ISSTableScanner currentScanner; private long positionOffset; - public LeveledScanner(Collection<SSTableReader> sstables, Range<Token> range) + public LeveledScanner(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges) { - this.range = range; + this.ranges = ranges; // add only sstables that intersect our range, and estimate how much data that involves this.sstables = new ArrayList<>(sstables.size()); @@ -309,8 +309,8 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy long estimatedKeys = sstable.estimatedKeys(); double 
estKeysInRangeRatio = 1.0; - if (estimatedKeys > 0 && range != null) - estKeysInRangeRatio = ((double) sstable.estimatedKeysForRanges(Collections.singleton(range))) / estimatedKeys; + if (estimatedKeys > 0 && ranges != null) + estKeysInRangeRatio = ((double) sstable.estimatedKeysForRanges(ranges)) / estimatedKeys; length += sstable.uncompressedLength() * estKeysInRangeRatio; } @@ -319,21 +319,28 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy Collections.sort(this.sstables, SSTableReader.sstableComparator); sstableIterator = this.sstables.iterator(); assert sstableIterator.hasNext(); // caller should check intersecting first - currentScanner = sstableIterator.next().getScanner(range, CompactionManager.instance.getRateLimiter()); + currentScanner = sstableIterator.next().getScanner(ranges, CompactionManager.instance.getRateLimiter()); } - public static List<SSTableReader> intersecting(Collection<SSTableReader> sstables, Range<Token> range) + public static Collection<SSTableReader> intersecting(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges) { - ArrayList<SSTableReader> filtered = new ArrayList<>(); - for (SSTableReader sstable : sstables) + if (ranges == null) + return Lists.newArrayList(sstables); + + Set<SSTableReader> filtered = new HashSet<>(); + for (Range<Token> range : ranges) { - Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken()); - if (range == null || sstableRange.intersects(range)) - filtered.add(sstable); + for (SSTableReader sstable : sstables) + { + Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken()); + if (range == null || sstableRange.intersects(range)) + filtered.add(sstable); + } } return filtered; } + public boolean isForThrift() { return false; @@ -362,7 +369,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy currentScanner = null; return endOfData(); } - currentScanner = 
sstableIterator.next().getScanner(range, CompactionManager.instance.getRateLimiter()); + currentScanner = sstableIterator.next().getScanner(ranges, CompactionManager.instance.getRateLimiter()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b48a0bf/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java index dbab0f4..1fbf1f2 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java @@ -110,7 +110,10 @@ public class BigTableReader extends SSTableReader */ public ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter) { - return BigTableScanner.getScanner(this, ranges, limiter); + if (ranges != null) + return BigTableScanner.getScanner(this, ranges, limiter); + else + return getScanner(limiter); }