Author: jbellis
Date: Wed Apr 28 19:05:10 2010
New Revision: 939053

URL: http://svn.apache.org/viewvc?rev=939053&view=rev
Log:
redo "add crc to commitlogheader," with fix for header serialization.
patch by jbellis; reviewed by gdusbabek for CASSANDRA-999

Modified:
    
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
    
cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java?rev=939053&r1=939052&r2=939053&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java 
Wed Apr 28 19:05:10 2010
@@ -19,10 +19,13 @@
 package org.apache.cassandra.db.commitlog;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.io.ICompactSerializer;
@@ -31,7 +34,7 @@ import org.apache.cassandra.utils.Pair;
 
 class CommitLogHeader
 {    
-    private static CommitLogHeaderSerializer serializer = new 
CommitLogHeaderSerializer();
+    static CommitLogHeaderSerializer serializer = new 
CommitLogHeaderSerializer();
 
     static int getLowestPosition(CommitLogHeader clheader)
     {
@@ -52,7 +55,6 @@ class CommitLogHeader
     private final byte[] serializedCfMap; // serialized. only needed during 
commit log recovery.
     private final int cfCount; // we keep this in case cfcount changes in the 
interim (size of lastFlushedAt is not a good indication).
     
-    private transient final int maxSerializedSize;
     private transient Map<Pair<String, String>, Integer> cfIdMap; // only 
needed during recovery. created from this.serializedCfMap.
     
     CommitLogHeader()
@@ -71,8 +73,6 @@ class CommitLogHeader
         this.lastFlushedAt = lastFlushedAt;
         this.serializedCfMap = serializedCfMap;
         assert lastFlushedAt.size() <= cfCount;
-        // (size of lastFlushedAt) + (size of map buf) + (size of cfCount int)
-        maxSerializedSize = (8 * cfCount + 4) + (serializedCfMap.length + 4) + 
(4);
     }
         
     boolean isDirty(int cfId)
@@ -150,14 +150,11 @@ class CommitLogHeader
 
     byte[] toByteArray() throws IOException
     {
-        ByteArrayOutputStream bos = new 
ByteArrayOutputStream(maxSerializedSize);
-        DataOutputStream dos = new DataOutputStream(bos);        
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
         serializer.serialize(this, dos);
-        byte[] src = bos.toByteArray();
-        assert src.length <= maxSerializedSize;
-        byte[] dst = new byte[maxSerializedSize];
-        System.arraycopy(src, 0, dst, 0, src.length);
-        return dst;
+        dos.flush();
+        return bos.toByteArray();
     }
     
     // we use cf ids. getting the cf names would be pretty pretty expensive.
@@ -195,27 +192,70 @@ class CommitLogHeader
         public void serialize(CommitLogHeader clHeader, DataOutputStream dos) 
throws IOException
         {
             assert clHeader.lastFlushedAt.size() <= clHeader.cfCount;
+            Checksum checksum = new CRC32();
+
+            // write the first checksum after the fixed-size part, so we won't 
OOM allocating a bogus cfmap buffer
             dos.writeInt(clHeader.cfCount); // 4
             dos.writeInt(clHeader.serializedCfMap.length); // 4
-            dos.write(clHeader.serializedCfMap); // colMap.length
             dos.writeInt(clHeader.lastFlushedAt.size()); // 4
+            checksum.update(clHeader.cfCount);
+            checksum.update(clHeader.serializedCfMap.length);
+            checksum.update(clHeader.lastFlushedAt.size());
+            dos.writeLong(checksum.getValue());
+
+            // write the 2nd checksum after the cfmap and lastflushedat map
+            dos.write(clHeader.serializedCfMap); // colMap.length
+            checksum.update(clHeader.serializedCfMap, 0, 
clHeader.serializedCfMap.length);
             for (Map.Entry<Integer, Integer> entry : 
clHeader.lastFlushedAt.entrySet())
             {
                 dos.writeInt(entry.getKey()); // 4
+                checksum.update(entry.getKey());
                 dos.writeInt(entry.getValue()); // 4
+                checksum.update(entry.getValue());
+            }
+            dos.writeLong(checksum.getValue());
+
+            // keep the size constant by padding for missing flushed-at 
entries.  these do not affect checksum.
+            for (int i = clHeader.lastFlushedAt.entrySet().size(); i < 
clHeader.cfCount; i++)
+            {
+                dos.writeInt(0);
+                dos.writeInt(0);
             }
         }
 
         public CommitLogHeader deserialize(DataInputStream dis) throws 
IOException
         {
-            int colCount = dis.readInt();
-            byte[] map = new byte[dis.readInt()];
-            dis.readFully(map);
-            int size = dis.readInt();
+            Checksum checksum = new CRC32();
+
+            int cfCount = dis.readInt();
+            checksum.update(cfCount);
+            int cfMapLength = dis.readInt();
+            checksum.update(cfMapLength);
+            int lastFlushedAtSize = dis.readInt();
+            checksum.update(lastFlushedAtSize);
+            if (checksum.getValue() != dis.readLong())
+            {
+                throw new IOException("Invalid or corrupt commitlog header");
+            }
+
+            byte[] cfMap = new byte[cfMapLength];
+            dis.readFully(cfMap);
+            checksum.update(cfMap, 0, cfMap.length);
             Map<Integer, Integer> lastFlushedAt = new HashMap<Integer, 
Integer>();
-            for (int i = 0; i < size; i++)
-                lastFlushedAt.put(dis.readInt(), dis.readInt());
-            return new CommitLogHeader(lastFlushedAt, map, colCount);
+            for (int i = 0; i < lastFlushedAtSize; i++)
+            {
+                int key = dis.readInt();
+                checksum.update(key);
+                int value = dis.readInt();
+                checksum.update(value);
+                lastFlushedAt.put(key, value);
+            }
+            if (checksum.getValue() != dis.readLong())
+            {
+                throw new IOException("Invalid or corrupt commitlog header");
+            }
+
+            return new CommitLogHeader(lastFlushedAt, cfMap, cfCount);
         }
     }
 }

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java?rev=939053&r1=939052&r2=939053&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java
 (original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java
 Wed Apr 28 19:05:10 2010
@@ -80,6 +80,16 @@ public class CommitLogHeaderTest extends
         
         assert one.length == two.length;
     }
+
+    @Test
+    public void byteArray() throws IOException
+    {
+        SchemaLoader.loadSchemaFromYaml();
+        CommitLogHeader clh = new CommitLogHeader();
+        assert clh.getCfIdMap().size() > 0;
+        CommitLogHeader clh2 = CommitLogHeader.serializer.deserialize(new 
DataInputStream(new ByteArrayInputStream(clh.toByteArray())));
+        assert clh.getCfIdMap().equals(clh2.getCfIdMap());
+    }
     
     @Test
     public void cfMapSerialization() throws IOException


Reply via email to