Skip replaying mutations that pass CRC but fail to deserialize
patch by jbellis; reviewed by Lyuben Todorov for CASSANDRA-6183


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bafb9663
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bafb9663
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bafb9663

Branch: refs/heads/trunk
Commit: bafb9663940b914274648ec01a9087a309545130
Parents: 4e35969
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Mon Oct 14 20:24:31 2013 +0100
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Mon Oct 14 20:25:05 2013 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/commitlog/CommitLogReplayer.java         | 26 +++++++++++++++++---
 2 files changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bafb9663/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 48fcf58..046ecfb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,7 @@
  * Fix indexed queries with row cache enabled on parent table (CASSANDRA-5732)
  * Fix compaction race during columnfamily drop (CASSANDRA-5957)
  * Fix validation of empty column names for compact tables (CASSANDRA-6152)
+ * Skip replaying mutations that pass CRC but fail to deserialize (CASSANDRA-6183)
 
 
 1.2.10

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bafb9663/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 796ab5b..934cb6a 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -18,10 +18,7 @@
  */
 package org.apache.cassandra.db.commitlog;
 
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
 import java.util.*;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -200,6 +197,10 @@ public class CommitLogReplayer
                     // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
                     // the current version. so do make sure the CL is drained prior to upgrading a node.
                     rm = RowMutation.serializer.deserialize(new 
DataInputStream(bufIn), version, IColumnSerializer.Flag.LOCAL);
+                    // doublecheck that what we read is [still] valid for the current schema
+                    for (ColumnFamily cf : rm.getColumnFamilies())
+                        for (IColumn cell : cf)
+                            cf.getComparator().validate(cell.name());
                 }
                 catch (UnknownColumnFamilyException ex)
                 {
@@ -215,6 +216,23 @@ public class CommitLogReplayer
                         i.incrementAndGet();
                     continue;
                 }
+                catch (Throwable t)
+                {
+                    File f = File.createTempFile("mutation", "dat");
+                    DataOutputStream out = new DataOutputStream(new 
FileOutputStream(f));
+                    try
+                    {
+                        out.write(buffer, 0, serializedSize);
+                    }
+                    finally
+                    {
+                        out.close();
+                    }
+                    String st = String.format("Unexpected error deserializing 
mutation; saved to %s and ignored.  This may be caused by replaying a mutation 
against a table with the same name but incompatible schema.  Exception follows: 
",
+                                              f.getAbsolutePath());
+                    logger.error(st, t);
+                    continue;
+                }
 
                 if (logger.isDebugEnabled())
                     logger.debug(String.format("replaying mutation for %s.%s: 
%s", rm.getTable(), ByteBufferUtil.bytesToHex(rm.key()), "{" + 
StringUtils.join(rm.getColumnFamilies().iterator(), ", ")

Reply via email to