Reduce SSTableLoader memory usage. Patch by jbellis; reviewed by dbrosius for CASSANDRA-5555.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/079ae68f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/079ae68f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/079ae68f Branch: refs/heads/trunk Commit: 079ae68fb7086259439491e6a10bc2d8a947f52c Parents: 2f72f8b Author: Jonathan Ellis <jbel...@apache.org> Authored: Thu May 30 23:14:40 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Thu May 30 23:14:40 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 2 +- .../apache/cassandra/io/sstable/SSTableLoader.java | 2 +- .../apache/cassandra/io/sstable/SSTableReader.java | 150 ++++++++++----- 4 files changed, 108 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/079ae68f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9d53d17..a746c09 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2.6 + * Reduce SSTableLoader memory usage (CASSANDRA-5555) * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272) * (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536) * Fix dealing with ridiculously large max sstable sizes in LCS (CASSANDRA-5589) http://git-wip-us.apache.org/repos/asf/cassandra/blob/079ae68f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 429859e..81ced05 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -232,7 +232,7 @@ public class ColumnFamilyStore 
implements ColumnFamilyStoreMBean if (loadSSTables) { Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true); - Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), metadata, this.partitioner); + Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner); if (metadata.getDefaultValidator().isCommutative()) { // Filter non-compacted sstables, remove compacted ones http://git-wip-us.apache.org/repos/asf/cassandra/blob/079ae68f/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 9965138..67c6a02 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -95,7 +95,7 @@ public class SSTableLoader try { - sstables.add(SSTableReader.open(desc, components, null, client.getPartitioner())); + sstables.add(SSTableReader.openForBatch(desc, components, client.getPartitioner())); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/079ae68f/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index ea9c451..574465d 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.*; -import com.google.common.primitives.Longs; import com.google.common.util.concurrent.RateLimiter; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,41 +153,33 @@ public class SSTableReader extends SSTable return open(descriptor, components, metadata, partitioner, true); } + public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, IPartitioner partitioner) throws IOException + { + SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner); + SSTableReader sstable = new SSTableReader(descriptor, + components, + null, + partitioner, + System.currentTimeMillis(), + sstableMetadata); + sstable.bf = new AlwaysPresentFilter(); + sstable.loadForBatch(); + return sstable; + } + private static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, boolean validate) throws IOException { - assert partitioner != null; - // Minimum components without which we can't do anything - assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor; - assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor; - long start = System.currentTimeMillis(); - logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length()); - - SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor); - - // Check if sstable is created using same partitioner. - // Partitioner can be null, which indicates older version of sstable or no stats available. - // In that case, we skip the check. - String partitionerName = partitioner.getClass().getCanonicalName(); - if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner)) - { - logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. 
Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.", - descriptor, sstableMetadata.partitioner, partitionerName)); - System.exit(1); - } + SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner); SSTableReader sstable = new SSTableReader(descriptor, components, metadata, partitioner, - null, - null, - null, - null, System.currentTimeMillis(), sstableMetadata); // versions before 'c' encoded keys as utf-16 before hashing to the filter @@ -214,6 +205,30 @@ public class SSTableReader extends SSTable return sstable; } + private static SSTableMetadata openMetadata(Descriptor descriptor, Set<Component> components, IPartitioner partitioner) throws IOException + { + assert partitioner != null; + // Minimum components without which we can't do anything + assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor; + assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor; + + logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length()); + + SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor); + + // Check if sstable is created using same partitioner. + // Partitioner can be null, which indicates older version of sstable or no stats available. + // In that case, we skip the check. + String partitionerName = partitioner.getClass().getCanonicalName(); + if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner)) + { + logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. 
Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.", + descriptor, sstableMetadata.partitioner, partitionerName)); + System.exit(1); + } + return sstableMetadata; + } + public static void logOpenException(Descriptor descriptor, IOException e) { if (e instanceof FileNotFoundException) @@ -222,9 +237,9 @@ public class SSTableReader extends SSTable logger.error("Corrupt sstable " + descriptor + "; skipped", e); } - public static Collection<SSTableReader> batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries, - final CFMetaData metadata, - final IPartitioner partitioner) + public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries, + final CFMetaData metadata, + final IPartitioner partitioner) { final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>(); @@ -295,6 +310,20 @@ public class SSTableReader extends SSTable Set<Component> components, CFMetaData metadata, IPartitioner partitioner, + long maxDataAge, + SSTableMetadata sstableMetadata) + { + super(desc, components, metadata, partitioner); + this.sstableMetadata = sstableMetadata; + this.maxDataAge = maxDataAge; + + this.deletingTask = new SSTableDeletingTask(this); + } + + private SSTableReader(Descriptor desc, + Set<Component> components, + CFMetaData metadata, + IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary indexSummary, @@ -302,15 +331,12 @@ public class SSTableReader extends SSTable long maxDataAge, SSTableMetadata sstableMetadata) { - super(desc, components, metadata, partitioner); - this.sstableMetadata = sstableMetadata; - this.maxDataAge = maxDataAge; + this(desc, components, metadata, partitioner, maxDataAge, sstableMetadata); this.ifile = ifile; this.dfile = dfile; this.indexSummary = indexSummary; this.bf = bloomFilter; - this.deletingTask = new SSTableDeletingTask(this); } public void 
setTrackedBy(DataTracker tracker) @@ -349,16 +375,56 @@ public class SSTableReader extends SSTable { SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); SegmentedFile.Builder dbuilder = compression - ? SegmentedFile.getCompressedBuilder() - : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); + ? SegmentedFile.getCompressedBuilder() + : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); + + boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder); + if (recreatebloom || !summaryLoaded) + buildSummary(recreatebloom, ibuilder, dbuilder, summaryLoaded); + + ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); + dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); + if (recreatebloom || !summaryLoaded) // save summary information to disk + saveSummary(this, ibuilder, dbuilder); + } + + /** + * A simplified load that creates a minimal partition index + */ + private void loadForBatch() throws IOException + { + // force buffered i/o in non-compressed mode so we don't need to worry about mmap segments + SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder(); + SegmentedFile.Builder dbuilder = compression + ? 
SegmentedFile.getCompressedBuilder() + : new BufferedSegmentedFile.Builder(); + + // build a bare-bones IndexSummary + IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(1); + RandomAccessReader in = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true); + try + { + ByteBuffer key = ByteBufferUtil.readWithShortLength(in); + first = decodeKey(partitioner, descriptor, key); + summaryBuilder.maybeAddEntry(first, 0); + indexSummary = summaryBuilder.build(partitioner); + } + finally + { + FileUtils.closeQuietly(in); + } + + last = null; // shouldn't need this for batch operations + + ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); + dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); + } + + private void buildSummary(boolean recreatebloom, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException + { // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary. 
RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true); - - // try to load summaries from the disk and check if we need - // to read primary index because we should re-create a BloomFilter or pre-load KeyCache - final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder); - final boolean readIndex = recreatebloom || !summaryLoaded; try { long indexSize = primaryIndex.length(); @@ -374,7 +440,7 @@ public class SSTableReader extends SSTable summaryBuilder = new IndexSummaryBuilder(estimatedKeys); long indexPosition; - while (readIndex && (indexPosition = primaryIndex.getFilePointer()) != indexSize) + while ((indexPosition = primaryIndex.getFilePointer()) != indexSize) { ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex); RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(primaryIndex, descriptor.version); @@ -405,12 +471,6 @@ public class SSTableReader extends SSTable first = getMinimalKey(first); last = getMinimalKey(last); - // finalize the load. - // finalize the state of the reader - ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); - dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); - if (readIndex) // save summary information to disk - saveSummary(this, ibuilder, dbuilder); } public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)