Cleanup doesn't need to inspect sstables that contain only local data. Patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-5722
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bc12d73a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bc12d73a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bc12d73a Branch: refs/heads/trunk Commit: bc12d73a5a0f31ab8258b3d2a35063b5750df91c Parents: 16fcd15 Author: Jonathan Ellis <jbel...@apache.org> Authored: Thu Aug 15 15:17:13 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Thu Aug 15 15:17:13 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../db/compaction/CompactionManager.java | 58 ++++++++++ .../org/apache/cassandra/dht/IPartitioner.java | 3 +- .../cassandra/io/sstable/SSTableReader.java | 44 +++++++- .../apache/cassandra/io/util/SegmentedFile.java | 2 +- .../db/compaction/CompactionsTest.java | 112 +++++++++++++++++++ 6 files changed, 216 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 805dca2..4c9f9a1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,9 @@ * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862) * Improve offheap memcpy performance (CASSANDRA-5884) * Use a range aware scanner for cleanup (CASSANDRA-2524) + * Cleanup doesn't need to inspect sstables that contain only local data + (CASSANDRA-5722) + 2.0.0-rc2 * enable vnodes by default (CASSANDRA-5869) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
index ed6770f..8e8220f 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -458,6 +458,59 @@ public class CompactionManager implements CompactionManagerMBean } /** + * Determines if a cleanup would actually remove any data in this SSTable based + * on a set of owned ranges. + */ + static boolean needsCleanup(SSTableReader sstable, Collection<Range<Token>> ownedRanges) + { + assert !ownedRanges.isEmpty(); // cleanup checks for this + + // unwrap and sort the ranges by LHS token; the result can differ in size from ownedRanges (e.g. a wrapping range is split in two), so index only into sortedRanges below + List<Range<Token>> sortedRanges = Range.normalize(ownedRanges); + + // see if there are any keys LTE the token for the start of the first range + // (token range ownership is exclusive on the LHS.) + Range<Token> firstRange = sortedRanges.get(0); + if (sstable.first.token.compareTo(firstRange.left) <= 0) + return true; + + // then, iterate over all owned ranges and see if the next key beyond the end of the owned + // range falls before the start of the next range + for (int i = 0; i < sortedRanges.size(); i++) + { + Range<Token> range = sortedRanges.get(i); + if (range.right.isMinimum()) + { + // we split a wrapping range and this is the second half.
+ // there can't be any keys beyond this (and this is the last range) + return false; + } + + DecoratedKey firstBeyondRange = sstable.firstKeyBeyond(range.right.maxKeyBound()); + if (firstBeyondRange == null) + { + // we ran off the end of the sstable looking for the next key; we don't need to check any more ranges + return false; + } + + if (i == (sortedRanges.size() - 1)) + { + // we're at the last normalized range and we found a key beyond the end of the range + return true; + } + + Range<Token> nextRange = sortedRanges.get(i + 1); + if (!nextRange.contains(firstBeyondRange.token)) + { + // we found a key in between the owned ranges + return true; + } + } + + return false; + } + + /** * This function goes over each file and removes the keys that the node is not responsible for * and only keeps keys that this node is responsible for. * @@ -484,6 +537,11 @@ public class CompactionManager implements CompactionManagerMBean cfs.replaceCompactedSSTables(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP); continue; } + if (!needsCleanup(sstable, ranges)) + { + logger.debug("Skipping {} for cleanup; all rows should be kept", sstable); + continue; + } CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable), getDefaultGcBefore(cfs)); long start = System.nanoTime(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/src/java/org/apache/cassandra/dht/IPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java index 084a1e5..46165b8 100644 --- a/src/java/org/apache/cassandra/dht/IPartitioner.java +++ b/src/java/org/apache/cassandra/dht/IPartitioner.java @@ -43,7 +43,8 @@ public interface IPartitioner<T extends Token> public Token midpoint(Token left, Token right); /** - * @return The minimum possible Token in the range that is being partitioned.
+ * @return A Token smaller than all others in the range that is being partitioned. + * Not legal to assign to a node or key. (But legal to use in range scans.) */ public T getMinimumToken(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index bbca089..9e3f774 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -68,9 +68,6 @@ public class SSTableReader extends SSTable { private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class); - // guesstimated size of INDEX_INTERVAL index entries - private static final int INDEX_FILE_BUFFER_BYTES = 16 * CFMetaData.DEFAULT_INDEX_INTERVAL; - /** * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an uppper bound * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created @@ -878,7 +875,7 @@ public class SSTableReader extends SSTable // is lesser than the first key of next interval (and in that case we must return the position of the first key // of the next interval). int i = 0; - Iterator<FileDataInput> segments = ifile.iterator(sampledPosition, INDEX_FILE_BUFFER_BYTES); + Iterator<FileDataInput> segments = ifile.iterator(sampledPosition); while (segments.hasNext() && i <= indexSummary.getIndexInterval()) { FileDataInput in = segments.next(); @@ -961,6 +958,45 @@ public class SSTableReader extends SSTable } /** + * Finds and returns the first key in this SSTable that sorts strictly after the given position, or null if no such key exists.
+ */ + public DecoratedKey firstKeyBeyond(RowPosition token) + { + long sampledPosition = getIndexScanPosition(token); + if (sampledPosition == -1) + sampledPosition = 0; + + Iterator<FileDataInput> segments = ifile.iterator(sampledPosition); + while (segments.hasNext()) + { + FileDataInput in = segments.next(); + try + { + while (!in.isEOF()) + { + ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in); + DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey); + if (indexDecoratedKey.compareTo(token) > 0) + return indexDecoratedKey; + + RowIndexEntry.serializer.skip(in); + } + } + catch (IOException e) + { + markSuspect(); + throw new CorruptSSTableException(e, in.getPath()); + } + finally + { + FileUtils.closeQuietly(in); + } + } + + return null; + } + + /** * @return The length in bytes of the data for this SSTable. For * compressed files, this is not the same thing as the on disk size (see * onDiskLength()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/src/java/org/apache/cassandra/io/util/SegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java index 8681b03..6231fd7 100644 --- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java @@ -82,7 +82,7 @@ public abstract class SegmentedFile /** * @return An Iterator over segments, beginning with the segment containing the given position: each segment must be closed after use.
*/ - public Iterator<FileDataInput> iterator(long position, int bufferSize) + public Iterator<FileDataInput> iterator(long position) { return new SegmentIterator(position); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index a775988..bc89f4f 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -39,7 +39,9 @@ import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.dht.BytesToken; import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableScanner; @@ -343,4 +345,114 @@ public class CompactionsTest extends SchemaLoader cf = cfs.getColumnFamily(filter); assert cf == null || cf.getColumnCount() == 0 : "should be empty: " + cf; } + + private static Range<Token> rangeFor(int start, int end) + { + return new Range<Token>(new BytesToken(String.format("%03d", start).getBytes()), + new BytesToken(String.format("%03d", end).getBytes())); + } + + private static Collection<Range<Token>> makeRanges(int ...
keys) + { + Collection<Range<Token>> ranges = new ArrayList<Range<Token>>(keys.length / 2); + for (int i = 0; i < keys.length; i += 2) + ranges.add(rangeFor(keys[i], keys[i + 1])); + return ranges; + } + + private static void insertRowWithKey(int key) + { + long timestamp = System.currentTimeMillis(); + DecoratedKey decoratedKey = Util.dk(String.format("%03d", key)); + RowMutation rm = new RowMutation(KEYSPACE1, decoratedKey.key); + rm.add("Standard1", ByteBufferUtil.bytes("col"), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp, 1000); + rm.apply(); + } + + @Test + public void testNeedsCleanup() throws IOException + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1"); + store.clearUnsafe(); + + // disable compaction while flushing + store.disableAutoCompaction(); + + // write three groups of 9 keys: 001, 002, ... 008, 009 + // 101, 102, ... 108, 109 + // 201, 202, ... 208, 209 + for (int i = 1; i < 10; i++) + { + insertRowWithKey(i); + insertRowWithKey(i + 100); + insertRowWithKey(i + 200); + } + store.forceBlockingFlush(); + + assertEquals(1, store.getSSTables().size()); + SSTableReader sstable = store.getSSTables().iterator().next(); + + + // contiguous range spans all data + assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 209))); + assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 210))); + + // separate ranges span all data + assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 9, + 100, 109, + 200, 209))); + assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 109, + 200, 210))); + assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 9, + 100, 210))); + + // one range is missing completely + assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(100, 109, + 200, 209))); + assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9, + 200, 209))); + assertTrue(CompactionManager.needsCleanup(sstable,
makeRanges(0, 9, + 100, 109))); + + + // the beginning of one range is missing + assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(1, 9, + 100, 109, + 200, 209))); + assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9, + 101, 109, + 200, 209))); + assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9, + 100, 109, + 201, 209))); + + // the end of one range is missing + assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 8, + 100, 109, + 200, 209))); + assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9, + 100, 108, + 200, 209))); + assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9, + 100, 109, + 200, 208))); + + // some ranges don't contain any data + assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 0, + 0, 9, + 50, 51, + 100, 109, + 150, 199, + 200, 209, + 300, 301))); + // same case, but with a middle range not covering some of the existing data (NOTE(review): presumably still false only because (000, 000] has left == right, wraps, and so covers the whole ring -- confirm) + assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 0, + 0, 9, + 50, 51, + 100, 103, + 150, 199, + 200, 209, + 300, 301))); + } }