Author: jbellis Date: Wed Apr 28 19:05:10 2010 New Revision: 939053 URL: http://svn.apache.org/viewvc?rev=939053&view=rev Log: redo "add crc to commitlogheader," with fix for header serialization. patch by jbellis; reviewed by gdusbabek for CASSANDRA-999
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java?rev=939053&r1=939052&r2=939053&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java Wed Apr 28 19:05:10 2010 @@ -19,10 +19,13 @@ package org.apache.cassandra.db.commitlog; import java.io.*; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.Map; +import java.util.zip.CRC32; +import java.util.zip.Checksum; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.io.ICompactSerializer; @@ -31,7 +34,7 @@ import org.apache.cassandra.utils.Pair; class CommitLogHeader { - private static CommitLogHeaderSerializer serializer = new CommitLogHeaderSerializer(); + static CommitLogHeaderSerializer serializer = new CommitLogHeaderSerializer(); static int getLowestPosition(CommitLogHeader clheader) { @@ -52,7 +55,6 @@ class CommitLogHeader private final byte[] serializedCfMap; // serialized. only needed during commit log recovery. private final int cfCount; // we keep this in case cfcount changes in the interim (size of lastFlushedAt is not a good indication). - private transient final int maxSerializedSize; private transient Map<Pair<String, String>, Integer> cfIdMap; // only needed during recovery. created from this.serializedCfMap. CommitLogHeader() @@ -71,8 +73,6 @@ class CommitLogHeader this.lastFlushedAt = lastFlushedAt; this.serializedCfMap = serializedCfMap; assert lastFlushedAt.size() <= cfCount; - // (size of lastFlushedAt) + (size of map buf) + (size of cfCount int) - maxSerializedSize = (8 * cfCount + 4) + (serializedCfMap.length + 4) + (4); } boolean isDirty(int cfId) @@ -150,14 +150,11 @@ class CommitLogHeader byte[] toByteArray() throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(maxSerializedSize); - DataOutputStream dos = new DataOutputStream(bos); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); serializer.serialize(this, dos); - byte[] src = bos.toByteArray(); - assert src.length <= maxSerializedSize; - byte[] dst = new byte[maxSerializedSize]; - System.arraycopy(src, 0, dst, 0, src.length); - return dst; + dos.flush(); + return bos.toByteArray(); } // we use cf ids. getting the cf names would be pretty pretty expensive. @@ -195,27 +192,70 @@ class CommitLogHeader public void serialize(CommitLogHeader clHeader, DataOutputStream dos) throws IOException { assert clHeader.lastFlushedAt.size() <= clHeader.cfCount; + Checksum checksum = new CRC32(); + + // write the first checksum after the fixed-size part, so we won't OOM allocating a bogus cfmap buffer dos.writeInt(clHeader.cfCount); // 4 dos.writeInt(clHeader.serializedCfMap.length); // 4 - dos.write(clHeader.serializedCfMap); // colMap.length dos.writeInt(clHeader.lastFlushedAt.size()); // 4 + checksum.update(clHeader.cfCount); + checksum.update(clHeader.serializedCfMap.length); + checksum.update(clHeader.lastFlushedAt.size()); + dos.writeLong(checksum.getValue()); + + // write the 2nd checksum after the cfmap and lastflushedat map + dos.write(clHeader.serializedCfMap); // colMap.length + checksum.update(clHeader.serializedCfMap, 0, clHeader.serializedCfMap.length); for (Map.Entry<Integer, Integer> entry : clHeader.lastFlushedAt.entrySet()) { dos.writeInt(entry.getKey()); // 4 + checksum.update(entry.getKey()); dos.writeInt(entry.getValue()); // 4 + checksum.update(entry.getValue()); + } + dos.writeLong(checksum.getValue()); + + // keep the size constant by padding for missing flushed-at entries. these do not affect checksum. + for (int i = clHeader.lastFlushedAt.entrySet().size(); i < clHeader.cfCount; i++) + { + dos.writeInt(0); + dos.writeInt(0); } } public CommitLogHeader deserialize(DataInputStream dis) throws IOException { - int colCount = dis.readInt(); - byte[] map = new byte[dis.readInt()]; - dis.readFully(map); - int size = dis.readInt(); + Checksum checksum = new CRC32(); + + int cfCount = dis.readInt(); + checksum.update(cfCount); + int cfMapLength = dis.readInt(); + checksum.update(cfMapLength); + int lastFlushedAtSize = dis.readInt(); + checksum.update(lastFlushedAtSize); + if (checksum.getValue() != dis.readLong()) + { + throw new IOException("Invalid or corrupt commitlog header"); + } + + byte[] cfMap = new byte[cfMapLength]; + dis.readFully(cfMap); + checksum.update(cfMap, 0, cfMap.length); Map<Integer, Integer> lastFlushedAt = new HashMap<Integer, Integer>(); - for (int i = 0; i < size; i++) - lastFlushedAt.put(dis.readInt(), dis.readInt()); - return new CommitLogHeader(lastFlushedAt, map, colCount); + for (int i = 0; i < lastFlushedAtSize; i++) + { + int key = dis.readInt(); + checksum.update(key); + int value = dis.readInt(); + checksum.update(value); + lastFlushedAt.put(key, value); + } + if (checksum.getValue() != dis.readLong()) + { + throw new IOException("Invalid or corrupt commitlog header"); + } + + return new CommitLogHeader(lastFlushedAt, cfMap, cfCount); } } } Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java?rev=939053&r1=939052&r2=939053&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java Wed Apr 28 19:05:10 2010 @@ -80,6 +80,16 @@ public class CommitLogHeaderTest extends assert one.length == two.length; } + + @Test + public void byteArray() throws IOException + { + SchemaLoader.loadSchemaFromYaml(); + CommitLogHeader clh = new CommitLogHeader(); + assert clh.getCfIdMap().size() > 0; + CommitLogHeader clh2 = CommitLogHeader.serializer.deserialize(new DataInputStream(new ByteArrayInputStream(clh.toByteArray()))); + assert clh.getCfIdMap().equals(clh2.getCfIdMap()); + } @Test public void cfMapSerialization() throws IOException