avoid serializing to byte[] on commitlog append
patch by jbellis; reviewed by yukim for CASSANDRA-5199


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/baca0bc7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/baca0bc7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/baca0bc7

Branch: refs/heads/trunk
Commit: baca0bc7fe4b13bf8970f974c15d83701e02c048
Parents: 1dcf18c
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Tue Feb 5 15:44:32 2013 +0100
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Tue Feb 5 15:45:34 2013 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/db/commitlog/CommitLogReplayer.java  |    7 +++-
 .../cassandra/db/commitlog/CommitLogSegment.java   |   25 +++++++----
 .../cassandra/io/util/ByteBufferOutputStream.java  |   25 ++++++++++
 .../cassandra/io/util/ChecksummedOutputStream.java |   35 +++++++++++++++
 .../org/apache/cassandra/utils/FBUtilities.java    |    9 ++++
 6 files changed, 92 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/baca0bc7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bf95ae1..b1c066c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.3
+ * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
  * make index_interval configurable per columnfamily (CASSANDRA-3961)
  * add default_tim_to_live (CASSANDRA-3974)
  * add memtable_flush_period_in_ms (CASSANDRA-4237)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/baca0bc7/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 705053b..107e9b7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -167,9 +167,14 @@ public class CommitLogReplayer
                     // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
                     if (serializedSize < 10)
                         break;
+
                     long claimedSizeChecksum = reader.readLong();
                     checksum.reset();
-                    checksum.update(serializedSize);
+                    if (CommitLogDescriptor.current_version < CommitLogDescriptor.VERSION_20)
+                        checksum.update(serializedSize);
+                    else
+                        FBUtilities.updateChecksumInt(checksum, serializedSize);
+
                     if (checksum.getValue() != claimedSizeChecksum)
                         break; // entry wasn't synced correctly/fully. that's
                                // ok.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/baca0bc7/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 469ab99..9256214 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.db.commitlog;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -38,6 +39,8 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.ByteBufferOutputStream;
+import org.apache.cassandra.io.util.ChecksummedOutputStream;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -69,6 +72,8 @@ public class CommitLogSegment
     private boolean needsSync = false;
 
     private final MappedByteBuffer buffer;
+    private final Checksum checksum;
+    private final DataOutputStream bufferStream;
     private boolean closed;
 
     public final CommitLogDescriptor descriptor;
@@ -122,6 +127,8 @@ public class CommitLogSegment
             
logFileAccessor.setLength(DatabaseDescriptor.getCommitLogSegmentSize());
 
             buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize());
+            checksum = new PureJavaCrc32();
+            bufferStream = new DataOutputStream(new ChecksummedOutputStream(new ByteBufferOutputStream(buffer), checksum));
             buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
             buffer.position(0);
 
@@ -204,24 +211,24 @@ public class CommitLogSegment
    /**
      * Appends a row mutation onto the commit log.  Requres that hasCapacityFor has already been checked.
      *
-     * @param   rowMutation   the mutation to append to the commit log.
+     * @param   mutation   the mutation to append to the commit log.
      * @return  the position of the appended mutation
      */
-    public ReplayPosition write(RowMutation rowMutation) throws IOException
+    public ReplayPosition write(RowMutation mutation) throws IOException
     {
         assert !closed;
         ReplayPosition repPos = getContext();
-        markDirty(rowMutation, repPos);
+        markDirty(mutation, repPos);
 
-        Checksum checksum = new PureJavaCrc32();
-        byte[] serializedRow = FBUtilities.serialize(rowMutation, RowMutation.serializer, MessagingService.current_version);
+        checksum.reset();
 
-        checksum.update(serializedRow.length);
-        buffer.putInt(serializedRow.length);
+        // checksummed length
+        int length = (int) RowMutation.serializer.serializedSize(mutation, MessagingService.current_version);
+        bufferStream.writeInt(length);
         buffer.putLong(checksum.getValue());
 
-        buffer.put(serializedRow);
-        checksum.update(serializedRow, 0, serializedRow.length);
+        // checksummed mutation
+        RowMutation.serializer.serialize(mutation, bufferStream, MessagingService.current_version);
         buffer.putLong(checksum.getValue());
 
         if (buffer.remaining() >= 4)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/baca0bc7/src/java/org/apache/cassandra/io/util/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ByteBufferOutputStream.java b/src/java/org/apache/cassandra/io/util/ByteBufferOutputStream.java
new file mode 100644
index 0000000..9a15a1c
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/ByteBufferOutputStream.java
@@ -0,0 +1,25 @@
+package org.apache.cassandra.io.util;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class ByteBufferOutputStream extends OutputStream
+{
+    private final ByteBuffer buffer;
+
+    public ByteBufferOutputStream(ByteBuffer buffer)
+    {
+        this.buffer = buffer;
+    }
+
+    public void write(int b)
+    {
+        buffer.put((byte) b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len)
+    {
+        buffer.put(b, off, len);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/baca0bc7/src/java/org/apache/cassandra/io/util/ChecksummedOutputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedOutputStream.java b/src/java/org/apache/cassandra/io/util/ChecksummedOutputStream.java
new file mode 100644
index 0000000..c7f2df3
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedOutputStream.java
@@ -0,0 +1,35 @@
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.Checksum;
+
+public class ChecksummedOutputStream extends OutputStream
+{
+    private final OutputStream out;
+    private final Checksum checksum;
+
+    public ChecksummedOutputStream(OutputStream out, Checksum checksum)
+    {
+        this.out = out;
+        this.checksum = checksum;
+    }
+
+    public void resetChecksum()
+    {
+        checksum.reset();
+    }
+
+    public void write(int b) throws IOException
+    {
+        out.write(b);
+        checksum.update(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException
+    {
+        out.write(b, off, len);
+        checksum.update(b, off, len);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/baca0bc7/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index cafd856..15f7fb0 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.zip.Checksum;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.AbstractIterator;
@@ -609,6 +610,14 @@ public class FBUtilities
         }
     }
 
+    public static void updateChecksumInt(Checksum checksum, int v)
+    {
+        checksum.update((v >>> 24) & 0xFF);
+        checksum.update((v >>> 16) & 0xFF);
+        checksum.update((v >>> 8) & 0xFF);
+        checksum.update((v >>> 0) & 0xFF);
+    }
+
     private static final class WrappedCloseableIterator<T>
         extends AbstractIterator<T> implements CloseableIterator<T>
     {

Reply via email to