Updated Branches: refs/heads/cassandra-1.1 a846cd659 -> 80d7d43e2
fixes for small-sstable compaction patch by slebresne; reviewed by jbellis for CASSANDRA-4341 Conflicts: src/java/org/apache/cassandra/db/compaction/CompactionTask.java Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/80d7d43e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/80d7d43e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/80d7d43e Branch: refs/heads/cassandra-1.1 Commit: 80d7d43e2952d7612ab4406bca800c2b6f30c85f Parents: a846cd6 Author: Jonathan Ellis <jbel...@apache.org> Authored: Fri Jun 22 13:15:51 2012 -0500 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Tue Jun 26 17:53:28 2012 +0200 ---------------------------------------------------------------------- .../cassandra/db/compaction/CompactionTask.java | 4 +- .../db/compaction/LeveledCompactionTask.java | 4 +- .../cassandra/db/compaction/LeveledManifest.java | 21 ++++++++++----- .../io/compress/CompressedSequentialWriter.java | 6 ++++ .../apache/cassandra/io/sstable/SSTableWriter.java | 5 +++ .../apache/cassandra/io/util/SequentialWriter.java | 12 ++++++++ 6 files changed, 41 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index a697b1f..e9fcdcd 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -170,7 +170,7 @@ public class CompactionTask extends AbstractCompactionTask } } } - if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer, position)) + if (!nni.hasNext() || 
newSSTableSegmentThresholdReached(writer)) { SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact)); cachedKeyMap.put(toIndex, cachedKeys); @@ -230,7 +230,7 @@ public class CompactionTask extends AbstractCompactionTask } //extensibility point for other strategies that may want to limit the upper bounds of the sstable segment size - protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer, long position) + protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) throws IOException { return false; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java index 2512f9d..3e58379 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java @@ -66,9 +66,9 @@ public class LeveledCompactionTask extends CompactionTask } @Override - protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer, long position) + protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer) throws IOException { - return position > sstableSizeInMB * 1024L * 1024L; + return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L; } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index a53d519..dca8b7d 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ 
b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -179,7 +179,7 @@ public class LeveledManifest return; int newLevel; - if (minimumLevel == 0 && maximumLevel == 0 && SSTable.getTotalBytes(removed) < maxSSTableSizeInBytes) + if (minimumLevel == 0 && maximumLevel == 0 && SSTable.getTotalBytes(removed) <= maxSSTableSizeInBytes) { // special case for tiny L0 sstables; see CASSANDRA-4341 newLevel = 0; @@ -273,7 +273,7 @@ public class LeveledManifest logger.debug("Compaction score for level {} is {}", i, score); // L0 gets a special case that if we don't have anything more important to do, - // we'll go ahead and compact even just one sstable + // we'll go ahead and compact if we have more than one sstable if (score > 1.001 || (i == 0 && sstables.size() > 1)) { Collection<SSTableReader> candidates = getCandidatesFor(i); @@ -350,9 +350,9 @@ public class LeveledManifest sstableGenerations.put(sstable, Integer.valueOf(level)); } - private static List<SSTableReader> overlapping(SSTableReader sstable, Iterable<SSTableReader> candidates) + private static Set<SSTableReader> overlapping(SSTableReader sstable, Iterable<SSTableReader> candidates) { - List<SSTableReader> overlapped = new ArrayList<SSTableReader>(); + Set<SSTableReader> overlapped = new HashSet<SSTableReader>(); overlapped.add(sstable); Range<Token> promotedRange = new Range<Token>(sstable.first.token, sstable.last.token); @@ -378,7 +378,7 @@ public class LeveledManifest // 1a. add sstables to the candidate set until we have at least maxSSTableSizeInMB // 1b. prefer choosing older sstables as candidates, to newer ones // 1c. any L0 sstables that overlap a candidate, will also become candidates - // 2. At most MAX_COMPACTING_L0 sstables will be compacted at once + // 2. At most MAX_COMPACTING_L0 sstables from L0 will be compacted at once // 3. 
If total candidate size is less than maxSSTableSizeInMB, we won't bother compacting with L1, // and the result of the compaction will stay in L0 instead of being promoted (see promote()) // @@ -408,7 +408,14 @@ public class LeveledManifest // limit to only the MAX_COMPACTING_L0 oldest candidates List<SSTableReader> ageSortedCandidates = new ArrayList<SSTableReader>(candidates); Collections.sort(ageSortedCandidates, SSTable.maxTimestampComparator); - return ageSortedCandidates.subList(0, MAX_COMPACTING_L0); + candidates = new HashSet<SSTableReader>(ageSortedCandidates.subList(0, MAX_COMPACTING_L0)); + if (SSTable.getTotalBytes(candidates) > maxSSTableSizeInBytes) + { + // add sstables from L1 that overlap candidates + for (SSTableReader candidate : new ArrayList<SSTableReader>(candidates)) + candidates.addAll(overlapping(candidate, generations[1])); + } + return candidates; } if (SSTable.getTotalBytes(candidates) > maxSSTableSizeInBytes) @@ -443,7 +450,7 @@ public class LeveledManifest while (true) { SSTableReader sstable = generations[level].get(i); - List<SSTableReader> candidates = overlapping(sstable, generations[(level + 1)]); + Set<SSTableReader> candidates = overlapping(sstable, generations[(level + 1)]); for (SSTableReader candidate : candidates) { if (candidate.isMarkedSuspect()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index 6a466f7..7c924fe 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -69,6 +69,12 @@ public class CompressedSequentialWriter extends SequentialWriter } @Override + public long 
getOnDiskFilePointer() throws IOException + { + return out.getFilePointer(); + } + + @Override public void sync() throws IOException { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index d74514f..3e7e7a0 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -383,6 +383,11 @@ public class SSTableWriter extends SSTable return dataFile.getFilePointer(); } + public long getOnDiskFilePointer() throws IOException + { + return dataFile.getOnDiskFilePointer(); + } + /** * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/80d7d43e/src/java/org/apache/cassandra/io/util/SequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java index 70098cb..0f0bc9e 100644 --- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java @@ -245,6 +245,18 @@ public class SequentialWriter extends OutputStream return current; } + /** + * Return the current file pointer of the underlying on-disk file. + * Note that since write works by buffering data, the value of this will increase by buffer + * size and not every write to the writer will modify this value. 
+ * Furthermore, for compressed files, this value refers to compressed data, while the + * writer's getFilePointer() refers to the uncompressed data + */ + public long getOnDiskFilePointer() throws IOException + { + return getFilePointer(); + } + public long length() throws IOException { return Math.max(Math.max(current, out.length()), bufferOffset + validBufferBytes);