Reduce SSTableLoader memory usage
patch by jbellis; reviewed by dbrosius for CASSANDRA-5555


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/079ae68f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/079ae68f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/079ae68f

Branch: refs/heads/trunk
Commit: 079ae68fb7086259439491e6a10bc2d8a947f52c
Parents: 2f72f8b
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Thu May 30 23:14:40 2013 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Thu May 30 23:14:40 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |    2 +-
 .../apache/cassandra/io/sstable/SSTableLoader.java |    2 +-
 .../apache/cassandra/io/sstable/SSTableReader.java |  150 ++++++++++-----
 4 files changed, 108 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
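
The memory savings come from a new SSTableReader.openForBatch() entry point used by SSTableLoader: since bulk loading only streams sstables to other nodes and never performs per-row reads, openForBatch() substitutes an AlwaysPresentFilter for the Bloom filter, and loadForBatch() builds an index summary holding only the first key (IndexSummaryBuilder(1)) over buffered rather than memory-mapped files. The sketch below illustrates the idea with simplified stand-in types; it is not Cassandra's actual API.

// Illustrative stand-ins for the batch-open idea; not Cassandra's real classes.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BulkOpenSketch
{
    // Stand-in for AlwaysPresentFilter: reports every key as possibly present and uses no memory.
    static final class AlwaysPresentFilter
    {
        boolean isPresent(String key) { return true; }
    }

    // Normal open: sample the primary index every 'interval' entries (memory grows with key count).
    static List<String> fullSummary(List<String> indexKeys, int interval)
    {
        List<String> summary = new ArrayList<String>();
        for (int i = 0; i < indexKeys.size(); i += interval)
            summary.add(indexKeys.get(i));
        return summary;
    }

    // Batch open: keep only the first key, which is enough to order sstables for streaming.
    static List<String> batchSummary(List<String> indexKeys)
    {
        return Collections.singletonList(indexKeys.get(0));
    }

    public static void main(String[] args)
    {
        List<String> indexKeys = Arrays.asList("aaa", "bbb", "ccc", "ddd", "eee", "fff");

        AlwaysPresentFilter bf = new AlwaysPresentFilter(); // no Bloom filter built or deserialized
        List<String> summary = batchSummary(indexKeys);     // one entry instead of one per sample interval

        System.out.println(bf.isPresent("zzz")); // true for any key
        System.out.println(summary);             // [aaa]
    }
}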


http://git-wip-us.apache.org/repos/asf/cassandra/blob/079ae68f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9d53d17..a746c09 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.6
+ * Reduce SSTableLoader memory usage (CASSANDRA-5555)
  * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
  * (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536)
  * Fix dealing with ridiculously large max sstable sizes in LCS (CASSANDRA-5589)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/079ae68f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 429859e..81ced05 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -232,7 +232,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         if (loadSSTables)
         {
             Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true);
-            Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), metadata, this.partitioner);
+            Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner);
             if (metadata.getDefaultValidator().isCommutative())
             {
                 // Filter non-compacted sstables, remove compacted ones

http://git-wip-us.apache.org/repos/asf/cassandra/blob/079ae68f/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 9965138..67c6a02 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -95,7 +95,7 @@ public class SSTableLoader
 
                 try
                 {
-                    sstables.add(SSTableReader.open(desc, components, null, client.getPartitioner()));
+                    sstables.add(SSTableReader.openForBatch(desc, components, client.getPartitioner()));
                 }
                 catch (IOException e)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/079ae68f/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index ea9c451..574465d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.*;
 
-import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -154,41 +153,33 @@ public class SSTableReader extends SSTable
         return open(descriptor, components, metadata, partitioner, true);
     }
 
+    public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, IPartitioner partitioner) throws IOException
+    {
+        SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
+        SSTableReader sstable = new SSTableReader(descriptor,
+                                                  components,
+                                                  null,
+                                                  partitioner,
+                                                  System.currentTimeMillis(),
+                                                  sstableMetadata);
+        sstable.bf = new AlwaysPresentFilter();
+        sstable.loadForBatch();
+        return sstable;
+    }
+
     private static SSTableReader open(Descriptor descriptor,
                                       Set<Component> components,
                                       CFMetaData metadata,
                                       IPartitioner partitioner,
                                       boolean validate) throws IOException
     {
-        assert partitioner != null;
-        // Minimum components without which we can't do anything
-        assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
-        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
-
         long start = System.currentTimeMillis();
-        logger.info("Opening {} ({} bytes)", descriptor, new 
File(descriptor.filenameFor(COMPONENT_DATA)).length());
-
-        SSTableMetadata sstableMetadata = 
SSTableMetadata.serializer.deserialize(descriptor);
-
-        // Check if sstable is created using same partitioner.
-        // Partitioner can be null, which indicates older version of sstable or no stats available.
-        // In that case, we skip the check.
-        String partitionerName = partitioner.getClass().getCanonicalName();
-        if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
-        {
-            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
-                                       descriptor, sstableMetadata.partitioner, partitionerName));
-            System.exit(1);
-        }
+        SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
 
         SSTableReader sstable = new SSTableReader(descriptor,
                                                   components,
                                                   metadata,
                                                   partitioner,
-                                                  null,
-                                                  null,
-                                                  null,
-                                                  null,
                                                   System.currentTimeMillis(),
                                                   sstableMetadata);
         // versions before 'c' encoded keys as utf-16 before hashing to the filter
@@ -214,6 +205,30 @@ public class SSTableReader extends SSTable
         return sstable;
     }
 
+    private static SSTableMetadata openMetadata(Descriptor descriptor, Set<Component> components, IPartitioner partitioner) throws IOException
+    {
+        assert partitioner != null;
+        // Minimum components without which we can't do anything
+        assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
+        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
+        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length());
+
+        SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor);
+
+        // Check if sstable is created using same partitioner.
+        // Partitioner can be null, which indicates older version of sstable or no stats available.
+        // In that case, we skip the check.
+        String partitionerName = partitioner.getClass().getCanonicalName();
+        if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
+        {
+            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+                                       descriptor, sstableMetadata.partitioner, partitionerName));
+            System.exit(1);
+        }
+            System.exit(1);
+        }
+        return sstableMetadata;
+    }
+
     public static void logOpenException(Descriptor descriptor, IOException e)
     {
         if (e instanceof FileNotFoundException)
@@ -222,9 +237,9 @@ public class SSTableReader extends SSTable
             logger.error("Corrupt sstable " + descriptor + "; skipped", e);
     }
 
-    public static Collection<SSTableReader> 
batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries,
-                                                      final CFMetaData 
metadata,
-                                                      final IPartitioner 
partitioner)
+    public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, 
Set<Component>>> entries,
+                                                    final CFMetaData metadata,
+                                                    final IPartitioner 
partitioner)
     {
         final Collection<SSTableReader> sstables = new 
LinkedBlockingQueue<SSTableReader>();
 
@@ -295,6 +310,20 @@ public class SSTableReader extends SSTable
                           Set<Component> components,
                           CFMetaData metadata,
                           IPartitioner partitioner,
+                          long maxDataAge,
+                          SSTableMetadata sstableMetadata)
+    {
+        super(desc, components, metadata, partitioner);
+        this.sstableMetadata = sstableMetadata;
+        this.maxDataAge = maxDataAge;
+
+        this.deletingTask = new SSTableDeletingTask(this);
+    }
+
+    private SSTableReader(Descriptor desc,
+                          Set<Component> components,
+                          CFMetaData metadata,
+                          IPartitioner partitioner,
                           SegmentedFile ifile,
                           SegmentedFile dfile,
                           IndexSummary indexSummary,
@@ -302,15 +331,12 @@ public class SSTableReader extends SSTable
                           long maxDataAge,
                           SSTableMetadata sstableMetadata)
     {
-        super(desc, components, metadata, partitioner);
-        this.sstableMetadata = sstableMetadata;
-        this.maxDataAge = maxDataAge;
+        this(desc, components, metadata, partitioner, maxDataAge, sstableMetadata);
 
         this.ifile = ifile;
         this.dfile = dfile;
         this.indexSummary = indexSummary;
         this.bf = bloomFilter;
-        this.deletingTask = new SSTableDeletingTask(this);
     }
 
     public void setTrackedBy(DataTracker tracker)
@@ -349,16 +375,56 @@ public class SSTableReader extends SSTable
     {
         SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
         SegmentedFile.Builder dbuilder = compression
-                                          ? SegmentedFile.getCompressedBuilder()
-                                          : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+                                       ? SegmentedFile.getCompressedBuilder()
+                                       : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+
 
+        boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
+        if (recreatebloom || !summaryLoaded)
+            buildSummary(recreatebloom, ibuilder, dbuilder, summaryLoaded);
+
+        ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+        dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+        if (recreatebloom || !summaryLoaded) // save summary information to disk
+            saveSummary(this, ibuilder, dbuilder);
+    }
+
+    /**
+     * A simplified load that creates a minimal partition index
+     */
+    private void loadForBatch() throws IOException
+    {
+        // force buffered i/o in non-compressed mode so we don't need to worry about mmap segments
+        SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
+        SegmentedFile.Builder dbuilder = compression
+                                         ? SegmentedFile.getCompressedBuilder()
+                                         : new BufferedSegmentedFile.Builder();
+
+        // build a bare-bones IndexSummary
+        IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(1);
+        RandomAccessReader in = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
+        try
+        {
+            ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
+            first = decodeKey(partitioner, descriptor, key);
+            summaryBuilder.maybeAddEntry(first, 0);
+            indexSummary = summaryBuilder.build(partitioner);
+        }
+        finally
+        {
+            FileUtils.closeQuietly(in);
+        }
+
+        last = null; // shouldn't need this for batch operations
+
+        ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+        dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+    }
+
+    private void buildSummary(boolean recreatebloom, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
+    {
         // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
         RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
-
-        // try to load summaries from the disk and check if we need
-        // to read primary index because we should re-create a BloomFilter or pre-load KeyCache
-        final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
-        final boolean readIndex = recreatebloom || !summaryLoaded;
         try
         {
             long indexSize = primaryIndex.length();
@@ -374,7 +440,7 @@ public class SSTableReader extends SSTable
                 summaryBuilder = new IndexSummaryBuilder(estimatedKeys);
 
             long indexPosition;
-            while (readIndex && (indexPosition = 
primaryIndex.getFilePointer()) != indexSize)
+            while ((indexPosition = primaryIndex.getFilePointer()) != 
indexSize)
             {
                 ByteBuffer key = 
ByteBufferUtil.readWithShortLength(primaryIndex);
                 RowIndexEntry indexEntry = 
RowIndexEntry.serializer.deserialize(primaryIndex, descriptor.version);
@@ -405,12 +471,6 @@ public class SSTableReader extends SSTable
 
         first = getMinimalKey(first);
         last = getMinimalKey(last);
-        // finalize the load.
-        // finalize the state of the reader
-        ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
-        dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
-        if (readIndex) // save summary information to disk
-            saveSummary(this, ibuilder, dbuilder);
     }
 
     public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
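
As a side note on the refactoring above: SSTableReader now has a minimal constructor (descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata) that openForBatch() uses directly, and the original full constructor delegates to it via this(...) before attaching the segment files, index summary and Bloom filter. A generic sketch of that delegation pattern, with hypothetical names:

// Hypothetical names; only illustrates the constructor delegation used in the patch.
class Reader
{
    private final String descriptor;
    private final long maxDataAge;
    private Object indexFile;   // wired only by the full constructor (or by a later load step)
    private Object dataFile;
    private Object bloomFilter;

    // Minimal constructor: just enough state for batch/streaming use.
    Reader(String descriptor, long maxDataAge)
    {
        this.descriptor = descriptor;
        this.maxDataAge = maxDataAge;
    }

    // Full constructor delegates to the minimal one, then wires up the per-row read machinery.
    Reader(String descriptor, long maxDataAge, Object indexFile, Object dataFile, Object bloomFilter)
    {
        this(descriptor, maxDataAge);
        this.indexFile = indexFile;
        this.dataFile = dataFile;
        this.bloomFilter = bloomFilter;
    }
}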
