Author: [email protected]
Date: Tue Apr 10 09:17:11 2012
New Revision: 2199
Log:
[AMDATUCASSANDRA-172] Added extra logging in case of corrupted commitlog
Added:
branches/amdatu-cassandra-0.2.3/_patches/AMDATUCASSANDRA-172-3.patch
Added: branches/amdatu-cassandra-0.2.3/_patches/AMDATUCASSANDRA-172-3.patch
==============================================================================
--- (empty file)
+++ branches/amdatu-cassandra-0.2.3/_patches/AMDATUCASSANDRA-172-3.patch Tue Apr 10 09:17:11 2012
@@ -0,0 +1,101 @@
+diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+index 9f796f9..f9bca43 100644
+--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
++++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+@@ -23,15 +23,16 @@ package org.apache.cassandra.db;
+
+ import java.io.DataInput;
+ import java.io.DataOutput;
++import java.io.EOFException;
++import java.io.IOError;
+ import java.io.IOException;
+ import java.util.Collection;
+
+-import org.slf4j.Logger;
+-import org.slf4j.LoggerFactory;
+-
+ import org.apache.cassandra.config.CFMetaData;
+-import org.apache.cassandra.io.ICompactSerializer2;
++import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.io.ICompactSerializer3;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
+
+ public class ColumnFamilySerializer implements ICompactSerializer3<ColumnFamily>
+ {
+@@ -121,10 +122,33 @@ public class ColumnFamilySerializer implements ICompactSerializer3<ColumnFamily>
+ int cfId = dis.readInt();
+ if (CFMetaData.getCF(cfId) == null)
+ throw new UnserializableColumnFamilyException("Couldn't find cfId=" + cfId, cfId);
+- ColumnFamily cf = ColumnFamily.create(cfId);
+- deserializeFromSSTableNoColumns(cf, dis);
+- deserializeColumns(dis, cf, intern, fromRemote);
+- return cf;
++ try {
++ ColumnFamily cf = ColumnFamily.create(cfId);
++ deserializeFromSSTableNoColumns(cf, dis);
++ deserializeColumns(dis, cf, intern, fromRemote);
++ return cf;
++ } catch (IOError err) {
++ String cfName = null;
++ try {
++ cfName = DatabaseDescriptor.getCFMetaData(cfId).cfName;
++ } catch (Exception e) {
++ cfName = "Unknown";
++ }
++
++ String msg = "Could not deserialize ColumnFamily: id='" + cfId + "', name='" + cfName + "'";
++ throw new UnserializableColumnFamilyException(msg, err, cfId);
++ } catch (EOFException eof) {
++
++ String cfName = null;
++ try {
++ cfName = DatabaseDescriptor.getCFMetaData(cfId).cfName;
++ } catch (Exception e) {
++ cfName = "Unknown";
++ }
++
++ String msg = "Could not deserialize ColumnFamily: id='" + cfId + "', name='" + cfName + "'";
++ throw new UnserializableColumnFamilyException(msg, eof, cfId);
++ }
+ }
+
+ public void deserializeColumns(DataInput dis, ColumnFamily cf, boolean intern, boolean fromRemote) throws IOException
+diff --git a/src/java/org/apache/cassandra/db/UnserializableColumnFamilyException.java b/src/java/org/apache/cassandra/db/UnserializableColumnFamilyException.java
+index df46b28..425ca44 100644
+--- a/src/java/org/apache/cassandra/db/UnserializableColumnFamilyException.java
++++ b/src/java/org/apache/cassandra/db/UnserializableColumnFamilyException.java
+@@ -30,4 +30,10 @@ public class UnserializableColumnFamilyException extends IOException
+ super(msg);
+ this.cfId = cfId;
+ }
++
++ public UnserializableColumnFamilyException(String msg, Throwable t, int cfId)
++ {
++ super(msg, t);
++ this.cfId = cfId;
++ }
+ }
+diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+index ba1ebf0..ecc2a2e 100644
+--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
++++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+@@ -276,6 +276,9 @@ public class CommitLog
+ }
+ catch (UnserializableColumnFamilyException ex)
+ {
++ // Log the error message
++ logger.warn(ex.getMessage());
++
+ AtomicInteger i = invalidMutations.get(ex.cfId);
+ if (i == null)
+ {
+@@ -346,7 +349,7 @@ public class CommitLog
+ }
+
+ for (Map.Entry<Integer, AtomicInteger> entry : invalidMutations.entrySet())
+- logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %d", entry.getValue().intValue(), entry.getKey()));
++ logger.warn(String.format("Skipped %d mutations from unknown (probably removed) CF with id %d", entry.getValue().intValue(), entry.getKey()));
+
+ // wait for all the writes to finish on the mutation stage
+ FBUtilities.waitOnFutures(futures);
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits