pool [Compressed]RandomAccessReader objects on the partitioned read path; creating them is expensive
patch by jbellis; reviewed by xedin and slebresne for CASSANDRA-4942


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/edcc7f13
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/edcc7f13
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/edcc7f13

Branch: refs/heads/cassandra-1.2
Commit: edcc7f137f573eb6aa38fe4f5b79c22de7811342
Parents: 6773383
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Fri Nov 9 23:03:51 2012 -0600
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Wed Nov 14 10:23:51 2012 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    6 ++-
 .../io/compress/CompressedRandomAccessReader.java  |   27 +++++++---
 .../cassandra/io/util/BufferedSegmentedFile.java   |   13 +----
 .../cassandra/io/util/CompressedSegmentedFile.java |   10 ++--
 .../cassandra/io/util/PoolingSegmentedFile.java    |   41 +++++++++++++++
 .../cassandra/io/util/RandomAccessReader.java      |   41 ++++++++++++---
 .../compress/CompressedRandomAccessReaderTest.java |    4 +-
 .../io/util/BufferedRandomAccessFileTest.java      |    4 +-
 8 files changed, 110 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcc7f13/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 900c9ef..be34e89 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,11 @@
 1.2.1
- * Add debug logging to list filenames processed by Directories.migrateFile method (CASSANDRA-4939)
+ * pool [Compressed]RandomAccessReader objects on the partitioned read path
+   (CASSANDRA-4942)
+ * Add debug logging to list filenames processed by Directories.migrateFile 
+   method (CASSANDRA-4939)
  * Expose black-listed directories via JMX (CASSANDRA-4848)
 
+
 1.2-rc1
  * fix cqlsh DESCRIBE command (CASSANDRA-4913)
  * save truncation position in system table (CASSANDRA-4906)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcc7f13/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java 
b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index da35e92..3b0c5ba 100644
--- 
a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ 
b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -25,26 +25,36 @@ import java.nio.channels.FileChannel;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.util.CompressedSegmentedFile;
+import org.apache.cassandra.io.util.PoolingSegmentedFile;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.FBUtilities;
 
 // TODO refactor this to separate concept of "buffer to avoid lots of read() 
syscalls" and "compression buffer"
 public class CompressedRandomAccessReader extends RandomAccessReader
 {
-    public static RandomAccessReader open(String dataFilePath, 
CompressionMetadata metadata)
+    public static CompressedRandomAccessReader open(String path, 
CompressionMetadata metadata, CompressedSegmentedFile owner)
     {
-        return open(dataFilePath, metadata, false);
+        try
+        {
+            return new CompressedRandomAccessReader(path, metadata, false, 
owner);
+        }
+        catch (FileNotFoundException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
-    public static RandomAccessReader open(String dataFilePath, 
CompressionMetadata metadata, boolean skipIOCache)
+    public static CompressedRandomAccessReader open(String dataFilePath, 
CompressionMetadata metadata, boolean skipIOCache)
     {
         try
         {
-            return new CompressedRandomAccessReader(dataFilePath, metadata, 
skipIOCache);
+            return new CompressedRandomAccessReader(dataFilePath, metadata, 
skipIOCache, null);
         }
         catch (FileNotFoundException e)
         {
@@ -65,9 +75,9 @@ public class CompressedRandomAccessReader extends 
RandomAccessReader
     private final FileInputStream source;
     private final FileChannel channel;
 
-    public CompressedRandomAccessReader(String dataFilePath, 
CompressionMetadata metadata, boolean skipIOCache) throws FileNotFoundException
+    private CompressedRandomAccessReader(String dataFilePath, 
CompressionMetadata metadata, boolean skipIOCache, PoolingSegmentedFile owner) 
throws FileNotFoundException
     {
-        super(new File(dataFilePath), metadata.chunkLength(), skipIOCache);
+        super(new File(dataFilePath), metadata.chunkLength(), skipIOCache, 
owner);
         this.metadata = metadata;
         compressed = new 
byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())];
         // can't use super.read(...) methods
@@ -155,9 +165,10 @@ public class CompressedRandomAccessReader extends 
RandomAccessReader
     }
 
     @Override
-    public void close()
+    public void deallocate()
     {
-        super.close();
+        super.deallocate();
+
         try
         {
             source.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcc7f13/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java 
b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index 2c8b89e..49972c8 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.io.util;
 
 import java.io.File;
 
-public class BufferedSegmentedFile extends SegmentedFile
+public class BufferedSegmentedFile extends PoolingSegmentedFile
 {
     public BufferedSegmentedFile(String path, long length)
     {
@@ -49,15 +49,8 @@ public class BufferedSegmentedFile extends SegmentedFile
         }
     }
 
-    public FileDataInput getSegment(long position)
+    protected RandomAccessReader createReader(String path)
     {
-        RandomAccessReader file = RandomAccessReader.open(new File(path));
-        file.seek(position);
-        return file;
-    }
-
-    public void cleanup()
-    {
-        // nothing to do
+        return RandomAccessReader.open(new File(path), this);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcc7f13/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java 
b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 7280dcd..e106be7 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.io.util;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 
-public class CompressedSegmentedFile extends SegmentedFile
+public class CompressedSegmentedFile extends PoolingSegmentedFile
 {
     public final CompressionMetadata metadata;
 
@@ -52,15 +52,15 @@ public class CompressedSegmentedFile extends SegmentedFile
         }
     }
 
-    public FileDataInput getSegment(long position)
+    protected RandomAccessReader createReader(String path)
     {
-        RandomAccessReader file = CompressedRandomAccessReader.open(path, 
metadata);
-        file.seek(position);
-        return file;
+        return CompressedRandomAccessReader.open(path, metadata, this);
     }
 
+    @Override
     public void cleanup()
     {
+        super.cleanup();
         metadata.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcc7f13/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java 
b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
new file mode 100644
index 0000000..2e0acfc
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * A {@link SegmentedFile} that keeps a pool of idle {@link RandomAccessReader}s so that
+ * {@link #getSegment(long)} can reuse an existing reader instead of constructing a new one,
+ * which is expensive. Subclasses create readers via {@link #createReader(String)}; readers
+ * owned by this file are handed back through {@link #recycle(RandomAccessReader)} when closed.
+ */
+public abstract class PoolingSegmentedFile extends SegmentedFile
+{
+    /** Idle readers available for reuse (a thread-safe, unbounded queue). */
+    public final Queue<RandomAccessReader> pool = new ConcurrentLinkedQueue<RandomAccessReader>();
+
+    protected PoolingSegmentedFile(String path, long length)
+    {
+        super(path, length);
+    }
+
+    protected PoolingSegmentedFile(String path, long length, long onDiskLength)
+    {
+        super(path, length, onDiskLength);
+    }
+
+    /**
+     * Returns a reader positioned at {@code position}: a pooled reader when one is idle,
+     * otherwise a fresh one from {@link #createReader(String)}.
+     */
+    @Override
+    public FileDataInput getSegment(long position)
+    {
+        RandomAccessReader reader = pool.poll();
+        if (reader == null)
+            reader = createReader(path);
+        reader.seek(position);
+        return reader;
+    }
+
+    /**
+     * Creates a new reader over the file at {@code path}. Implementations should pass
+     * {@code this} as the reader's owner so that closing the reader recycles it here.
+     */
+    protected abstract RandomAccessReader createReader(String path);
+
+    /** Returns {@code reader} to the idle pool so a later {@link #getSegment(long)} can reuse it. */
+    public void recycle(RandomAccessReader reader)
+    {
+        pool.add(reader);
+    }
+
+    /**
+     * Deallocates every idle pooled reader. Readers are removed from the pool as they are
+     * deallocated, so getSegment can never hand out a deallocated reader afterwards and a
+     * repeated cleanup does not deallocate the same reader twice.
+     */
+    @Override
+    public void cleanup()
+    {
+        RandomAccessReader reader;
+        while ((reader = pool.poll()) != null)
+            reader.deallocate();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcc7f13/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java 
b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 06778d9..3210372 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -60,11 +60,14 @@ public class RandomAccessReader extends RandomAccessFile 
implements FileDataInpu
 
     private final long fileLength;
 
-    // used in tests
-    public RandomAccessReader(File file, int bufferSize, boolean skipIOCache) 
throws FileNotFoundException
+    protected final PoolingSegmentedFile owner;
+
+    protected RandomAccessReader(File file, int bufferSize, boolean 
skipIOCache, PoolingSegmentedFile owner) throws FileNotFoundException
     {
         super(file, "r");
 
+        this.owner = owner;
+
         channel = super.getChannel();
         filePath = file.getAbsolutePath();
 
@@ -101,17 +104,22 @@ public class RandomAccessReader extends RandomAccessFile 
implements FileDataInpu
         return open(file, false);
     }
 
+    public static RandomAccessReader open(File file, PoolingSegmentedFile 
owner)
+    {
+        return open(file, DEFAULT_BUFFER_SIZE, false, owner);
+    }
+
     public static RandomAccessReader open(File file, boolean skipIOCache)
     {
-        return open(file, DEFAULT_BUFFER_SIZE, skipIOCache);
+        return open(file, DEFAULT_BUFFER_SIZE, skipIOCache, null);
     }
 
     @VisibleForTesting
-    static RandomAccessReader open(File file, int bufferSize, boolean 
skipIOCache)
+    static RandomAccessReader open(File file, int bufferSize, boolean 
skipIOCache, PoolingSegmentedFile owner)
     {
         try
         {
-            return new RandomAccessReader(file, bufferSize, skipIOCache);
+            return new RandomAccessReader(file, bufferSize, skipIOCache, 
owner);
         }
         catch (FileNotFoundException e)
         {
@@ -120,9 +128,9 @@ public class RandomAccessReader extends RandomAccessFile 
implements FileDataInpu
     }
 
     @VisibleForTesting
-    public static RandomAccessReader open(SequentialWriter writer)
+    static RandomAccessReader open(SequentialWriter writer)
     {
-        return open(new File(writer.getPath()), DEFAULT_BUFFER_SIZE, false);
+        return open(new File(writer.getPath()), DEFAULT_BUFFER_SIZE, false, 
null);
     }
 
     /**
@@ -237,7 +245,24 @@ public class RandomAccessReader extends RandomAccessFile 
implements FileDataInpu
     @Override
     public void close()
     {
-        buffer = null;
+        if (owner == null || buffer == null)
+        {
+            // The buffer == null check is so that if the pool owner has 
deallocated us, calling close()
+            // will re-call deallocate rather than recycling a deallocated 
object.
+            // I'd be more comfortable if deallocate didn't have to handle 
being idempotent like that,
+            // but RandomAccessFile.close will call 
AbstractInterruptibleChannel.close which will
+            // re-call RAF.close -- in this case, [C]RAR.close since we are 
overriding that.
+            deallocate();
+        }
+        else
+        {
+            owner.recycle(this);
+        }
+    }
+
+    public void deallocate()
+    {
+        buffer = null; // makes sure we don't use this after it's ostensibly 
closed
 
         if (skipIOCache && bytesSinceCacheFlush > 0)
             CLibrary.trySkipCache(fd, 0, 0);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcc7f13/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
 
b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index eabb489..830c3e1 100644
--- 
a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ 
b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -75,8 +75,8 @@ public class CompressedRandomAccessReaderTest
 
             assert f.exists();
             RandomAccessReader reader = compressed
-                ? new CompressedRandomAccessReader(filename, new 
CompressionMetadata(filename + ".metadata", f.length()), false)
-                : new RandomAccessReader(f, 
CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
+                                      ? 
CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + 
".metadata", f.length()), false)
+                                      : RandomAccessReader.open(f);
             String expected = "The quick brown fox jumps over the lazy dog";
             assertEquals(expected.length(), reader.length());
             byte[] b = new byte[expected.length()];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcc7f13/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java 
b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
index e7fa8e3..8059bbd 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
@@ -338,7 +338,7 @@ public class BufferedRandomAccessFileTest
             for (final int offset : Arrays.asList(0, 8))
             {
                 File file1 = writeTemporaryFile(new byte[16]);
-                final RandomAccessReader file = RandomAccessReader.open(file1, 
bufferSize, false);
+                final RandomAccessReader file = RandomAccessReader.open(file1, 
bufferSize, false, null);
                 expectEOF(new Callable<Object>()
                 {
                     public Object call() throws IOException
@@ -353,7 +353,7 @@ public class BufferedRandomAccessFileTest
             for (final int n : Arrays.asList(1, 2, 4, 8))
             {
                 File file1 = writeTemporaryFile(new byte[16]);
-                final RandomAccessReader file = RandomAccessReader.open(file1, 
bufferSize, false);
+                final RandomAccessReader file = RandomAccessReader.open(file1, 
bufferSize, false, null);
                 expectEOF(new Callable<Object>()
                 {
                     public Object call() throws IOException

Reply via email to