Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 109e600fa -> 581ce6310 refs/heads/trunk ae03e1bab -> ac4a0263f
Fail to start if commit log replay encounters an exception patch by Benedict; reviewed by Vijay Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/581ce631 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/581ce631 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/581ce631 Branch: refs/heads/cassandra-2.1 Commit: 581ce631026b98ee9438d54ef144df89bc91100b Parents: 109e600 Author: Benedict Elliott Smith <bened...@apache.org> Authored: Thu Jul 10 09:55:00 2014 +0100 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Thu Jul 10 09:58:52 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/commitlog/CommitLog.java | 17 ++- .../db/commitlog/CommitLogDescriptor.java | 8 +- .../db/commitlog/CommitLogReplayer.java | 76 ++++++++--- .../commitlog/MalformedCommitLogException.java | 16 +++ .../cassandra/service/CassandraDaemon.java | 2 + .../org/apache/cassandra/db/CommitLogTest.java | 133 +++++++++++++++---- 7 files changed, 205 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ae06d92..02a2d52 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.1 + * Fail to start if commit log replay detects a problem (CASSANDRA-7125) * Improve schema merge performance (CASSANDRA-7444) * Fix NPE when unknown prepared statement ID is used (CASSANDRA-7454) * Adjust MT depth based on # of partition validating (CASSANDRA-5263) http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index cf8a7f6..ac1d811 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -133,9 +133,20 @@ public class CommitLog implements CommitLogMBean */ public int recover(File... clogs) throws IOException { - CommitLogReplayer recovery = new CommitLogReplayer(); - recovery.recover(clogs); - return recovery.blockForWrites(); + try + { + CommitLogReplayer recovery = new CommitLogReplayer(); + recovery.recover(clogs); + return recovery.blockForWrites(); + } + catch (IOException e) + { + if (e instanceof UnknownColumnFamilyException) + logger.error("Commit log replay failed due to replaying a mutation for a missing table. This error can be ignored by providing -Dcassandra.commitlog.stop_on_missing_tables=false on the command line"); + if (e instanceof MalformedCommitLogException) + logger.error("Commit log replay failed due to a non-fatal exception. 
This error can be ignored by providing -Dcassandra.commitlog.stop_on_errors=false on the command line"); + throw e; + } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java index 91c81e1..77c25d3 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java @@ -28,6 +28,8 @@ import java.nio.ByteBuffer; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.FBUtilities; @@ -48,10 +50,11 @@ public class CommitLogDescriptor * Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes. 
* Note: make sure to handle {@link #getMessagingVersion()} */ + @VisibleForTesting public static final int current_version = VERSION_21; // [version, id, checksum] - static final int HEADER_SIZE = 4 + 8 + 4; + public static final int HEADER_SIZE = 4 + 8 + 4; final int version; public final long id; @@ -67,7 +70,8 @@ public class CommitLogDescriptor this(current_version, id); } - static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor) + @VisibleForTesting + public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor) { out.putInt(0, descriptor.version); out.putLong(4, descriptor.id); http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 1012829..10d13b2 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -23,6 +23,7 @@ import java.util.*; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; @@ -48,6 +49,8 @@ public class CommitLogReplayer private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class); private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024; private static final int LEGACY_END_OF_SEGMENT_MARKER = 0; + private static boolean IGNORE_ERRORS = System.getProperty("cassandra.commitlog.stop_on_errors", "true").equals("false"); + private static boolean IGNORE_MISSING_TABLES = IGNORE_ERRORS || System.getProperty("cassandra.commitlog.stop_on_missing_tables", "true").equals("false"); 
private final Set<Keyspace> keyspacesRecovered; private final List<Future<?>> futures; @@ -60,16 +63,16 @@ public class CommitLogReplayer public CommitLogReplayer() { - this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>(); - this.futures = new ArrayList<Future<?>>(); + this.keyspacesRecovered = new NonBlockingHashSet<>(); + this.futures = new ArrayList<>(); this.buffer = new byte[4096]; - this.invalidMutations = new HashMap<UUID, AtomicInteger>(); + this.invalidMutations = new HashMap<>(); // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference. this.replayedCount = new AtomicInteger(); this.checksum = new PureJavaCrc32(); // compute per-CF and global replay positions - cfPositions = new HashMap<UUID, ReplayPosition>(); + cfPositions = new HashMap<>(); Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator); for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { @@ -117,7 +120,12 @@ public class CommitLogReplayer if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE) { if (offset != reader.length() && offset != Integer.MAX_VALUE) - logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header", offset, reader.getPath()); + { + String message = String.format("Encountered bad header at position %d of Commit log %s; not enough room for a header", offset, reader.getPath()); + if (!IGNORE_ERRORS) + throw new MalformedCommitLogException(message); + logger.warn(message); + } // cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment return -1; } @@ -136,13 +144,19 @@ public class CommitLogReplayer { if (end != 0 || filecrc != 0) { - logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. 
The end of segment marker should be zero.", offset, reader.getPath()); + String message = String.format("Encountered bad header at position %d of Commit log %s, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath()); + if (!IGNORE_ERRORS) + throw new MalformedCommitLogException(message); + logger.warn(message); } return -1; } else if (end < offset || end > reader.length()) { - logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath()); + String message = String.format("Encountered bad header at position %d of Commit log %s, with bad position but valid CRC.", offset, reader.getPath()); + if (!IGNORE_ERRORS) + throw new MalformedCommitLogException(message); + logger.warn(message); return -1; } return end; @@ -271,8 +285,9 @@ public class CommitLogReplayer /* read the logs populate Mutation and apply */ while (reader.getPosition() < end && !reader.isEOF()) { + long mutationStart = reader.getFilePointer(); if (logger.isDebugEnabled()) - logger.debug("Reading mutation at {}", reader.getFilePointer()); + logger.debug("Reading mutation at {}", mutationStart); long claimedCRC32; int serializedSize; @@ -282,7 +297,7 @@ public class CommitLogReplayer serializedSize = reader.readInt(); if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER) { - logger.debug("Encountered end of segment marker at {}", reader.getFilePointer()); + logger.debug("Encountered end of segment marker at {}", mutationStart); break main; } @@ -291,7 +306,11 @@ public class CommitLogReplayer // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count. 
// This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128 if (serializedSize < 10) + { + if (!IGNORE_ERRORS) + throw new MalformedCommitLogException("Too small mutation encountered at position " + mutationStart); break main; + } long claimedSizeChecksum; if (desc.version < CommitLogDescriptor.VERSION_21) @@ -305,7 +324,11 @@ public class CommitLogReplayer checksum.updateInt(serializedSize); if (checksum.getValue() != claimedSizeChecksum) + { + if (!IGNORE_ERRORS) + throw new IOException("Invalid size checksum for mutation at position " + mutationStart + " of " + file); break main; // entry wasn't synced correctly/fully. that's + } // ok. if (serializedSize > buffer.length) @@ -318,12 +341,17 @@ public class CommitLogReplayer } catch (EOFException eof) { + if (!IGNORE_ERRORS) + throw new MalformedCommitLogException("Encountered end-of-file unexpectedly", eof); + break main; // last CL entry didn't get completely written. that's ok. } checksum.update(buffer, 0, serializedSize); if (claimedCRC32 != checksum.getValue()) { + if (!IGNORE_ERRORS) + throw new IOException("Invalid checksum for mutation at position " + mutationStart + " of " + file); // this entry must not have been fsynced. 
probably the rest is bad too, // but just in case there is no harm in trying them (since we still read on an entry boundary) continue; @@ -344,6 +372,9 @@ public class CommitLogReplayer } catch (UnknownColumnFamilyException ex) { + if (!IGNORE_MISSING_TABLES) + throw ex; + if (ex.cfId == null) continue; AtomicInteger i = invalidMutations.get(ex.cfId); @@ -358,16 +389,14 @@ public class CommitLogReplayer } catch (Throwable t) { + if (!IGNORE_ERRORS) + throw new MalformedCommitLogException("Encountered bad mutation", t); + File f = File.createTempFile("mutation", "dat"); - DataOutputStream out = new DataOutputStream(new FileOutputStream(f)); - try + try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f))) { out.write(buffer, 0, serializedSize); } - finally - { - out.close(); - } String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. 
Exception follows: ", f.getAbsolutePath()); logger.error(st, t); @@ -383,7 +412,11 @@ public class CommitLogReplayer public void runMayThrow() throws IOException { if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null) + { + if (!IGNORE_MISSING_TABLES) + throw new UnknownColumnFamilyException("Keyspace for this table is missing", mutation.getColumnFamilyIds().iterator().next()); return; + } if (pointInTimeExceeded(mutation)) return; @@ -398,7 +431,12 @@ public class CommitLogReplayer for (ColumnFamily columnFamily : replayFilter.filter(mutation)) { if (Schema.instance.getCF(columnFamily.id()) == null) + { + if (!IGNORE_MISSING_TABLES) + throw new UnknownColumnFamilyException("Missing table with cfid=" + columnFamily.id(), + mutation.getColumnFamilyIds().iterator().next()); continue; // dropped + } ReplayPosition rp = cfPositions.get(columnFamily.id()); @@ -415,7 +453,7 @@ public class CommitLogReplayer if (newMutation != null) { assert !newMutation.isEmpty(); - Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false); + keyspace.apply(newMutation, false); keyspacesRecovered.add(keyspace); } } @@ -453,4 +491,10 @@ public class CommitLogReplayer } return false; } + + @VisibleForTesting + public static void setIgnoreErrors(boolean ignore) + { + IGNORE_ERRORS = IGNORE_MISSING_TABLES = ignore; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java b/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java new file mode 100644 index 0000000..84a5cb0 --- /dev/null +++ b/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java @@ -0,0 +1,16 @@ +package org.apache.cassandra.db.commitlog; + +import java.io.IOException; + +// represents a non-fatal commit log 
replay exception (i.e. can be skipped with -Dcassandra.commitlog.stop_on_errors=false) +public class MalformedCommitLogException extends IOException +{ + public MalformedCommitLogException(String message) + { + super(message); + } + public MalformedCommitLogException(String message, Throwable cause) + { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index fbee7ce..07c6cc4 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -47,6 +47,8 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.UnknownColumnFamilyException; +import org.apache.cassandra.db.commitlog.MalformedCommitLogException; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.exceptions.ConfigurationException; http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/test/unit/org/apache/cassandra/db/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java index 7046536..dd05272 100644 --- a/test/unit/org/apache/cassandra/db/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java @@ -36,46 +36,53 @@ import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import 
org.apache.cassandra.db.commitlog.CommitLog; import
org.apache.cassandra.db.commitlog.CommitLogDescriptor; +import org.apache.cassandra.db.commitlog.CommitLogReplayer; import org.apache.cassandra.db.commitlog.CommitLogSegment; +import org.apache.cassandra.db.commitlog.MalformedCommitLogException; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.PureJavaCrc32; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; public class CommitLogTest extends SchemaLoader { + + static + { + System.setProperty("cassandra.commitlog.stop_on_errors", "true"); + } + @Test public void testRecoveryWithEmptyLog() throws Exception { - CommitLog.instance.recover(new File[]{ tmpFile() }); + testMalformed(badLogFile(new byte[0])); } @Test public void testRecoveryWithShortLog() throws Exception { // force EOF while reading log - testRecoveryWithBadSizeArgument(100, 10); + testMalformed(badLogFile(100, 10)); } @Test public void testRecoveryWithShortSize() throws Exception { - testRecovery(new byte[2]); + testMalformed(new byte[2]); } @Test public void testRecoveryWithShortCheckSum() throws Exception { - testRecovery(new byte[6]); + testMalformed(new byte[6]); } @Test public void testRecoveryWithGarbageLog() throws Exception { - byte[] garbage = new byte[100]; - (new java.util.Random()).nextBytes(garbage); - testRecovery(garbage); + testMalformed(garbage(100)); } @Test @@ -83,21 +90,30 @@ public class CommitLogTest extends SchemaLoader { Checksum checksum = new CRC32(); checksum.update(100); - testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue()); + testMalformed(badLogFile(100, checksum.getValue(), new byte[100])); + testMalformed(badLogFile(100, checksum.getValue(), garbage(100))); + } + + @Test + public void testRecoveryWithBadSize() throws Exception + { + Checksum checksum = new CRC32(); + checksum.update(100); + testMalformed(badLogFile(120, checksum.getValue(), garbage(100))); } 
@Test public void testRecoveryWithZeroSegmentSizeArgument() throws Exception { // many different combinations of 4 bytes (garbage) will be read as zero by readInt() - testRecoveryWithBadSizeArgument(0, 10); // zero size, but no EOF + testMalformed(badLogFile(0, -1L, 10)); // zero size, but no EOF } @Test public void testRecoveryWithNegativeSizeArgument() throws Exception { // garbage from a partial/bad flush could be read as a negative size even if there is no EOF - testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF + testMalformed(badLogFile(-10, 10)); // negative size, but no EOF } @Test @@ -174,8 +190,8 @@ public class CommitLogTest extends SchemaLoader private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String table, CellName column) { - Mutation rm = new Mutation("Keyspace1", bytes("k")); - rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(0), 0); + Mutation rm = new Mutation(keyspace, key); + rm.add(table, column, ByteBuffer.allocate(0), 0); int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2); max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead @@ -215,22 +231,73 @@ public class CommitLogTest extends SchemaLoader } } - protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception + // construct log file with correct chunk checksum for the provided size/position + protected File badLogFile(int markerSize, int realSize) throws Exception { - Checksum checksum = new CRC32(); - checksum.update(size); - testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue()); + return badLogFile(markerSize, garbage(realSize)); } - protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception + protected File badLogFile(int markerSize, byte[] data) throws Exception + { + File logFile = tmpFile(); + CommitLogDescriptor descriptor = CommitLogDescriptor.fromFileName(logFile.getName()); + PureJavaCrc32 crc = new PureJavaCrc32(); +
crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL)); + crc.updateInt((int) (descriptor.id >>> 32)); + crc.updateInt(CommitLogDescriptor.HEADER_SIZE); + return badLogFile(markerSize, crc.getCrc(), data, logFile); + } + + protected byte[] garbage(int size) + { + byte[] garbage = new byte[size]; + (new java.util.Random()).nextBytes(garbage); + return garbage; + } + + protected File badLogFile(int markerSize, long checksum, int realSize) throws Exception + { + return badLogFile(markerSize, checksum, realSize, tmpFile()); + } + + protected File badLogFile(int markerSize, long checksum, int realSize, File logFile) throws Exception + { + return badLogFile(markerSize, checksum, new byte[realSize], logFile); + } + + protected File badLogFile(int markerSize, long checksum, byte[] chunk) throws Exception + { + return badLogFile(markerSize, checksum, chunk, tmpFile()); + } + + protected File badLogFile(int markerSize, long checksum, byte[] chunk, File logFile) throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(out); - dout.writeInt(size); + ByteBuffer buffer = ByteBuffer.allocate(CommitLogDescriptor.HEADER_SIZE); + CommitLogDescriptor.writeHeader(buffer, CommitLogDescriptor.fromFileName(logFile.getName())); + out.write(buffer.array()); + dout.writeInt(markerSize); dout.writeLong(checksum); - dout.write(new byte[dataSize]); + dout.write(chunk); dout.close(); - testRecovery(out.toByteArray()); + try (OutputStream lout = new FileOutputStream(logFile)) + { + lout.write(out.toByteArray()); + lout.close(); + } + return logFile; + } + + protected File badLogFile(byte[] contents) throws Exception + { + File logFile = tmpFile(); + try (OutputStream lout = new FileOutputStream(logFile)) + { + lout.write(contents); + lout.close(); + } + return logFile; } protected File tmpFile() throws IOException @@ -241,17 +308,29 @@ public class CommitLogTest extends SchemaLoader return logFile; } - protected void testRecovery(byte[] 
logData) throws Exception + private void testMalformed(byte[] contents) throws Exception { - File logFile = tmpFile(); - try (OutputStream lout = new FileOutputStream(logFile)) + testMalformed(badLogFile(contents)); + testMalformed(badLogFile(contents.length, contents)); + } + + private void testMalformed(File logFile) throws Exception + { + CommitLogReplayer.setIgnoreErrors(true); + CommitLog.instance.recover(new File[]{ logFile }); + CommitLogReplayer.setIgnoreErrors(false); + try { - lout.write(logData); - //statics make it annoying to test things correctly - CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/ + CommitLog.instance.recover(new File[]{ logFile }); + Assert.assertFalse(true); + } + catch (Throwable t) + { + if (!(t instanceof MalformedCommitLogException)) + throw t; } } - + @Test public void testVersions() {