Repository: cassandra
Updated Branches:
  refs/heads/trunk e73fccdcd -> 32ac6af2b

Fix ref counting race between SSTableScanner and SSTR

Patch by jmckenzie; reviewed by marcuse for CASSANDRA-8399

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1fec4a42
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1fec4a42
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1fec4a42

Branch: refs/heads/trunk
Commit: 1fec4a4281be94f8ef2f9f8a5eaccee56d70e87e
Parents: 9871914
Author: Joshua McKenzie <jmcken...@apache.org>
Authored: Tue Dec 16 14:37:07 2014 -0600
Committer: Joshua McKenzie <jmcken...@apache.org>
Committed: Tue Dec 16 14:37:07 2014 -0600

----------------------------------------------------------------------
 .../compaction/AbstractCompactionIterable.java | 7 +-
 .../compaction/AbstractCompactionStrategy.java | 10 +--
 .../db/compaction/CompactionIterable.java | 5 +-
 .../db/compaction/CompactionManager.java | 22 +++---
 .../db/compaction/ICompactionScanner.java | 34 ---------
 .../compaction/LeveledCompactionStrategy.java | 7 +-
 .../compaction/WrappingCompactionStrategy.java | 4 +-
 .../cassandra/io/sstable/SSTableReader.java | 61 +++-------------
 .../cassandra/io/sstable/SSTableScanner.java | 73 +++++++++++++++++---
 .../apache/cassandra/tools/SSTableExport.java | 2 +-
 .../db/compaction/AntiCompactionTest.java | 9 +--
 .../db/compaction/CompactionsTest.java | 7 +-
 .../LeveledCompactionStrategyTest.java | 5 +-
 .../cassandra/db/compaction/TTLExpiryTest.java | 4 +-
 .../cassandra/io/sstable/SSTableReaderTest.java | 3 +-
 .../io/sstable/SSTableRewriterTest.java | 24 +++----
 .../io/sstable/SSTableScannerTest.java | 17 +++--
 .../cassandra/io/sstable/SSTableUtils.java | 4 +-
 18 files changed, 130 insertions(+), 168 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java index e9f063f..5ac2c8b 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction; import java.util.List; import java.util.concurrent.atomic.AtomicLong; +import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.utils.CloseableIterator; public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow> @@ -28,7 +29,7 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i protected final CompactionController controller; protected final long totalBytes; protected volatile long bytesRead; - protected final List<ICompactionScanner> scanners; + protected final List<ISSTableScanner> scanners; /* * counters for merged rows. 
* array index represents (number of merged rows - 1), so index 0 is counter for no merge (1 row), @@ -36,7 +37,7 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i */ protected final AtomicLong[] mergeCounters; - public AbstractCompactionIterable(CompactionController controller, OperationType type, List<ICompactionScanner> scanners) + public AbstractCompactionIterable(CompactionController controller, OperationType type, List<ISSTableScanner> scanners) { this.controller = controller; this.type = type; @@ -44,7 +45,7 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i this.bytesRead = 0; long bytes = 0; - for (ICompactionScanner scanner : scanners) + for (ISSTableScanner scanner : scanners) bytes += scanner.getLengthInBytes(); this.totalBytes = bytes; mergeCounters = new AtomicLong[scanners.size()]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index bf136b9..337657d 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -22,7 +22,6 @@ import java.util.*; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; @@ -34,6 +33,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.Component; 
+import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -269,7 +269,7 @@ public abstract class AbstractCompactionStrategy public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range) { RateLimiter limiter = CompactionManager.instance.getRateLimiter(); - ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(); + ArrayList<ISSTableScanner> scanners = new ArrayList<ISSTableScanner>(); try { for (SSTableReader sstable : sstables) @@ -306,8 +306,8 @@ public abstract class AbstractCompactionStrategy public static class ScannerList implements AutoCloseable { - public final List<ICompactionScanner> scanners; - public ScannerList(List<ICompactionScanner> scanners) + public final List<ISSTableScanner> scanners; + public ScannerList(List<ISSTableScanner> scanners) { this.scanners = scanners; } @@ -315,7 +315,7 @@ public abstract class AbstractCompactionStrategy public void close() { Throwable t = null; - for (ICompactionScanner scanner : scanners) + for (ISSTableScanner scanner : scanners) { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java index 0c9b52a..fdcec6e 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java @@ -24,6 +24,7 @@ import java.util.List; import com.google.common.collect.ImmutableList; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.utils.CloseableIterator; import 
org.apache.cassandra.utils.MergeIterator; @@ -37,7 +38,7 @@ public class CompactionIterable extends AbstractCompactionIterable } }; - public CompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller) + public CompactionIterable(OperationType type, List<ISSTableScanner> scanners, CompactionController controller) { super(controller, type, scanners); } @@ -77,7 +78,7 @@ public class CompactionIterable extends AbstractCompactionIterable { rows.clear(); long n = 0; - for (ICompactionScanner scanner : scanners) + for (ISSTableScanner scanner : scanners) n += scanner.getCurrentPosition(); bytesRead = n; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index d85ffd7..3977d9c 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -45,7 +44,6 @@ import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ConcurrentHashMultiset; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; @@ -73,11 +71,7 @@ import org.apache.cassandra.db.index.SecondaryIndexBuilder; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; import 
org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTableIdentityIterator; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.sstable.SSTableRewriter; -import org.apache.cassandra.io.sstable.SSTableWriter; +import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.CompactionMetrics; @@ -696,7 +690,7 @@ public class CompactionManager implements CompactionManagerMBean if (compactionFileLocation == null) throw new IOException("disk full"); - ICompactionScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter()); + ISSTableScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter()); CleanupInfo ci = new CleanupInfo(sstable, scanner); metrics.beginCompaction(ci); @@ -761,7 +755,7 @@ public class CompactionManager implements CompactionManagerMBean : new Bounded(cfs, ranges); } - public abstract ICompactionScanner getScanner(SSTableReader sstable, RateLimiter limiter); + public abstract ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter); public abstract SSTableIdentityIterator cleanup(SSTableIdentityIterator row); private static final class Bounded extends CleanupStrategy @@ -782,7 +776,7 @@ public class CompactionManager implements CompactionManagerMBean } @Override - public ICompactionScanner getScanner(SSTableReader sstable, RateLimiter limiter) + public ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter) { return sstable.getScanner(ranges, limiter); } @@ -808,7 +802,7 @@ public class CompactionManager implements CompactionManagerMBean } @Override - public ICompactionScanner getScanner(SSTableReader sstable, RateLimiter limiter) + public ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter) { return sstable.getScanner(limiter); } @@ -1122,7 +1116,7 @@ 
public class CompactionManager implements CompactionManagerMBean private static class ValidationCompactionIterable extends CompactionIterable { - public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ICompactionScanner> scanners, int gcBefore) + public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ISSTableScanner> scanners, int gcBefore) { super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore)); } @@ -1288,9 +1282,9 @@ public class CompactionManager implements CompactionManagerMBean private static class CleanupInfo extends CompactionInfo.Holder { private final SSTableReader sstable; - private final ICompactionScanner scanner; + private final ISSTableScanner scanner; - public CleanupInfo(SSTableReader sstable, ICompactionScanner scanner) + public CleanupInfo(SSTableReader sstable, ISSTableScanner scanner) { this.sstable = sstable; this.scanner = scanner; http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java b/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java deleted file mode 100644 index ebee3ed..0000000 --- a/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cassandra.db.compaction; - -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; -import org.apache.cassandra.utils.CloseableIterator; - -/** - * An ICompactionScanner is an abstraction allowing multiple SSTableScanners to be - * chained together under the hood. See LeveledCompactionStrategy.getScanners. - */ -public interface ICompactionScanner extends CloseableIterator<OnDiskAtomIterator> -{ - public long getLengthInBytes(); - public long getCurrentPosition(); - public String getBackingFiles(); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index a560234..dbb9a13 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -32,6 +32,7 @@ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.SSTableReader; public class LeveledCompactionStrategy extends AbstractCompactionStrategy @@ -167,7 +168,7 @@ public class 
LeveledCompactionStrategy extends AbstractCompactionStrategy byLevel.get(sstable.getSSTableLevel()).add(sstable); } - List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size()); + List<ISSTableScanner> scanners = new ArrayList<ISSTableScanner>(sstables.size()); try { for (Integer level : byLevel.keySet()) @@ -219,14 +220,14 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy // Lazily creates SSTableBoundedScanner for sstable that are assumed to be from the // same level (e.g. non overlapping) - see #4142 - private static class LeveledScanner extends AbstractIterator<OnDiskAtomIterator> implements ICompactionScanner + private static class LeveledScanner extends AbstractIterator<OnDiskAtomIterator> implements ISSTableScanner { private final Range<Token> range; private final List<SSTableReader> sstables; private final Iterator<SSTableReader> sstableIterator; private final long totalLength; - private ICompactionScanner currentScanner; + private ISSTableScanner currentScanner; private long positionOffset; public LeveledScanner(Collection<SSTableReader> sstables, Range<Token> range) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java index 1d713ef..84ef97f 100644 --- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +30,7 @@ import org.apache.cassandra.config.CFMetaData; import 
org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.notifications.INotification; import org.apache.cassandra.notifications.INotificationConsumer; @@ -318,7 +320,7 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy unrepairedSSTables.add(sstable); ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range); ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range); - List<ICompactionScanner> scanners = new ArrayList<>(repairedScanners.scanners.size() + unrepairedScanners.scanners.size()); + List<ISSTableScanner> scanners = new ArrayList<>(repairedScanners.scanners.size() + unrepairedScanners.scanners.size()); scanners.addAll(repairedScanners.scanners); scanners.addAll(unrepairedScanners.scanners); return new ScannerList(scanners); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 0024f24..a8188ba 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -75,7 +75,6 @@ import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.commitlog.ReplayPosition; -import org.apache.cassandra.db.compaction.ICompactionScanner; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.IPartitioner; @@ -1706,23 +1705,23 @@ public class 
SSTableReader extends SSTable * @param dataRange filter to use when reading the columns * @return A Scanner for seeking over the rows of the SSTable. */ - public SSTableScanner getScanner(DataRange dataRange) + public ISSTableScanner getScanner(DataRange dataRange) { - return new SSTableScanner(this, dataRange, null); + return SSTableScanner.getScanner(this, dataRange, null); } /** * I/O SSTableScanner * @return A Scanner for seeking over the rows of the SSTable. */ - public SSTableScanner getScanner() + public ISSTableScanner getScanner() { return getScanner((RateLimiter) null); } - public SSTableScanner getScanner(RateLimiter limiter) + public ISSTableScanner getScanner(RateLimiter limiter) { - return new SSTableScanner(this, DataRange.allData(partitioner), limiter); + return SSTableScanner.getScanner(this, DataRange.allData(partitioner), limiter); } /** @@ -1731,7 +1730,7 @@ public class SSTableReader extends SSTable * @param range the range of keys to cover * @return A Scanner for seeking over the rows of the SSTable. */ - public ICompactionScanner getScanner(Range<Token> range, RateLimiter limiter) + public ISSTableScanner getScanner(Range<Token> range, RateLimiter limiter) { if (range == null) return getScanner(limiter); @@ -1744,14 +1743,9 @@ public class SSTableReader extends SSTable * @param ranges the range of keys to cover * @return A Scanner for seeking over the rows of the SSTable. 
*/ - public ICompactionScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter) + public ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter) { - // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249) - List<Pair<Long, Long>> positions = getPositionsForRanges(Range.normalize(ranges)); - if (positions.isEmpty()) - return new EmptyCompactionScanner(getFilename()); - else - return new SSTableScanner(this, ranges, limiter); + return SSTableScanner.getScanner(this, ranges, limiter); } public FileDataInput getFileDataInput(long position) @@ -2058,45 +2052,6 @@ public class SSTableReader extends SSTable readMeter.mark(); } - protected class EmptyCompactionScanner implements ICompactionScanner - { - private final String filename; - - public EmptyCompactionScanner(String filename) - { - this.filename = filename; - } - - public long getLengthInBytes() - { - return 0; - } - - public long getCurrentPosition() - { - return 0; - } - - public String getBackingFiles() - { - return filename; - } - - public boolean hasNext() - { - return false; - } - - public OnDiskAtomIterator next() - { - return null; - } - - public void close() throws IOException { } - - public void remove() { } - } - public static class SizeComparator implements Comparator<SSTableReader> { public int compare(SSTableReader o1, SSTableReader o2) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java index 3f1f1f0..5499195 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java @@ -18,10 +18,7 @@ package org.apache.cassandra.io.sstable; import java.io.IOException; -import 
java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; +import java.util.*; import com.google.common.collect.AbstractIterator; import com.google.common.util.concurrent.RateLimiter; @@ -33,7 +30,6 @@ import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.db.columniterator.IColumnIteratorFactory; import org.apache.cassandra.db.columniterator.LazyColumnIterator; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; -import org.apache.cassandra.db.compaction.ICompactionScanner; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; @@ -41,8 +37,9 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; -public class SSTableScanner implements ICompactionScanner +public class SSTableScanner implements ISSTableScanner { protected final RandomAccessReader dfile; protected final RandomAccessReader ifile; @@ -55,15 +52,31 @@ public class SSTableScanner implements ICompactionScanner protected Iterator<OnDiskAtomIterator> iterator; + // We can race with the sstable for deletion during compaction. If it's been ref counted to 0, skip + public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter) + { + return sstable.acquireReference() + ? 
new SSTableScanner(sstable, dataRange, limiter) + : new SSTableScanner.EmptySSTableScanner(sstable.getFilename()); + } + public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter) + { + // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249) + List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(Range.normalize(tokenRanges)); + if (positions.isEmpty() || !sstable.acquireReference()) + return new EmptySSTableScanner(sstable.getFilename()); + + return new SSTableScanner(sstable, tokenRanges, limiter); + } + /** * @param sstable SSTable to scan; must not be null * @param dataRange a single range to scan; must not be null * @param limiter background i/o RateLimiter; may be null */ - SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter) + private SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter) { assert sstable != null; - sstable.acquireReference(); this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter); this.ifile = sstable.openIndexReader(); @@ -90,10 +103,9 @@ public class SSTableScanner implements ICompactionScanner * @param tokenRanges A set of token ranges to scan * @param limiter background i/o RateLimiter; may be null */ - SSTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter) + private SSTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter) { assert sstable != null; - sstable.acquireReference(); this.dfile = limiter == null ? 
sstable.openDataReader() : sstable.openDataReader(limiter); this.ifile = sstable.openIndexReader(); @@ -294,4 +306,45 @@ public class SSTableScanner implements ICompactionScanner " sstable=" + sstable + ")"; } + + public static class EmptySSTableScanner implements ISSTableScanner + { + private final String filename; + + public EmptySSTableScanner(String filename) + { + this.filename = filename; + } + + public long getLengthInBytes() + { + return 0; + } + + public long getCurrentPosition() + { + return 0; + } + + public String getBackingFiles() + { + return filename; + } + + public boolean hasNext() + { + return false; + } + + public OnDiskAtomIterator next() + { + return null; + } + + public void close() throws IOException { } + + public void remove() { } + } + + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/tools/SSTableExport.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java index e178145..22aebdb 100644 --- a/src/java/org/apache/cassandra/tools/SSTableExport.java +++ b/src/java/org/apache/cassandra/tools/SSTableExport.java @@ -321,7 +321,7 @@ public class SSTableExport excludeSet = new HashSet<String>(Arrays.asList(excludes)); SSTableIdentityIterator row; - SSTableScanner scanner = reader.getScanner(); + ISSTableScanner scanner = reader.getScanner(); try { outs.println("["); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index 090839e..a09d8b4 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ 
b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -40,10 +40,7 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.dht.BytesToken; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.SSTableIdentityIterator; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.sstable.SSTableScanner; -import org.apache.cassandra.io.sstable.SSTableWriter; +import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; @@ -58,7 +55,7 @@ public class AntiCompactionTest extends SchemaLoader private static final String CF = "Standard1"; @Test - public void antiCompactOne() throws InterruptedException, ExecutionException, IOException + public void antiCompactOne() throws Exception { ColumnFamilyStore store = prepareColumnFamilyStore(); Collection<SSTableReader> sstables = store.getUnrepairedSSTables(); @@ -75,7 +72,7 @@ public class AntiCompactionTest extends SchemaLoader int nonRepairedKeys = 0; for (SSTableReader sstable : store.getSSTables()) { - try (SSTableScanner scanner = sstable.getScanner()) + try (ISSTableScanner scanner = sstable.getScanner()) { while (scanner.hasNext()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index a1ecfab..4659b5c 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -41,10 +41,7 @@ import org.apache.cassandra.db.filter.QueryFilter; import 
org.apache.cassandra.dht.BytesToken; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.Component; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.sstable.SSTableScanner; -import org.apache.cassandra.io.sstable.SSTableWriter; +import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.service.StorageService; @@ -161,7 +158,7 @@ public class CompactionsTest extends SchemaLoader // check that the shadowed column is gone SSTableReader sstable = cfs.getSSTables().iterator().next(); Range keyRange = new Range<RowPosition>(key, sstable.partitioner.getMinimumToken().maxKeyBound()); - SSTableScanner scanner = sstable.getScanner(DataRange.forKeyRange(keyRange)); + ISSTableScanner scanner = sstable.getScanner(DataRange.forKeyRange(keyRange)); OnDiskAtomIterator iter = scanner.next(); assertEquals(key, iter.getKey()); assert iter.next() instanceof RangeTombstone; http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index ebc6e86..4c2236b 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -34,6 +34,7 @@ import org.apache.cassandra.Util; import org.apache.cassandra.db.*; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.SSTableReader; import 
org.apache.cassandra.notifications.SSTableAddedNotification; import org.apache.cassandra.notifications.SSTableRepairStatusChanged; @@ -145,9 +146,9 @@ public class LeveledCompactionStrategyTest extends SchemaLoader // get LeveledScanner for level 1 sstables Collection<SSTableReader> sstables = strategy.manifest.getLevel(1); - List<ICompactionScanner> scanners = strategy.getScanners(sstables).scanners; + List<ISSTableScanner> scanners = strategy.getScanners(sstables).scanners; assertEquals(1, scanners.size()); // should be one per level - ICompactionScanner scanner = scanners.get(0); + ISSTableScanner scanner = scanners.get(0); // scan through to the end while (scanner.hasNext()) scanner.next(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java index 4fe5cfb..678601b 100644 --- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java @@ -29,8 +29,8 @@ import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.sstable.SSTableScanner; import org.apache.cassandra.utils.ByteBufferUtil; import java.util.Collections; @@ -181,7 +181,7 @@ public class TTLExpiryTest extends SchemaLoader cfs.enableAutoCompaction(true); assertEquals(1, cfs.getSSTables().size()); SSTableReader sstable = cfs.getSSTables().iterator().next(); - SSTableScanner scanner = sstable.getScanner(DataRange.allData(sstable.partitioner)); + ISSTableScanner scanner = 
sstable.getScanner(DataRange.allData(sstable.partitioner)); assertTrue(scanner.hasNext()); while(scanner.hasNext()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index a99aa0c..03b5553 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -56,7 +56,6 @@ import org.apache.cassandra.db.Row; import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.compaction.ICompactionScanner; import org.apache.cassandra.db.composites.Composites; import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.dht.LocalToken; @@ -347,7 +346,7 @@ public class SSTableReaderTest extends SchemaLoader boolean foundScanner = false; for (SSTableReader s : store.getSSTables()) { - ICompactionScanner scanner = s.getScanner(new Range<Token>(t(0), t(1), s.partitioner), null); + ISSTableScanner scanner = s.getScanner(new Range<Token>(t(0), t(1), s.partitioner), null); scanner.next(); // throws exception pre 5407 foundScanner = true; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index c0a017e..ecf97c3 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ 
b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -21,7 +21,6 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -30,17 +29,14 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ArrayBackedSortedColumns; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.compaction.AbstractCompactedRow; import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; import org.apache.cassandra.db.compaction.CompactionController; -import org.apache.cassandra.db.compaction.ICompactionScanner; import org.apache.cassandra.db.compaction.LazilyCompactedRow; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.compaction.SSTableSplitter; @@ -75,7 +71,7 @@ public class SSTableRewriterTest extends SchemaLoader SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false); try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);) { - ICompactionScanner scanner = scanners.scanners.get(0); + ISSTableScanner scanner = scanners.scanners.get(0); CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); while(scanner.hasNext()) @@ -107,7 +103,7 @@ public class SSTableRewriterTest extends SchemaLoader SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false); try (AbstractCompactionStrategy.ScannerList scanners = 
cfs.getCompactionStrategy().getScanners(sstables);) { - ICompactionScanner scanner = scanners.scanners.get(0); + ISSTableScanner scanner = scanners.scanners.get(0); CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); while (scanner.hasNext()) @@ -172,7 +168,7 @@ public class SSTableRewriterTest extends SchemaLoader rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); int files = 1; - try (ICompactionScanner scanner = s.getScanner(); + try (ISSTableScanner scanner = s.getScanner(); CompactionController controller = new CompactionController(cfs, compacting, 0)) { while(scanner.hasNext()) @@ -221,7 +217,7 @@ public class SSTableRewriterTest extends SchemaLoader rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); int files = 1; - try (ICompactionScanner scanner = s.getScanner(); + try (ISSTableScanner scanner = s.getScanner(); CompactionController controller = new CompactionController(cfs, compacting, 0)) { while(scanner.hasNext()) @@ -264,7 +260,7 @@ public class SSTableRewriterTest extends SchemaLoader rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); int files = 1; - try (ICompactionScanner scanner = s.getScanner(); + try (ISSTableScanner scanner = s.getScanner(); CompactionController controller = new CompactionController(cfs, compacting, 0)) { while(scanner.hasNext()) @@ -307,7 +303,7 @@ public class SSTableRewriterTest extends SchemaLoader rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); int files = 1; - try (ICompactionScanner scanner = s.getScanner(); + try (ISSTableScanner scanner = s.getScanner(); CompactionController controller = new CompactionController(cfs, compacting, 0)) { while(scanner.hasNext()) @@ -352,7 +348,7 @@ public class SSTableRewriterTest extends SchemaLoader rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); int files = 1; - try 
(ICompactionScanner scanner = s.getScanner(); + try (ISSTableScanner scanner = s.getScanner(); CompactionController controller = new CompactionController(cfs, compacting, 0)) { while(scanner.hasNext()) @@ -395,7 +391,7 @@ public class SSTableRewriterTest extends SchemaLoader rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); int files = 1; - try (ICompactionScanner scanner = s.getScanner(); + try (ISSTableScanner scanner = s.getScanner(); CompactionController controller = new CompactionController(cfs, compacting, 0)) { while(scanner.hasNext()) @@ -434,7 +430,7 @@ public class SSTableRewriterTest extends SchemaLoader rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); int files = 1; - try (ICompactionScanner scanner = s.getScanner(); + try (ISSTableScanner scanner = s.getScanner(); CompactionController controller = new CompactionController(cfs, compacting, 0)) { while(scanner.hasNext()) @@ -513,7 +509,7 @@ public class SSTableRewriterTest extends SchemaLoader SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline); SSTableWriter w = getWriter(cfs, s.descriptor.directory); rewriter.switchWriter(w); - try (ICompactionScanner scanner = compacting.iterator().next().getScanner(); + try (ISSTableScanner scanner = compacting.iterator().next().getScanner(); CompactionController controller = new CompactionController(cfs, compacting, 0)) { while (scanner.hasNext()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java index ff1a305..ff60481 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java @@ -27,7 +27,6 @@ import 
org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; -import org.apache.cassandra.db.compaction.ICompactionScanner; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.BytesToken; import org.apache.cassandra.dht.Range; @@ -78,7 +77,7 @@ public class SSTableScannerTest extends SchemaLoader private static void assertScanMatches(SSTableReader sstable, int scanStart, int scanEnd, int expectedStart, int expectedEnd) { - SSTableScanner scanner = sstable.getScanner(new DataRange(boundsFor(scanStart, scanEnd), new IdentityQueryFilter())); + ISSTableScanner scanner = sstable.getScanner(new DataRange(boundsFor(scanStart, scanEnd), new IdentityQueryFilter())); for (int i = expectedStart; i <= expectedEnd; i++) assertEquals(toKey(i), new String(scanner.next().getKey().getKey().array())); assertFalse(scanner.hasNext()); @@ -86,7 +85,7 @@ public class SSTableScannerTest extends SchemaLoader private static void assertScanEmpty(SSTableReader sstable, int scanStart, int scanEnd) { - SSTableScanner scanner = sstable.getScanner(new DataRange(boundsFor(scanStart, scanEnd), new IdentityQueryFilter())); + ISSTableScanner scanner = sstable.getScanner(new DataRange(boundsFor(scanStart, scanEnd), new IdentityQueryFilter())); assertFalse(String.format("scan of (%03d, %03d] should be empty", scanStart, scanEnd), scanner.hasNext()); } @@ -108,7 +107,7 @@ public class SSTableScannerTest extends SchemaLoader SSTableReader sstable = store.getSSTables().iterator().next(); // full range scan - SSTableScanner scanner = sstable.getScanner(); + ISSTableScanner scanner = sstable.getScanner(); for (int i = 2; i < 10; i++) assertEquals(toKey(i), new String(scanner.next().getKey().getKey().array())); @@ -135,7 +134,7 @@ public class SSTableScannerTest extends SchemaLoader assertScanEmpty(sstable, 10, 11); } - private static void 
assertScanContainsRanges(ICompactionScanner scanner, int ... rangePairs) + private static void assertScanContainsRanges(ISSTableScanner scanner, int ... rangePairs) { assert rangePairs.length % 2 == 0; @@ -172,7 +171,7 @@ public class SSTableScannerTest extends SchemaLoader SSTableReader sstable = store.getSSTables().iterator().next(); // full range scan - SSTableScanner fullScanner = sstable.getScanner(); + ISSTableScanner fullScanner = sstable.getScanner(); assertScanContainsRanges(fullScanner, 2, 9, 102, 109, @@ -180,7 +179,7 @@ public class SSTableScannerTest extends SchemaLoader // scan all three ranges separately - ICompactionScanner scanner = sstable.getScanner(makeRanges(1, 9, + ISSTableScanner scanner = sstable.getScanner(makeRanges(1, 9, 101, 109, 201, 209), null); @@ -302,11 +301,11 @@ public class SSTableScannerTest extends SchemaLoader SSTableReader sstable = store.getSSTables().iterator().next(); // full range scan - SSTableScanner fullScanner = sstable.getScanner(); + ISSTableScanner fullScanner = sstable.getScanner(); assertScanContainsRanges(fullScanner, 205, 205); // scan three ranges separately - ICompactionScanner scanner = sstable.getScanner(makeRanges(101, 109, + ISSTableScanner scanner = sstable.getScanner(makeRanges(101, 109, 201, 209), null); // this will currently fail http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java index d39f968..b9a3821 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java @@ -69,8 +69,8 @@ public class SSTableUtils public static void assertContentEquals(SSTableReader lhs, SSTableReader rhs) { - SSTableScanner slhs = lhs.getScanner(); - SSTableScanner srhs = 
rhs.getScanner(); + ISSTableScanner slhs = lhs.getScanner(); + ISSTableScanner srhs = rhs.getScanner(); while (slhs.hasNext()) { OnDiskAtomIterator ilhs = slhs.next();