avoid serializing to byte[] on commitlog append
patch by jbellis; reviewed by yukim for CASSANDRA-5199
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/baca0bc7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/baca0bc7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/baca0bc7 Branch: refs/heads/trunk Commit: baca0bc7fe4b13bf8970f974c15d83701e02c048 Parents: 1dcf18c Author: Jonathan Ellis <jbel...@apache.org> Authored: Tue Feb 5 15:44:32 2013 +0100 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Tue Feb 5 15:45:34 2013 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/commitlog/CommitLogReplayer.java | 7 +++- .../cassandra/db/commitlog/CommitLogSegment.java | 25 +++++++---- .../cassandra/io/util/ByteBufferOutputStream.java | 25 ++++++++++ .../cassandra/io/util/ChecksummedOutputStream.java | 35 +++++++++++++++ .../org/apache/cassandra/utils/FBUtilities.java | 9 ++++ 6 files changed, 92 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/baca0bc7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bf95ae1..b1c066c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.3 + * avoid serializing to byte[] on commitlog append (CASSANDRA-5199) * make index_interval configurable per columnfamily (CASSANDRA-3961) * add default_tim_to_live (CASSANDRA-3974) * add memtable_flush_period_in_ms (CASSANDRA-4237) http://git-wip-us.apache.org/repos/asf/cassandra/blob/baca0bc7/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 705053b..107e9b7 100644 --- 
a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -167,9 +167,14 @@ public class CommitLogReplayer // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128 if (serializedSize < 10) break; + long claimedSizeChecksum = reader.readLong(); checksum.reset(); - checksum.update(serializedSize); + if (CommitLogDescriptor.current_version < CommitLogDescriptor.VERSION_20) + checksum.update(serializedSize); + else + FBUtilities.updateChecksumInt(checksum, serializedSize); + if (checksum.getValue() != claimedSizeChecksum) break; // entry wasn't synced correctly/fully. that's // ok. http://git-wip-us.apache.org/repos/asf/cassandra/blob/baca0bc7/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 469ab99..9256214 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.db.commitlog; +import java.io.DataOutputStream; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; @@ -38,6 +39,8 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.RowMutation; import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.util.ByteBufferOutputStream; +import org.apache.cassandra.io.util.ChecksummedOutputStream; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.FBUtilities; @@ -69,6 +72,8 @@ public class CommitLogSegment private boolean needsSync = false; private final MappedByteBuffer buffer; + private 
final Checksum checksum; + private final DataOutputStream bufferStream; private boolean closed; public final CommitLogDescriptor descriptor; @@ -122,6 +127,8 @@ public class CommitLogSegment logFileAccessor.setLength(DatabaseDescriptor.getCommitLogSegmentSize()); buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize()); + checksum = new PureJavaCrc32(); + bufferStream = new DataOutputStream(new ChecksummedOutputStream(new ByteBufferOutputStream(buffer), checksum)); buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER); buffer.position(0); @@ -204,24 +211,24 @@ public class CommitLogSegment /** * Appends a row mutation onto the commit log. Requres that hasCapacityFor has already been checked. * - * @param rowMutation the mutation to append to the commit log. + * @param mutation the mutation to append to the commit log. * @return the position of the appended mutation */ - public ReplayPosition write(RowMutation rowMutation) throws IOException + public ReplayPosition write(RowMutation mutation) throws IOException { assert !closed; ReplayPosition repPos = getContext(); - markDirty(rowMutation, repPos); + markDirty(mutation, repPos); - Checksum checksum = new PureJavaCrc32(); - byte[] serializedRow = FBUtilities.serialize(rowMutation, RowMutation.serializer, MessagingService.current_version); + checksum.reset(); - checksum.update(serializedRow.length); - buffer.putInt(serializedRow.length); + // checksummed length + int length = (int) RowMutation.serializer.serializedSize(mutation, MessagingService.current_version); + bufferStream.writeInt(length); buffer.putLong(checksum.getValue()); - buffer.put(serializedRow); - checksum.update(serializedRow, 0, serializedRow.length); + // checksummed mutation + RowMutation.serializer.serialize(mutation, bufferStream, MessagingService.current_version); buffer.putLong(checksum.getValue()); if (buffer.remaining() >= 4) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/baca0bc7/src/java/org/apache/cassandra/io/util/ByteBufferOutputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ByteBufferOutputStream.java b/src/java/org/apache/cassandra/io/util/ByteBufferOutputStream.java new file mode 100644 index 0000000..9a15a1c --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/ByteBufferOutputStream.java @@ -0,0 +1,25 @@ +package org.apache.cassandra.io.util; + +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class ByteBufferOutputStream extends OutputStream +{ + private final ByteBuffer buffer; + + public ByteBufferOutputStream(ByteBuffer buffer) + { + this.buffer = buffer; + } + + public void write(int b) + { + buffer.put((byte) b); + } + + @Override + public void write(byte[] b, int off, int len) + { + buffer.put(b, off, len); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/baca0bc7/src/java/org/apache/cassandra/io/util/ChecksummedOutputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedOutputStream.java b/src/java/org/apache/cassandra/io/util/ChecksummedOutputStream.java new file mode 100644 index 0000000..c7f2df3 --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/ChecksummedOutputStream.java @@ -0,0 +1,35 @@ +package org.apache.cassandra.io.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.zip.Checksum; + +public class ChecksummedOutputStream extends OutputStream +{ + private final OutputStream out; + private final Checksum checksum; + + public ChecksummedOutputStream(OutputStream out, Checksum checksum) + { + this.out = out; + this.checksum = checksum; + } + + public void resetChecksum() + { + checksum.reset(); + } + + public void write(int b) throws IOException + { + out.write(b); + checksum.update(b); + } + + @Override + 
public void write(byte[] b, int off, int len) throws IOException + { + out.write(b, off, len); + checksum.update(b, off, len); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/baca0bc7/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index cafd856..15f7fb0 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.zip.Checksum; import com.google.common.base.Joiner; import com.google.common.collect.AbstractIterator; @@ -609,6 +610,14 @@ public class FBUtilities } } + public static void updateChecksumInt(Checksum checksum, int v) + { + checksum.update((v >>> 24) & 0xFF); + checksum.update((v >>> 16) & 0xFF); + checksum.update((v >>> 8) & 0xFF); + checksum.update((v >>> 0) & 0xFF); + } + private static final class WrappedCloseableIterator<T> extends AbstractIterator<T> implements CloseableIterator<T> {