Honors commit log policy when replaying; treats errors in the last segment (section for compressed files) as permissible due to incomplete flush. Reviewed by aweisberg for CASSANDRA-9749
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d12d2d49 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d12d2d49 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d12d2d49 Branch: refs/heads/trunk Commit: d12d2d496540c698f30e9b528b66e8f6636842d3 Parents: 7eed4b6 Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Thu Jul 30 20:59:16 2015 +0300 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Tue Aug 18 20:16:31 2015 -0500 ---------------------------------------------------------------------- .../cassandra/db/commitlog/CommitLog.java | 4 +- .../db/commitlog/CommitLogReplayer.java | 117 +++- .../db/commitlog/CommitLogSegmentManager.java | 8 +- .../CommitLog-5-1438186885380.log | Bin 0 -> 839051 bytes .../legacy-commitlog/2.2-lz4-bitrot/hash.txt | 6 + .../CommitLog-5-1438186885380.log | Bin 0 -> 839051 bytes .../legacy-commitlog/2.2-lz4-bitrot2/hash.txt | 6 + .../CommitLog-5-1438186885380.log | Bin 0 -> 839001 bytes .../legacy-commitlog/2.2-lz4-truncated/hash.txt | 5 + .../db/CommitLogFailurePolicyTest.java | 141 ----- .../org/apache/cassandra/db/CommitLogTest.java | 467 --------------- .../commitlog/CommitLogFailurePolicyTest.java | 141 +++++ .../cassandra/db/commitlog/CommitLogTest.java | 578 +++++++++++++++++++ .../db/commitlog/CommitLogUpgradeTest.java | 65 +++ 14 files changed, 899 insertions(+), 639 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 8d74677..ff27225 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -193,7 +193,9 @@ public class CommitLog implements CommitLogMBean */ public void recover(String path) throws IOException { - recover(new File(path)); + CommitLogReplayer recovery = CommitLogReplayer.construct(this); + recovery.recover(new File(path), false); + recovery.blockForWrites(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 358661a..93c3026 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -64,6 +64,7 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; public class CommitLogReplayer { + static final String IGNORE_REPLAY_ERRORS_PROPERTY = "cassandra.commitlog.ignorereplayerrors"; private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class); private static final int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger("cassandra.commitlog_max_outstanding_replay_count", 1024); private static final int LEGACY_END_OF_SEGMENT_MARKER = 0; @@ -144,14 +145,15 @@ public class CommitLogReplayer public void recover(File[] clogs) throws IOException { - for (final File file : clogs) - recover(file); + int i; + for (i = 0; i < clogs.length; ++i) + recover(clogs[i], i + 1 == clogs.length); } public int blockForWrites() { for (Map.Entry<UUID, AtomicInteger> entry : invalidMutations.entrySet()) - logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey())); + logger.warn(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey())); // wait for all the writes to finish on the mutation stage FBUtilities.waitOnFutures(futures); @@ -165,7 +167,7 @@ public class CommitLogReplayer return replayedCount.get(); } - private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException + private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader, boolean tolerateTruncation) throws IOException { if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE) { @@ -183,13 +185,17 @@ public class CommitLogReplayer { if (end != 0 || filecrc != 0) { - logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath()); + handleReplayError(false, + "Encountered bad header at position %d of commit log %s, with invalid CRC. " + + "The end of segment marker should be zero.", + offset, reader.getPath()); } return -1; } else if (end < offset || end > reader.length()) { - logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath()); + handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC", + offset, reader.getPath()); return -1; } return end; @@ -270,8 +276,7 @@ public class CommitLogReplayer } } - @SuppressWarnings("resource") - public void recover(File file) throws IOException + public void recover(File file, boolean tolerateTruncation) throws IOException { CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath())); @@ -283,7 +288,7 @@ public class CommitLogReplayer return; if (globalPosition.segment == desc.id) reader.seek(globalPosition.position); - replaySyncSection(reader, (int) reader.getPositionLimit(), desc); + replaySyncSection(reader, (int) reader.getPositionLimit(), desc, desc.fileName(), tolerateTruncation); return; } @@ -297,10 +302,15 @@ public class CommitLogReplayer desc = null; } if (desc == null) { - logger.warn("Could not read commit log descriptor in file {}", file); + handleReplayError(false, "Could not read commit log descriptor in file %s", file); return; } - assert segmentId == desc.id; + if (segmentId != desc.id) + { + handleReplayError(false, "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentId, desc.id, file); + // continue processing if ignored. + } + if (logAndCheckIfShouldSkip(file, desc)) return; @@ -313,7 +323,7 @@ public class CommitLogReplayer } catch (ConfigurationException e) { - logger.warn("Unknown compression: {}", e.getMessage()); + handleReplayError(false, "Unknown compression: %s", e.getMessage()); return; } } @@ -322,7 +332,7 @@ public class CommitLogReplayer int end = (int) reader.getFilePointer(); int replayEnd = end; - while ((end = readSyncMarker(desc, end, reader)) >= 0) + while ((end = readSyncMarker(desc, end, reader, tolerateTruncation)) >= 0) { int replayPos = replayEnd + CommitLogSegment.SYNC_MARKER_SIZE; @@ -343,11 +353,17 @@ public class CommitLogReplayer continue; FileDataInput sectionReader = reader; + String errorContext = desc.fileName(); + // In the uncompressed case the last non-fully-flushed section can be anywhere in the file. + boolean tolerateErrorsInSection = tolerateTruncation; if (compressor != null) { + // In the compressed case we know if this is the last section. + tolerateErrorsInSection &= end == reader.length() || end < 0; + + int start = (int) reader.getFilePointer(); try { - int start = (int) reader.getFilePointer(); int compressedLength = end - start; if (logger.isDebugEnabled()) logger.trace("Decompressing {} between replay positions {} and {}", @@ -362,15 +378,18 @@ public class CommitLogReplayer uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)]; compressedLength = compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0); sectionReader = new ByteBufferDataInput(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos, 0); + errorContext = "compressed section at " + start + " in " + errorContext; } - catch (IOException e) + catch (IOException | ArrayIndexOutOfBoundsException e) { - logger.error("Unexpected exception decompressing section {}", e); + handleReplayError(tolerateErrorsInSection, + "Unexpected exception decompressing section at %d: %s", + start, e); continue; } } - if (!replaySyncSection(sectionReader, replayEnd, desc)) + if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection)) break; } } @@ -402,13 +421,14 @@ public class CommitLogReplayer * * @return Whether replay should continue with the next section. */ - private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc) throws IOException + private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc, String errorContext, boolean tolerateErrors) throws IOException { /* read the logs populate Mutation and apply */ while (reader.getFilePointer() < end && !reader.isEOF()) { + long mutationStart = reader.getFilePointer(); if (logger.isDebugEnabled()) - logger.trace("Reading mutation at {}", reader.getFilePointer()); + logger.trace("Reading mutation at {}", mutationStart); long claimedCRC32; int serializedSize; @@ -427,7 +447,12 @@ public class CommitLogReplayer // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count. // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128 if (serializedSize < 10) + { + handleReplayError(tolerateErrors, + "Invalid mutation size %d at %d in %s", + serializedSize, mutationStart, errorContext); return false; + } long claimedSizeChecksum; if (desc.version < CommitLogDescriptor.VERSION_21) @@ -441,7 +466,12 @@ public class CommitLogReplayer updateChecksumInt(checksum, serializedSize); if (checksum.getValue() != claimedSizeChecksum) + { + handleReplayError(tolerateErrors, + "Mutation size checksum failure at %d in %s", + mutationStart, errorContext); return false; + } // ok. if (serializedSize > buffer.length) @@ -454,14 +484,18 @@ public class CommitLogReplayer } catch (EOFException eof) { + handleReplayError(tolerateErrors, + "Unexpected end of segment", + mutationStart, errorContext); return false; // last CL entry didn't get completely written. that's ok. } checksum.update(buffer, 0, serializedSize); if (claimedCRC32 != checksum.getValue()) { - // this entry must not have been fsynced. probably the rest is bad too, - // but just in case there is no harm in trying them (since we still read on an entry boundary) + handleReplayError(tolerateErrors, + "Mutation checksum failure at %d in %s", + mutationStart, errorContext); continue; } replayMutation(buffer, serializedSize, reader.getFilePointer(), desc); @@ -510,9 +544,13 @@ public class CommitLogReplayer out.write(inputBuffer, 0, size); } - String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ", - f.getAbsolutePath()); - logger.error(st, t); + // Checksum passed so this error can't be permissible. + handleReplayError(false, + "Unexpected error deserializing mutation; saved to %s. " + + "This may be caused by replaying a mutation against a table with the same name but incompatible schema. " + + "Exception follows: %s", + f.getAbsolutePath(), + t); return; } @@ -580,4 +618,35 @@ public class CommitLogReplayer } return false; } + + static void handleReplayError(boolean permissible, String message, Object... messageArgs) throws IOException + { + String msg = String.format(message, messageArgs); + IOException e = new CommitLogReplayException(msg); + if (permissible) + logger.error("Ignoring commit log replay error likely due to incomplete flush to disk", e); + else if (Boolean.getBoolean(IGNORE_REPLAY_ERRORS_PROPERTY)) + logger.error("Ignoring commit log replay error", e); + else if (!CommitLog.handleCommitError("Failed commit log replay", e)) + { + logger.error("Replay stopped. If you wish to override this error and continue starting the node ignoring " + + "commit log replay problems, specify -D" + IGNORE_REPLAY_ERRORS_PROPERTY + "=true " + + "on the command line"); + throw e; + } + } + + @SuppressWarnings("serial") + public static class CommitLogReplayException extends IOException + { + public CommitLogReplayException(String message, Throwable cause) + { + super(message, cause); + } + + public CommitLogReplayException(String message) + { + super(message); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java index 19b850f..9849350 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java @@ -34,10 +34,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.*; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,14 +94,12 @@ public class CommitLogSegmentManager private volatile boolean run = true; private final CommitLog commitLog; - @VisibleForTesting - public CommitLogSegmentManager(final CommitLog commitLog) + CommitLogSegmentManager(final CommitLog commitLog) { this.commitLog = commitLog; } - @VisibleForTesting - public void start() + void start() { // The run loop for the manager thread Runnable runnable = new WrappedRunnable() http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log b/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log new file mode 100644 index 0000000..d248d59 Binary files /dev/null and b/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt b/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt new file mode 100644 index 0000000..c4d8fe7 --- /dev/null +++ b/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt @@ -0,0 +1,6 @@ +#CommitLog bitrot test, version 2.2.0-SNAPSHOT +#This is a copy of 2.2-lz4 with some overwritten bytes. +#Replaying this should result in an error which can be overridden. +cells=6051 +hash=-170208326 +cfid=dc32ce20-360d-11e5-826c-afadad37221d http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log b/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log new file mode 100644 index 0000000..083d65c Binary files /dev/null and b/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt b/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt new file mode 100644 index 0000000..c49dda0 --- /dev/null +++ b/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt @@ -0,0 +1,6 @@ +#CommitLog upgrade test, version 2.2.0-SNAPSHOT +#This is a copy of 2.2-lz4 with some overwritten bytes. +#Replaying this should result in an error which can be overridden. +cells=6037 +hash=-1312748407 +cfid=dc32ce20-360d-11e5-826c-afadad37221d http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log b/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log new file mode 100644 index 0000000..939d408 Binary files /dev/null and b/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt b/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt new file mode 100644 index 0000000..ce7f600 --- /dev/null +++ b/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt @@ -0,0 +1,5 @@ +#Truncated CommitLog test. +#This is a copy of 2.2-lz4 with the last 50 bytes deleted. +cells=6037 +hash=-889057729 +cfid=dc32ce20-360d-11e5-826c-afadad37221d http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java b/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java deleted file mode 100644 index 0ecab3c..0000000 --- a/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.cassandra.db; - -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.config.Config; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.service.CassandraDaemon; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.KillerForTests; - -public class CommitLogFailurePolicyTest -{ - @BeforeClass - public static void defineSchema() throws ConfigurationException - { - SchemaLoader.prepareServer(); - System.setProperty("cassandra.commitlog.stop_on_errors", "true"); - } - - @Test - public void testCommitFailurePolicy_stop() throws ConfigurationException - { - CassandraDaemon daemon = new CassandraDaemon(); - daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy - StorageService.instance.registerDaemon(daemon); - - // Need storage service active so stop policy can shutdown gossip - StorageService.instance.initServer(); - Assert.assertTrue(Gossiper.instance.isEnabled()); - - Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); - try - { - DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop); - CommitLog.handleCommitError("Test stop error", new Throwable()); - Assert.assertFalse(Gossiper.instance.isEnabled()); - } - finally - { - DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); - } - } - - @Test - public void testCommitFailurePolicy_die() - { - CassandraDaemon daemon = new CassandraDaemon(); - daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy - StorageService.instance.registerDaemon(daemon); - - KillerForTests killerForTests = new KillerForTests(); - JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); - Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); - try - { - DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die); - CommitLog.handleCommitError("Testing die policy", new Throwable()); - Assert.assertTrue(killerForTests.wasKilled()); - Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure - } - finally - { - DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); - JVMStabilityInspector.replaceKiller(originalKiller); - } - } - - @Test - public void testCommitFailurePolicy_ignore_beforeStartup() - { - //startup was not completed successfuly (since method completeSetup() was not called) - CassandraDaemon daemon = new CassandraDaemon(); - StorageService.instance.registerDaemon(daemon); - - KillerForTests killerForTests = new KillerForTests(); - JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); - Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); - try - { - DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore); - CommitLog.handleCommitError("Testing ignore policy", new Throwable()); - //even though policy is ignore, JVM must die because Daemon has not finished initializing - Assert.assertTrue(killerForTests.wasKilled()); - Assert.assertTrue(killerForTests.wasKilledQuietly()); //killed quietly due to startup failure - } - finally - { - DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); - JVMStabilityInspector.replaceKiller(originalKiller); - } - } - - @Test - public void testCommitFailurePolicy_ignore_afterStartup() throws Exception - { - CassandraDaemon daemon = new CassandraDaemon(); - daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy - StorageService.instance.registerDaemon(daemon); - - KillerForTests killerForTests = new KillerForTests(); - JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); - Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); - try - { - DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore); - CommitLog.handleCommitError("Testing ignore policy", new Throwable()); - //error policy is set to IGNORE, so JVM must not be killed if error ocurs after startup - Assert.assertFalse(killerForTests.wasKilled()); - } - finally - { - DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); - JVMStabilityInspector.replaceKiller(originalKiller); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/unit/org/apache/cassandra/db/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java deleted file mode 100644 index 6db29a8..0000000 --- a/test/unit/org/apache/cassandra/db/CommitLogTest.java +++ /dev/null @@ -1,467 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.cassandra.db; - -import static junit.framework.Assert.assertTrue; -import static org.apache.cassandra.utils.ByteBufferUtil.bytes; -import static org.junit.Assert.assertEquals; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.zip.CRC32; -import java.util.zip.Checksum; - -import com.google.common.collect.ImmutableMap; - -import org.apache.cassandra.db.marshal.AsciiType; -import org.apache.cassandra.db.marshal.BytesType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.Util; -import org.apache.cassandra.config.Config; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.ParameterizedClass; -import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.db.commitlog.CommitLogDescriptor; -import org.apache.cassandra.db.commitlog.CommitLogSegmentManager; -import org.apache.cassandra.db.commitlog.ReplayPosition; -import org.apache.cassandra.db.commitlog.CommitLogSegment; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.io.util.ByteBufferDataInput; -import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.vint.VIntCoding; -import org.apache.cassandra.service.CassandraDaemon; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.KillerForTests; - -public class CommitLogTest -{ - private static final Logger logger = LoggerFactory.getLogger(CommitLogTest.class); - - private static final String KEYSPACE1 = "CommitLogTest"; - private static final String KEYSPACE2 = "CommitLogTestNonDurable"; - private static final String STANDARD1 = "Standard1"; - private static final String STANDARD2 = "Standard2"; - - @BeforeClass - public static void defineSchema() throws ConfigurationException - { - SchemaLoader.prepareServer(); - SchemaLoader.createKeyspace(KEYSPACE1, - KeyspaceParams.simple(1), - SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance), - SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance)); - SchemaLoader.createKeyspace(KEYSPACE2, - KeyspaceParams.simpleTransient(1), - SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance), - SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance)); - System.setProperty("cassandra.commitlog.stop_on_errors", "true"); - CompactionManager.instance.disableAutoCompaction(); - } - - @Test - public void testRecoveryWithEmptyLog() throws Exception - { - CommitLog.instance.recover(new File[]{ tmpFile() }); - } - - @Test - public void testRecoveryWithShortLog() throws Exception - { - // force EOF while reading log - testRecoveryWithBadSizeArgument(100, 10); - } - - @Test - public void testRecoveryWithShortSize() throws Exception - { - testRecovery(new byte[2]); - } - - @Test - public void testRecoveryWithShortCheckSum() throws Exception - { - testRecovery(new byte[6]); - } - - @Test - public void testRecoveryWithGarbageLog() throws Exception - { - byte[] garbage = new byte[100]; - (new java.util.Random()).nextBytes(garbage); - testRecovery(garbage); - } - - @Test - public void testRecoveryWithBadSizeChecksum() throws Exception - { - Checksum checksum = new CRC32(); - checksum.update(100); - testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue()); - } - - @Test - public void testRecoveryWithZeroSegmentSizeArgument() throws Exception - { - // many different combinations of 4 bytes (garbage) will be read as zero by readInt() - testRecoveryWithBadSizeArgument(0, 10); // zero size, but no EOF - } - - @Test - public void testRecoveryWithNegativeSizeArgument() throws Exception - { - // garbage from a partial/bad flush could be read as a negative size even if there is no EOF - testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF - } - - @Test - public void testDontDeleteIfDirty() throws Exception - { - CommitLog.instance.resetUnsafe(true); - ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); - ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2); - - // Roughly 32 MB mutation - Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k") - .clustering("bytes") - .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4)) - .build(); - - // Adding it 5 times - CommitLog.instance.add(m); - CommitLog.instance.add(m); - CommitLog.instance.add(m); - CommitLog.instance.add(m); - CommitLog.instance.add(m); - - // Adding new mutation on another CF - Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k") - .clustering("bytes") - .add("val", ByteBuffer.allocate(4)) - .build(); - CommitLog.instance.add(m2); - - assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments(); - - UUID cfid2 = m2.getColumnFamilyIds().iterator().next(); - CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext()); - - // Assert we still have both our segment - assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments(); - } - - @Test - public void testDeleteIfNotDirty() throws Exception - { - DatabaseDescriptor.getCommitLogSegmentSize(); - CommitLog.instance.resetUnsafe(true); - ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); - ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2); - - // Roughly 32 MB mutation - Mutation rm = new RowUpdateBuilder(cfs1.metadata, 0, "k") - .clustering("bytes") - .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1)) - .build(); - - // Adding it twice (won't change segment) - CommitLog.instance.add(rm); - CommitLog.instance.add(rm); - - assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); - - // "Flush": this won't delete anything - UUID cfid1 = rm.getColumnFamilyIds().iterator().next(); - CommitLog.instance.sync(true); - CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext()); - - assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); - - // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created - Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k") - .clustering("bytes") - .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/2) - 200)) - .build(); - CommitLog.instance.add(rm2); - // also forces a new segment, since each entry-with-overhead is just under half the CL size - CommitLog.instance.add(rm2); - CommitLog.instance.add(rm2); - - assert CommitLog.instance.activeSegments() == 3 : "Expecting 3 segments, got " + CommitLog.instance.activeSegments(); - - - // "Flush" second cf: The first segment should be deleted since we - // didn't write anything on cf1 since last flush (and we flush cf2) - - UUID cfid2 = rm2.getColumnFamilyIds().iterator().next(); - CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext()); - - // Assert we still have both our segment - assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); - } - - private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String cfName, String colName) - { - ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(cfName); - // We don't want to allocate a size of 0 as this is optimized under the hood and our computation would - // break testEqualRecordLimit - int allocSize = 1; - Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, key) - .clustering(colName) - .add("val", ByteBuffer.allocate(allocSize)).build(); - - int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2); - max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead - - // Note that the size of the value if vint encoded. So we first compute the ovehead of the mutation without the value and it's size - int mutationOverhead = (int)Mutation.serializer.serializedSize(rm, MessagingService.current_version) - (VIntCoding.computeVIntSize(allocSize) + allocSize); - max -= mutationOverhead; - - // Now, max is the max for both the value and it's size. But we want to know how much we can allocate, i.e. the size of the value. - int sizeOfMax = VIntCoding.computeVIntSize(max); - max -= sizeOfMax; - assert VIntCoding.computeVIntSize(max) == sizeOfMax; // sanity check that we're still encoded with the size we though we would - return max; - } - - private static int getMaxRecordDataSize() - { - return getMaxRecordDataSize(KEYSPACE1, bytes("k"), STANDARD1, "bytes"); - } - - // CASSANDRA-3615 - @Test - public void testEqualRecordLimit() throws Exception - { - CommitLog.instance.resetUnsafe(true); - ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); - Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k") - .clustering("bytes") - .add("val", ByteBuffer.allocate(getMaxRecordDataSize())) - .build(); - CommitLog.instance.add(rm); - } - - @Test - public void testExceedRecordLimit() throws Exception - { - CommitLog.instance.resetUnsafe(true); - ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); - try - { - Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k") - .clustering("bytes") - .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize())) - .build(); - CommitLog.instance.add(rm); - throw new AssertionError("mutation larger than limit was accepted"); - } - catch (IllegalArgumentException e) - { - // IAE is thrown on too-large mutations - } - } - - @Test - public void testVersions() - { - Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log")); - Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log")); - Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log")); - Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log")); - Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log")); - - assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id); - - assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion()); - String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log"; - assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion()); - } - - @Test - public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException - { - boolean originalState = DatabaseDescriptor.isAutoSnapshot(); - try - { - CommitLog.instance.resetUnsafe(true); - boolean prev = DatabaseDescriptor.isAutoSnapshot(); - DatabaseDescriptor.setAutoSnapshot(false); - ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); - ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2); - - new RowUpdateBuilder(cfs1.metadata, 0, "k").clustering("bytes").add("val", ByteBuffer.allocate(100)).build().applyUnsafe(); - cfs1.truncateBlocking(); - DatabaseDescriptor.setAutoSnapshot(prev); - Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k") - .clustering("bytes") - .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4)) - .build(); - - for (int i = 0 ; i < 5 ; i++) - CommitLog.instance.add(m2); - - assertEquals(2, CommitLog.instance.activeSegments()); - ReplayPosition position = CommitLog.instance.getContext(); - for (Keyspace ks : Keyspace.system()) - for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores()) - CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position); - CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position); - assertEquals(1, CommitLog.instance.activeSegments()); - } - finally - { - DatabaseDescriptor.setAutoSnapshot(originalState); - } - } - - @Test - public void testTruncateWithoutSnapshotNonDurable() throws IOException - { - CommitLog.instance.resetUnsafe(true); - boolean originalState = DatabaseDescriptor.getAutoSnapshot(); - try - { - DatabaseDescriptor.setAutoSnapshot(false); - Keyspace notDurableKs = Keyspace.open(KEYSPACE2); - Assert.assertFalse(notDurableKs.getMetadata().params.durableWrites); - - ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1"); - new RowUpdateBuilder(cfs.metadata, 0, "key1") - .clustering("bytes").add("val", ByteBufferUtil.bytes("abcd")) - .build() - .applyUnsafe(); - - assertTrue(Util.getOnlyRow(Util.cmd(cfs).columns("val").build()) - .cells().iterator().next().value().equals(ByteBufferUtil.bytes("abcd"))); - - cfs.truncateBlocking(); - - Util.assertEmpty(Util.cmd(cfs).columns("val").build()); - } - finally - { - DatabaseDescriptor.setAutoSnapshot(originalState); - } - } - - private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException - { - ByteBuffer buf = ByteBuffer.allocate(1024); - CommitLogDescriptor.writeHeader(buf, desc); - long length = buf.position(); - // Put some extra data in the stream. - buf.putDouble(0.1); - buf.flip(); - FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0); - CommitLogDescriptor read = CommitLogDescriptor.readHeader(input); - Assert.assertEquals("Descriptor length", length, input.getFilePointer()); - Assert.assertEquals("Descriptors", desc, read); - } - - @Test - public void testDescriptorPersistence() throws IOException - { - testDescriptorPersistence(new CommitLogDescriptor(11, null)); - testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null)); - testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null)); - testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17, new ParameterizedClass("LZ4Compressor", null))); - testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19, - new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null")))); - } - - @Test - public void testDescriptorInvalidParametersSize() throws IOException - { - Map<String, String> params = new HashMap<>(); - for (int i=0; i<65535; ++i) - params.put("key"+i, Integer.toString(i, 16)); - try { - CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, - 21, - new ParameterizedClass("LZ4Compressor", params)); - ByteBuffer buf = ByteBuffer.allocate(1024000); - CommitLogDescriptor.writeHeader(buf, desc); - Assert.fail("Parameter object too long should fail on writing descriptor."); - } catch (ConfigurationException e) - { - // correct path - } - } - - protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception - { - Checksum checksum = new CRC32(); - checksum.update(size); - testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue()); - } - - protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception - { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - DataOutputStream dout = new DataOutputStream(out); - dout.writeInt(size); - dout.writeLong(checksum); - dout.write(new byte[dataSize]); - dout.close(); - testRecovery(out.toByteArray()); - } - - protected File tmpFile() throws IOException - { - File logFile = File.createTempFile("CommitLog-" + CommitLogDescriptor.current_version + "-", ".log"); - logFile.deleteOnExit(); - assert logFile.length() == 0; - return logFile; - } - - protected void testRecovery(byte[] logData) throws Exception - { - File logFile = tmpFile(); - try (OutputStream lout = new FileOutputStream(logFile)) - { - lout.write(logData); - //statics make it annoying to test things correctly - CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure - } - } -} - http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java new file mode 100644 index 0000000..79f83fe --- /dev/null +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java @@ -0,0 +1,141 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.cassandra.db.commitlog; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.service.CassandraDaemon; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.KillerForTests; + +public class CommitLogFailurePolicyTest +{ + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + System.setProperty("cassandra.commitlog.stop_on_errors", "true"); + } + + @Test + public void testCommitFailurePolicy_stop() throws ConfigurationException + { + CassandraDaemon daemon = new CassandraDaemon(); + daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy + StorageService.instance.registerDaemon(daemon); + + // Need storage service active so stop policy can shutdown gossip + StorageService.instance.initServer(); + Assert.assertTrue(Gossiper.instance.isEnabled()); + + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop); + CommitLog.handleCommitError("Test stop error", new Throwable()); + Assert.assertFalse(Gossiper.instance.isEnabled()); + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + } + } + + @Test + public void testCommitFailurePolicy_die() + { + CassandraDaemon daemon = new CassandraDaemon(); + daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy + StorageService.instance.registerDaemon(daemon); + + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die); + CommitLog.handleCommitError("Testing die policy", new Throwable()); + Assert.assertTrue(killerForTests.wasKilled()); + Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + } + } + + @Test + public void testCommitFailurePolicy_ignore_beforeStartup() + { + //startup was not completed successfuly (since method completeSetup() was not called) + CassandraDaemon daemon = new CassandraDaemon(); + StorageService.instance.registerDaemon(daemon); + + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore); + CommitLog.handleCommitError("Testing ignore policy", new Throwable()); + //even though policy is ignore, JVM must die because Daemon has not finished initializing + Assert.assertTrue(killerForTests.wasKilled()); + Assert.assertTrue(killerForTests.wasKilledQuietly()); //killed quietly due to startup failure + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + } + } + + @Test + public void testCommitFailurePolicy_ignore_afterStartup() throws Exception + { + CassandraDaemon daemon = new CassandraDaemon(); + daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy + StorageService.instance.registerDaemon(daemon); + + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore); + CommitLog.handleCommitError("Testing ignore policy", new Throwable()); + //error policy is set to IGNORE, so JVM must not be killed if error ocurs after startup + Assert.assertFalse(killerForTests.wasKilled()); + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java new file mode 100644 index 0000000..b41b7b3 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@ -0,0 +1,578 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.db.commitlog; + +import static junit.framework.Assert.assertTrue; +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.zip.CRC32; +import java.util.zip.Checksum; + +import com.google.common.collect.ImmutableMap; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.marshal.AsciiType; +import org.apache.cassandra.db.marshal.BytesType; + +import org.junit.*; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.Config.CommitFailurePolicy; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.commitlog.CommitLogDescriptor; +import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.commitlog.CommitLogSegment; +import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.util.ByteBufferDataInput; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.KillerForTests; +import org.apache.cassandra.utils.vint.VIntCoding; + +public class CommitLogTest +{ + private static final String KEYSPACE1 = "CommitLogTest"; + private static final String KEYSPACE2 = "CommitLogTestNonDurable"; + private static final String STANDARD1 = "Standard1"; + private static final String STANDARD2 = "Standard2"; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance), + SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance)); + SchemaLoader.createKeyspace(KEYSPACE2, + KeyspaceParams.simpleTransient(1), + SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance), + SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance)); + CompactionManager.instance.disableAutoCompaction(); + } + + @Test + public void testRecoveryWithEmptyLog() throws Exception + { + runExpecting(() -> { + CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.current_version) }); + return null; + }, CommitLogReplayException.class); + } + + @Test + public void testRecoveryWithEmptyLog20() throws Exception + { + CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.VERSION_20) }); + } + + @Test + public void testRecoveryWithZeroLog() throws Exception + { + testRecovery(new byte[10], null); + } + + @Test + public void testRecoveryWithShortLog() throws Exception + { + // force EOF while reading log + testRecoveryWithBadSizeArgument(100, 10); + } + + @Test + public void testRecoveryWithShortSize() throws Exception + { + runExpecting(() -> { + testRecovery(new byte[2], CommitLogDescriptor.VERSION_20); + return null; + }, CommitLogReplayException.class); + } + + @Test + public void testRecoveryWithShortCheckSum() throws Exception + { + byte[] data = new byte[8]; + data[3] = 10; // make sure this is not a legacy end marker. + testRecovery(data, CommitLogReplayException.class); + } + + @Test + public void testRecoveryWithShortMutationSize() throws Exception + { + testRecoveryWithBadSizeArgument(9, 10); + } + + private void testRecoveryWithGarbageLog() throws Exception + { + byte[] garbage = new byte[100]; + (new java.util.Random()).nextBytes(garbage); + testRecovery(garbage, CommitLogDescriptor.current_version); + } + + @Test + public void testRecoveryWithGarbageLog_fail() throws Exception + { + runExpecting(() -> { + testRecoveryWithGarbageLog(); + return null; + }, CommitLogReplayException.class); + } + + @Test + public void testRecoveryWithGarbageLog_ignoredByProperty() throws Exception + { + try { + System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true"); + testRecoveryWithGarbageLog(); + } finally { + System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY); + } + } + + @Test + public void testRecoveryWithBadSizeChecksum() throws Exception + { + Checksum checksum = new CRC32(); + checksum.update(100); + testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue()); + } + + @Test + public void testRecoveryWithNegativeSizeArgument() throws Exception + { + // garbage from a partial/bad flush could be read as a negative size even if there is no EOF + testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF + } + + @Test + public void testDontDeleteIfDirty() throws Exception + { + CommitLog.instance.resetUnsafe(true); + ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2); + + // Roughly 32 MB mutation + Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k") + .clustering("bytes") + .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4)) + .build(); + + // Adding it 5 times + CommitLog.instance.add(m); + CommitLog.instance.add(m); + CommitLog.instance.add(m); + CommitLog.instance.add(m); + CommitLog.instance.add(m); + + // Adding new mutation on another CF + Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k") + .clustering("bytes") + .add("val", ByteBuffer.allocate(4)) + .build(); + CommitLog.instance.add(m2); + + assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments(); + + UUID cfid2 = m2.getColumnFamilyIds().iterator().next(); + CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext()); + + // Assert we still have both our segment + assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments(); + } + + @Test + public void testDeleteIfNotDirty() throws Exception + { + DatabaseDescriptor.getCommitLogSegmentSize(); + CommitLog.instance.resetUnsafe(true); + ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2); + + // Roughly 32 MB mutation + Mutation rm = new RowUpdateBuilder(cfs1.metadata, 0, "k") + .clustering("bytes") + .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1)) + .build(); + + // Adding it twice (won't change segment) + CommitLog.instance.add(rm); + CommitLog.instance.add(rm); + + assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); + + // "Flush": this won't delete anything + UUID cfid1 = rm.getColumnFamilyIds().iterator().next(); + CommitLog.instance.sync(true); + CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext()); + + assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); + + // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created + Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k") + .clustering("bytes") + .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/2) - 200)) + .build(); + CommitLog.instance.add(rm2); + // also forces a new segment, since each entry-with-overhead is just under half the CL size + CommitLog.instance.add(rm2); + CommitLog.instance.add(rm2); + + assert CommitLog.instance.activeSegments() == 3 : "Expecting 3 segments, got " + CommitLog.instance.activeSegments(); + + + // "Flush" second cf: The first segment should be deleted since we + // didn't write anything on cf1 since last flush (and we flush cf2) + + UUID cfid2 = rm2.getColumnFamilyIds().iterator().next(); + CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext()); + + // Assert we still have both our segment + assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); + } + + private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String cfName, String colName) + { + ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(cfName); + // We don't want to allocate a size of 0 as this is optimized under the hood and our computation would + // break testEqualRecordLimit + int allocSize = 1; + Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, key) + .clustering(colName) + .add("val", ByteBuffer.allocate(allocSize)).build(); + + int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2); + max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead + + // Note that the size of the value if vint encoded. So we first compute the ovehead of the mutation without the value and it's size + int mutationOverhead = (int)Mutation.serializer.serializedSize(rm, MessagingService.current_version) - (VIntCoding.computeVIntSize(allocSize) + allocSize); + max -= mutationOverhead; + + // Now, max is the max for both the value and it's size. But we want to know how much we can allocate, i.e. the size of the value. + int sizeOfMax = VIntCoding.computeVIntSize(max); + max -= sizeOfMax; + assert VIntCoding.computeVIntSize(max) == sizeOfMax; // sanity check that we're still encoded with the size we though we would + return max; + } + + private static int getMaxRecordDataSize() + { + return getMaxRecordDataSize(KEYSPACE1, bytes("k"), STANDARD1, "bytes"); + } + + // CASSANDRA-3615 + @Test + public void testEqualRecordLimit() throws Exception + { + CommitLog.instance.resetUnsafe(true); + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k") + .clustering("bytes") + .add("val", ByteBuffer.allocate(getMaxRecordDataSize())) + .build(); + CommitLog.instance.add(rm); + } + + @Test + public void testExceedRecordLimit() throws Exception + { + CommitLog.instance.resetUnsafe(true); + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + try + { + Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k") + .clustering("bytes") + .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize())) + .build(); + CommitLog.instance.add(rm); + throw new AssertionError("mutation larger than limit was accepted"); + } + catch (IllegalArgumentException e) + { + // IAE is thrown on too-large mutations + } + } + + protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception + { + Checksum checksum = new CRC32(); + checksum.update(size); + testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue()); + } + + protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dout = new DataOutputStream(out); + dout.writeInt(size); + dout.writeLong(checksum); + dout.write(new byte[dataSize]); + dout.close(); + testRecovery(out.toByteArray(), CommitLogReplayException.class); + } + + protected File tmpFile(int version) throws IOException + { + File logFile = File.createTempFile("CommitLog-" + version + "-", ".log"); + logFile.deleteOnExit(); + assert logFile.length() == 0; + return logFile; + } + + protected Void testRecovery(byte[] logData, int version) throws Exception + { + File logFile = tmpFile(version); + try (OutputStream lout = new FileOutputStream(logFile)) + { + lout.write(logData); + //statics make it annoying to test things correctly + CommitLog.instance.recover(logFile.getPath()); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/ + } + return null; + } + + protected Void testRecovery(CommitLogDescriptor desc, byte[] logData) throws Exception + { + File logFile = tmpFile(desc.version); + CommitLogDescriptor fromFile = CommitLogDescriptor.fromFileName(logFile.getName()); + // Change id to match file. + desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression); + ByteBuffer buf = ByteBuffer.allocate(1024); + CommitLogDescriptor.writeHeader(buf, desc); + try (OutputStream lout = new FileOutputStream(logFile)) + { + lout.write(buf.array(), 0, buf.position()); + lout.write(logData); + //statics make it annoying to test things correctly + CommitLog.instance.recover(logFile.getPath()); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/ + } + return null; + } + + @Test + public void testRecoveryWithIdMismatch() throws Exception + { + CommitLogDescriptor desc = new CommitLogDescriptor(4, null); + File logFile = tmpFile(desc.version); + ByteBuffer buf = ByteBuffer.allocate(1024); + CommitLogDescriptor.writeHeader(buf, desc); + try (OutputStream lout = new FileOutputStream(logFile)) + { + lout.write(buf.array(), 0, buf.position()); + + runExpecting(() -> { + CommitLog.instance.recover(logFile.getPath()); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/ + return null; + }, CommitLogReplayException.class); + } + } + + @Test + public void testRecoveryWithBadCompressor() throws Exception + { + CommitLogDescriptor desc = new CommitLogDescriptor(4, new ParameterizedClass("UnknownCompressor", null)); + runExpecting(() -> { + testRecovery(desc, new byte[0]); + return null; + }, CommitLogReplayException.class); + } + + protected void runExpecting(Callable<Void> r, Class<?> expected) + { + JVMStabilityInspector.Killer originalKiller; + KillerForTests killerForTests; + + killerForTests = new KillerForTests(); + originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + + Throwable caught = null; + try + { + r.call(); + } + catch (Throwable t) + { + if (expected != t.getClass()) + throw new AssertionError("Expected exception " + expected + ", got " + t, t); + caught = t; + } + if (expected != null && caught == null) + Assert.fail("Expected exception " + expected + " but call completed successfully."); + + JVMStabilityInspector.replaceKiller(originalKiller); + assertEquals("JVM killed", expected != null, killerForTests.wasKilled()); + } + + protected void testRecovery(final byte[] logData, Class<?> expected) throws Exception + { + runExpecting(() -> testRecovery(logData, CommitLogDescriptor.VERSION_20), expected); + runExpecting(() -> testRecovery(new CommitLogDescriptor(4, null), logData), expected); + } + + @Test + public void testVersions() + { + Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log")); + Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log")); + Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log")); + Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log")); + Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log")); + + assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id); + + assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion()); + String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log"; + assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion()); + } + + @Test + public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException + { + boolean originalState = DatabaseDescriptor.isAutoSnapshot(); + try + { + CommitLog.instance.resetUnsafe(true); + boolean prev = DatabaseDescriptor.isAutoSnapshot(); + DatabaseDescriptor.setAutoSnapshot(false); + ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2); + + new RowUpdateBuilder(cfs1.metadata, 0, "k").clustering("bytes").add("val", ByteBuffer.allocate(100)).build().applyUnsafe(); + cfs1.truncateBlocking(); + DatabaseDescriptor.setAutoSnapshot(prev); + Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k") + .clustering("bytes") + .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4)) + .build(); + + for (int i = 0 ; i < 5 ; i++) + CommitLog.instance.add(m2); + + assertEquals(2, CommitLog.instance.activeSegments()); + ReplayPosition position = CommitLog.instance.getContext(); + for (Keyspace ks : Keyspace.system()) + for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores()) + CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position); + CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position); + assertEquals(1, CommitLog.instance.activeSegments()); + } + finally + { + DatabaseDescriptor.setAutoSnapshot(originalState); + } + } + + @Test + public void testTruncateWithoutSnapshotNonDurable() throws IOException + { + CommitLog.instance.resetUnsafe(true); + boolean originalState = DatabaseDescriptor.getAutoSnapshot(); + try + { + DatabaseDescriptor.setAutoSnapshot(false); + Keyspace notDurableKs = Keyspace.open(KEYSPACE2); + Assert.assertFalse(notDurableKs.getMetadata().params.durableWrites); + + ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1"); + new RowUpdateBuilder(cfs.metadata, 0, "key1") + .clustering("bytes").add("val", ByteBufferUtil.bytes("abcd")) + .build() + .applyUnsafe(); + + assertTrue(Util.getOnlyRow(Util.cmd(cfs).columns("val").build()) + .cells().iterator().next().value().equals(ByteBufferUtil.bytes("abcd"))); + + cfs.truncateBlocking(); + + Util.assertEmpty(Util.cmd(cfs).columns("val").build()); + } + finally + { + DatabaseDescriptor.setAutoSnapshot(originalState); + } + } + + private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException + { + ByteBuffer buf = ByteBuffer.allocate(1024); + CommitLogDescriptor.writeHeader(buf, desc); + long length = buf.position(); + // Put some extra data in the stream. + buf.putDouble(0.1); + buf.flip(); + FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0); + CommitLogDescriptor read = CommitLogDescriptor.readHeader(input); + Assert.assertEquals("Descriptor length", length, input.getFilePointer()); + Assert.assertEquals("Descriptors", desc, read); + } + + @Test + public void testDescriptorPersistence() throws IOException + { + testDescriptorPersistence(new CommitLogDescriptor(11, null)); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null)); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null)); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17, new ParameterizedClass("LZ4Compressor", null))); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19, + new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null")))); + } + + @Test + public void testDescriptorInvalidParametersSize() throws IOException + { + Map<String, String> params = new HashMap<>(); + for (int i=0; i<65535; ++i) + params.put("key"+i, Integer.toString(i, 16)); + try { + CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, + 21, + new ParameterizedClass("LZ4Compressor", params)); + ByteBuffer buf = ByteBuffer.allocate(1024000); + CommitLogDescriptor.writeHeader(buf, desc); + Assert.fail("Parameter object too long should fail on writing descriptor."); + } catch (ConfigurationException e) + { + // correct path + } + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java index 7b0ab06..00a143b 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java @@ -30,6 +30,8 @@ import junit.framework.Assert; import com.google.common.base.Predicate; +import org.junit.After; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -43,6 +45,9 @@ import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.KillerForTests; +import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException; public class CommitLogUpgradeTest { @@ -56,6 +61,24 @@ public class CommitLogUpgradeTest static final String KEYSPACE = "Keyspace1"; static final String CELLNAME = "name"; + private JVMStabilityInspector.Killer originalKiller; + private KillerForTests killerForTests; + private boolean shouldBeKilled = false; + + @Before + public void prepareToBeKilled() + { + killerForTests = new KillerForTests(); + originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + } + + @After + public void cleanUp() + { + JVMStabilityInspector.replaceKiller(originalKiller); + Assert.assertEquals("JVM killed", shouldBeKilled, killerForTests.wasKilled()); + } + @Test public void test20() throws Exception { @@ -69,6 +92,7 @@ public class CommitLogUpgradeTest } @Test + public void test22() throws Exception { testRestore(DATA_DIR + "2.2"); @@ -86,6 +110,47 @@ public class CommitLogUpgradeTest testRestore(DATA_DIR + "2.2-snappy"); } + public void test22_truncated() throws Exception + { + testRestore(DATA_DIR + "2.2-lz4-truncated"); + } + + @Test(expected = CommitLogReplayException.class) + public void test22_bitrot() throws Exception + { + shouldBeKilled = true; + testRestore(DATA_DIR + "2.2-lz4-bitrot"); + } + + @Test + public void test22_bitrot_ignored() throws Exception + { + try { + System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true"); + testRestore(DATA_DIR + "2.2-lz4-bitrot"); + } finally { + System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY); + } + } + + @Test(expected = CommitLogReplayException.class) + public void test22_bitrot2() throws Exception + { + shouldBeKilled = true; + testRestore(DATA_DIR + "2.2-lz4-bitrot2"); + } + + @Test + public void test22_bitrot2_ignored() throws Exception + { + try { + System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true"); + testRestore(DATA_DIR + "2.2-lz4-bitrot2"); + } finally { + System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY); + } + } + @BeforeClass static public void initialize() throws FileNotFoundException, IOException, InterruptedException {