Updated Branches:
  refs/heads/trunk 71a50b73d -> 2ce8274c6

fixes for small-sstable compaction
patch by slebresne; reviewed by jbellis for CASSANDRA-4341


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2ce8274c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2ce8274c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2ce8274c

Branch: refs/heads/trunk
Commit: 2ce8274c600dba00228c4c14116840a8ec6dceb0
Parents: 71a50b7
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Fri Jun 22 13:15:51 2012 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Fri Jun 22 13:15:51 2012 -0500

----------------------------------------------------------------------
 .../cassandra/db/compaction/CompactionTask.java    |    4 +-
 .../db/compaction/LeveledCompactionTask.java       |    4 +-
 .../cassandra/db/compaction/LeveledManifest.java   |   21 ++++++++++-----
 .../io/compress/CompressedSequentialWriter.java    |    6 ++++
 .../apache/cassandra/io/sstable/SSTableWriter.java |    5 +++
 .../apache/cassandra/io/util/SequentialWriter.java |   12 ++++++++
 6 files changed, 41 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce8274c/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4e167f3..468c9a9 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -166,7 +166,7 @@ public class CompactionTask extends AbstractCompactionTask
                         }
                     }
                 }
-                if (!nni.hasNext() || 
newSSTableSegmentThresholdReached(writer, indexEntry.position))
+                if (!nni.hasNext() || 
newSSTableSegmentThresholdReached(writer))
                 {
                     SSTableReader toIndex = 
writer.closeAndOpenReader(getMaxDataAge(toCompact));
                     cachedKeyMap.put(toIndex, cachedKeys);
@@ -226,7 +226,7 @@ public class CompactionTask extends AbstractCompactionTask
     }
 
     //extensibility point for other strategies that may want to limit the 
upper bounds of the sstable segment size
-    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer, 
long position)
+    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) 
throws IOException
     {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce8274c/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index 59cd55f..ef290f9 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -62,9 +62,9 @@ public class LeveledCompactionTask extends CompactionTask
     }
 
     @Override
-    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer, 
long position)
+    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) 
throws IOException
     {
-        return position > sstableSizeInMB * 1024L * 1024L;
+        return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce8274c/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index b8297c7..eb82e0d 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -176,7 +176,7 @@ public class LeveledManifest
             return;
 
         int newLevel;
-        if (minimumLevel == 0 && maximumLevel == 0 && 
SSTable.getTotalBytes(removed) < maxSSTableSizeInBytes)
+        if (minimumLevel == 0 && maximumLevel == 0 && 
SSTable.getTotalBytes(removed) <= maxSSTableSizeInBytes)
         {
             // special case for tiny L0 sstables; see CASSANDRA-4341
             newLevel = 0;
@@ -278,7 +278,7 @@ public class LeveledManifest
             logger.debug("Compaction score for level {} is {}", i, score);
 
             // L0 gets a special case that if we don't have anything more 
important to do,
-            // we'll go ahead and compact even just one sstable
+            // we'll go ahead and compact if we have more than one sstable
             if (score > 1.001 || (i == 0 && sstables.size() > 1))
             {
                 Collection<SSTableReader> candidates = getCandidatesFor(i);
@@ -355,9 +355,9 @@ public class LeveledManifest
         sstableGenerations.put(sstable, Integer.valueOf(level));
     }
 
-    private static List<SSTableReader> overlapping(SSTableReader sstable, 
Iterable<SSTableReader> candidates)
+    private static Set<SSTableReader> overlapping(SSTableReader sstable, 
Iterable<SSTableReader> candidates)
     {
-        List<SSTableReader> overlapped = new ArrayList<SSTableReader>();
+        Set<SSTableReader> overlapped = new HashSet<SSTableReader>();
         overlapped.add(sstable);
 
         Range<Token> promotedRange = new Range<Token>(sstable.first.token, 
sstable.last.token);
@@ -383,7 +383,7 @@ public class LeveledManifest
             // 1a. add sstables to the candidate set until we have at least 
maxSSTableSizeInMB
             // 1b. prefer choosing older sstables as candidates, to newer ones
             // 1c. any L0 sstables that overlap a candidate, will also become 
candidates
-            // 2. At most MAX_COMPACTING_L0 sstables will be compacted at once
+            // 2. At most MAX_COMPACTING_L0 sstables from L0 will be compacted 
at once
             // 3. If total candidate size is less than maxSSTableSizeInMB, we 
won't bother compacting with L1,
             //    and the result of the compaction will stay in L0 instead of 
being promoted (see promote())
             //
@@ -413,7 +413,14 @@ public class LeveledManifest
                     // limit to only the MAX_COMPACTING_L0 oldest candidates
                     List<SSTableReader> ageSortedCandidates = new 
ArrayList<SSTableReader>(candidates);
                     Collections.sort(ageSortedCandidates, 
SSTable.maxTimestampComparator);
-                    return ageSortedCandidates.subList(0, MAX_COMPACTING_L0);
+                    candidates = new 
HashSet<SSTableReader>(ageSortedCandidates.subList(0, MAX_COMPACTING_L0));
+                    if (SSTable.getTotalBytes(candidates) > 
maxSSTableSizeInBytes)
+                    {
+                        // add sstables from L1 that overlap candidates
+                        for (SSTableReader candidate : new 
ArrayList<SSTableReader>(candidates))
+                            candidates.addAll(overlapping(candidate, 
generations[1]));
+                    }
+                    return candidates;
                 }
 
                 if (SSTable.getTotalBytes(candidates) > maxSSTableSizeInBytes)
@@ -448,7 +455,7 @@ public class LeveledManifest
         while (true)
         {
             SSTableReader sstable = generations[level].get(i);
-            List<SSTableReader> candidates = overlapping(sstable, 
generations[(level + 1)]);
+            Set<SSTableReader> candidates = overlapping(sstable, 
generations[(level + 1)]);
             for (SSTableReader candidate : candidates)
             {
                 if (candidate.isMarkedSuspect())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce8274c/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 67ec3f0..fdcf1b8 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -68,6 +68,12 @@ public class CompressedSequentialWriter extends SequentialWriter
     }
 
     @Override
+    public long getOnDiskFilePointer() throws IOException
+    {
+        return out.getFilePointer();
+    }
+
+    @Override
     public void sync() throws IOException
     {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce8274c/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 4219f64..2fc9771 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -373,6 +373,11 @@ public class SSTableWriter extends SSTable
         return dataFile.getFilePointer();
     }
 
+    public long getOnDiskFilePointer() throws IOException
+    {
+        return dataFile.getOnDiskFilePointer();
+    }
+
     /**
      * Encapsulates writing the index and filter for an SSTable. The state of 
this object is not valid until it has been closed.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ce8274c/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 961cc48..8b78730 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -244,6 +244,18 @@ public class SequentialWriter extends OutputStream
         return current;
     }
 
+    /**
+     * Return the current file pointer of the underlying on-disk file.
+     * Note that since write works by buffering data, the value of this will 
increase by buffer
+     * size and not every write to the writer will modify this value.
+     * Furthermore, for compressed files, this value refers to compressed 
data, while the
+     * writer getFilePointer() refers to uncompressedFile
+     */
+    public long getOnDiskFilePointer() throws IOException
+    {
+        return getFilePointer();
+    }
+
     public long length() throws IOException
     {
         return Math.max(Math.max(current, out.length()), bufferOffset + 
validBufferBytes);

Reply via email to