Cleanup doesn't need to inspect sstables that contain only local data
patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-5722


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bc12d73a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bc12d73a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bc12d73a

Branch: refs/heads/trunk
Commit: bc12d73a5a0f31ab8258b3d2a35063b5750df91c
Parents: 16fcd15
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Thu Aug 15 15:17:13 2013 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Thu Aug 15 15:17:13 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../db/compaction/CompactionManager.java        |  58 ++++++++++
 .../org/apache/cassandra/dht/IPartitioner.java  |   3 +-
 .../cassandra/io/sstable/SSTableReader.java     |  44 +++++++-
 .../apache/cassandra/io/util/SegmentedFile.java |   2 +-
 .../db/compaction/CompactionsTest.java          | 112 +++++++++++++++++++
 6 files changed, 216 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 805dca2..4c9f9a1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,9 @@
  * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862)
  * Improve offheap memcpy performance (CASSANDRA-5884)
  * Use a range aware scanner for cleanup (CASSANDRA-2524)
+ * Cleanup doesn't need to inspect sstables that contain only local data 
+   (CASSANDRA-5722)
+
 
 2.0.0-rc2
  * enable vnodes by default (CASSANDRA-5869)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index ed6770f..8e8220f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -458,6 +458,59 @@ public class CompactionManager implements CompactionManagerMBean
     }
 
     /**
+     * Determines if a cleanup would actually remove any data in this SSTable based
+     * on a set of owned ranges.
+     *
+     * Returns true only after finding a key that lies outside every owned range, and false only
+     * after showing that every key in the SSTable lies inside some owned range.  Owned ranges that
+     * hold no data from this SSTable are skipped rather than being mistaken for gaps.
+     *
+     * @param sstable the SSTable to inspect
+     * @param ownedRanges the token ranges owned by this node; must not be empty
+     * @return true if at least one key in {@code sstable} falls outside {@code ownedRanges}
+     */
+    static boolean needsCleanup(SSTableReader sstable, Collection<Range<Token>> ownedRanges)
+    {
+        assert !ownedRanges.isEmpty(); // cleanup checks for this
+
+        // unwrap and sort the ranges by LHS token.  Normalizing may merge or split ranges, so
+        // sortedRanges.size() can differ from ownedRanges.size(); only sortedRanges is indexed below.
+        List<Range<Token>> sortedRanges = Range.normalize(ownedRanges);
+
+        // see if there are any keys LTE the token for the start of the first range
+        // (token range ownership is exclusive on the LHS.)
+        Range<Token> firstRange = sortedRanges.get(0);
+        if (sstable.first.token.compareTo(firstRange.left) <= 0)
+            return true;
+
+        // Invariant at the top of each iteration: every key whose token is <= sortedRanges[i].right
+        // lies inside an owned range.
+        int i = 0;
+        while (i < sortedRanges.size())
+        {
+            Range<Token> range = sortedRanges.get(i);
+            if (range.right.isMinimum())
+            {
+                // we split a wrapping range and this is the second half.
+                // there can't be any keys beyond this (and this is the last range)
+                return false;
+            }
+
+            DecoratedKey firstBeyondRange = sstable.firstKeyBeyond(range.right.maxKeyBound());
+            if (firstBeyondRange == null)
+            {
+                // we ran off the end of the sstable looking for the next key; we don't need to check any more ranges
+                return false;
+            }
+
+            // owned ranges that end before that key hold no data from this sstable (there is no key
+            // between range.right and firstBeyondRange), so skip past them
+            int next = i + 1;
+            while (next < sortedRanges.size()
+                   && !sortedRanges.get(next).right.isMinimum()
+                   && sortedRanges.get(next).right.compareTo(firstBeyondRange.token) < 0)
+                next++;
+
+            if (next == sortedRanges.size())
+            {
+                // we found a key beyond the end of the last owned range
+                return true;
+            }
+
+            if (!sortedRanges.get(next).contains(firstBeyondRange.token))
+            {
+                // we found a key in between the owned ranges
+                return true;
+            }
+
+            i = next;
+        }
+
+        return false;
+    }
+
+    /**
      * This function goes over each file and removes the keys that the node is 
not responsible for
      * and only keeps keys that this node is responsible for.
      *
@@ -484,6 +537,11 @@ public class CompactionManager implements CompactionManagerMBean
                 cfs.replaceCompactedSSTables(Arrays.asList(sstable), 
Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
                 continue;
             }
+            if (!needsCleanup(sstable, ranges))
+            {
+                logger.debug("Skipping {} for cleanup; all rows should be 
kept", sstable);
+                continue;
+            }
 
             CompactionController controller = new CompactionController(cfs, 
Collections.singleton(sstable), getDefaultGcBefore(cfs));
             long start = System.nanoTime();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/src/java/org/apache/cassandra/dht/IPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java
index 084a1e5..46165b8 100644
--- a/src/java/org/apache/cassandra/dht/IPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/IPartitioner.java
@@ -43,7 +43,8 @@ public interface IPartitioner<T extends Token>
     public Token midpoint(Token left, Token right);
 
     /**
-     * @return The minimum possible Token in the range that is being partitioned.
+     * @return A Token smaller than all others in the range that is being partitioned.
+     * Not legal to assign to a node or key.  (But legal to use in range scans.)
      */
     public T getMinimumToken();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index bbca089..9e3f774 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -68,9 +68,6 @@ public class SSTableReader extends SSTable
 {
     private static final Logger logger = 
LoggerFactory.getLogger(SSTableReader.class);
 
-    // guesstimated size of INDEX_INTERVAL index entries
-    private static final int INDEX_FILE_BUFFER_BYTES = 16 * 
CFMetaData.DEFAULT_INDEX_INTERVAL;
-
     /**
      * maxDataAge is a timestamp in local server time (e.g. 
System.currentTimeMilli) which represents an uppper bound
      * to the newest piece of data stored in the sstable. In other words, this 
sstable does not contain items created
@@ -878,7 +875,7 @@ public class SSTableReader extends SSTable
         // is lesser than the first key of next interval (and in that case we 
must return the position of the first key
         // of the next interval).
         int i = 0;
-        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition, 
INDEX_FILE_BUFFER_BYTES);
+        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
         while (segments.hasNext() && i <= indexSummary.getIndexInterval())
         {
             FileDataInput in = segments.next();
@@ -961,6 +958,45 @@ public class SSTableReader extends SSTable
     }
 
     /**
+     * Returns the first key in this SSTable that sorts strictly after {@code token}, or null when
+     * there is no such key.  The primary index is read forward, one segment at a time, starting
+     * from the position returned by getIndexScanPosition(token).
+     */
+    public DecoratedKey firstKeyBeyond(RowPosition token)
+    {
+        long scanStart = getIndexScanPosition(token);
+        // a scan position of -1 is treated as "start at the beginning of the index file"
+        Iterator<FileDataInput> segmentIter = ifile.iterator(scanStart == -1 ? 0 : scanStart);
+
+        while (segmentIter.hasNext())
+        {
+            FileDataInput segment = segmentIter.next();
+            try
+            {
+                DecoratedKey found = firstKeyBeyondIn(segment, token);
+                if (found != null)
+                    return found;
+            }
+            catch (IOException e)
+            {
+                markSuspect();
+                throw new CorruptSSTableException(e, segment.getPath());
+            }
+            finally
+            {
+                // every segment handed out by the iterator must be closed after use
+                FileUtils.closeQuietly(segment);
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Reads index entries from {@code segment} until it is exhausted, returning the first decorated
+     * key greater than {@code token}, or null if this segment holds none.
+     */
+    private DecoratedKey firstKeyBeyondIn(FileDataInput segment, RowPosition token) throws IOException
+    {
+        while (!segment.isEOF())
+        {
+            DecoratedKey candidate = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(segment));
+            if (candidate.compareTo(token) > 0)
+                return candidate;
+
+            // not past token yet: step over the rest of this index entry
+            RowIndexEntry.serializer.skip(segment);
+        }
+        return null;
+    }
+
+    /**
      * @return The length in bytes of the data for this SSTable. For
      * compressed files, this is not the same thing as the on disk size (see
      * onDiskLength())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 8681b03..6231fd7 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -82,7 +82,7 @@ public abstract class SegmentedFile
     /**
      * @return An Iterator over segments, beginning with the segment containing the given position: each segment must be closed after use.
      */
-    public Iterator<FileDataInput> iterator(long position, int bufferSize)
+    public Iterator<FileDataInput> iterator(long position)
     {
         return new SegmentIterator(position);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc12d73a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index a775988..bc89f4f 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -39,7 +39,9 @@ import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.dht.BytesToken;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
@@ -343,4 +345,114 @@ public class CompactionsTest extends SchemaLoader
         cf = cfs.getColumnFamily(filter);
         assert cf == null || cf.getColumnCount() == 0 : "should be empty: " + 
cf;
     }
+
+    /** Builds the token range (start, end] over zero-padded three-digit keys, e.g. rangeFor(1, 9) is ("001", "009"]. */
+    private static Range<Token> rangeFor(int start, int end)
+    {
+        Token left = new BytesToken(String.format("%03d", start).getBytes());
+        Token right = new BytesToken(String.format("%03d", end).getBytes());
+        return new Range<Token>(left, right);
+    }
+
+    private static Collection<Range<Token>> makeRanges(int ... keys)
+    {
+        Collection<Range<Token>> ranges = new 
ArrayList<Range<Token>>(keys.length / 2);
+        for (int i = 0; i < keys.length; i += 2)
+            ranges.add(rangeFor(keys[i], keys[i + 1]));
+        return ranges;
+    }
+
+    /** Applies a one-column mutation to Standard1 for the zero-padded three-digit row key {@code key}. */
+    private static void insertRowWithKey(int key)
+    {
+        long now = System.currentTimeMillis();
+        String rowKey = String.format("%03d", key);
+        RowMutation mutation = new RowMutation(KEYSPACE1, Util.dk(rowKey).key);
+        mutation.add("Standard1", ByteBufferUtil.bytes("col"), ByteBufferUtil.EMPTY_BYTE_BUFFER, now, 1000);
+        mutation.apply();
+    }
+
+    /**
+     * Exercises CompactionManager.needsCleanup against a single sstable holding the keys 001-009,
+     * 101-109 and 201-209, using owned-range sets that do and do not cover all of that data.
+     */
+    @Test
+    public void testNeedsCleanup() throws IOException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
+        store.clearUnsafe();
+
+        // disable compaction while flushing
+        store.disableAutoCompaction();
+
+        // write three groups of 9 keys: 001, 002, ... 008, 009
+        //                               101, 102, ... 108, 109
+        //                               201, 202, ... 208, 209
+        for (int i = 1; i < 10; i++)
+        {
+            insertRowWithKey(i);
+            insertRowWithKey(i + 100);
+            insertRowWithKey(i + 200);
+        }
+        store.forceBlockingFlush();
+
+        assertEquals(1, store.getSSTables().size());
+        SSTableReader sstable = store.getSSTables().iterator().next();
+
+        // contiguous range spans all data
+        assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 209)));
+        assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 210)));
+
+        // separate ranges span all data
+        assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+                                                                       100, 109,
+                                                                       200, 209)));
+        assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 109,
+                                                                       200, 210)));
+        assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+                                                                       100, 210)));
+
+        // one range is missing completely
+        assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(100, 109,
+                                                                      200, 209)));
+        assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+                                                                      200, 209)));
+        assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+                                                                      100, 109)));
+
+        // the beginning of one range is missing
+        assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(1, 9,
+                                                                      100, 109,
+                                                                      200, 209)));
+        assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+                                                                      101, 109,
+                                                                      200, 209)));
+        assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+                                                                      100, 109,
+                                                                      201, 209)));
+
+        // the end of one range is missing
+        assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 8,
+                                                                      100, 109,
+                                                                      200, 209)));
+        assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+                                                                      100, 108,
+                                                                      200, 209)));
+        assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+                                                                      100, 109,
+                                                                      200, 208)));
+
+        // some ranges don't contain any data (here, a range beyond the last key)
+        assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+                                                                       100, 109,
+                                                                       200, 209,
+                                                                       300, 301)));
+        // a middle range not covering some of the existing data (keys 104 - 109 are not owned)
+        assertTrue(CompactionManager.needsCleanup(sstable, makeRanges(0, 9,
+                                                                      50, 51,
+                                                                      100, 103,
+                                                                      150, 199,
+                                                                      200, 209,
+                                                                      300, 301)));
+
+        // rangeFor(0, 0) has equal endpoints, which Range treats as a wrap-around range covering the
+        // whole ring: any range set that includes it owns every key, whatever the other ranges are
+        assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 0,
+                                                                       0, 9,
+                                                                       50, 51,
+                                                                       100, 109,
+                                                                       150, 199,
+                                                                       200, 209,
+                                                                       300, 301)));
+        assertFalse(CompactionManager.needsCleanup(sstable, makeRanges(0, 0,
+                                                                       0, 9,
+                                                                       50, 51,
+                                                                       100, 103,
+                                                                       150, 199,
+                                                                       200, 209,
+                                                                       300, 301)));
+    }
 }

Reply via email to