Merge branch 'cassandra-2.1' into cassandra-2.2 Conflicts: src/java/org/apache/cassandra/service/CassandraDaemon.java test/unit/org/apache/cassandra/db/CommitLogTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/08715339 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/08715339 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/08715339 Branch: refs/heads/cassandra-3.0 Commit: 0871533945b19bb5ccf96ab7a28233cadd537a59 Parents: 6fd41ab 3b7934f Author: Benedict Elliott Smith <bened...@apache.org> Authored: Sun Aug 9 09:40:09 2015 +0200 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Sun Aug 9 09:40:09 2015 +0200 ---------------------------------------------------------------------- .../cassandra/service/CassandraDaemon.java | 8 + .../cassandra/utils/JVMStabilityInspector.java | 9 +- .../org/apache/cassandra/db/CommitLogTest.java | 159 ++++++++++++++++++- .../apache/cassandra/utils/KillerForTests.java | 11 +- 4 files changed, 182 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/08715339/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java index 10aa4b2,d078203..2020201 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@@ -37,11 -34,9 +37,13 @@@ import javax.management.remote.JMXConne import javax.management.remote.JMXServiceURL; import javax.management.remote.rmi.RMIConnectorServer; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistryListener; +import com.codahale.metrics.SharedMetricRegistries; + import com.google.common.annotations.VisibleForTesting; + import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.cassandra.metrics.DefaultNameFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/cassandra/blob/08715339/test/unit/org/apache/cassandra/db/CommitLogTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/CommitLogTest.java index c53d371,1c3daab..536f0cb --- a/test/unit/org/apache/cassandra/db/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java @@@ -19,74 -19,42 +19,82 @@@ package org.apache.cassandra.db; -import java.io.*; +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; + import java.util.concurrent.ExecutionException; import java.util.zip.CRC32; import java.util.zip.Checksum; +import com.google.common.collect.ImmutableMap; + import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; - import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; + import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.CommitLogDescriptor; - import org.apache.cassandra.db.commitlog.CommitLogSegment; + import org.apache.cassandra.db.commitlog.CommitLogSegmentManager; import org.apache.cassandra.db.commitlog.ReplayPosition; + import org.apache.cassandra.db.commitlog.CommitLogSegment; +import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.db.filter.NamesQueryFilter; import org.apache.cassandra.exceptions.ConfigurationException; + import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.io.util.ByteBufferDataInput; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.net.MessagingService; + import org.apache.cassandra.service.CassandraDaemon; + import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.JVMStabilityInspector; + import org.apache.cassandra.utils.KillerForTests; -import static org.apache.cassandra.utils.ByteBufferUtil.bytes; - -public class CommitLogTest extends SchemaLoader +public class CommitLogTest { + private static final String KEYSPACE1 = "CommitLogTest"; + private static final String KEYSPACE2 = "CommitLogTestNonDurable"; + private static final String CF1 = "Standard1"; + private static final String CF2 = "Standard2"; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + SimpleStrategy.class, + KSMetaData.optsWithRF(1), + SchemaLoader.standardCFMD(KEYSPACE1, CF1), + SchemaLoader.standardCFMD(KEYSPACE1, CF2)); + SchemaLoader.createKeyspace(KEYSPACE2, + false, + true, + SimpleStrategy.class, + KSMetaData.optsWithRF(1), + SchemaLoader.standardCFMD(KEYSPACE1, CF1), + SchemaLoader.standardCFMD(KEYSPACE1, CF2)); + System.setProperty("cassandra.commitlog.stop_on_errors", "true"); + CompactionManager.instance.disableAutoCompaction(); + } + @Test public void testRecoveryWithEmptyLog() throws Exception { @@@ -311,15 -279,162 +319,162 @@@ } @Test - public void testTruncateWithoutSnapshot() throws IOException + public void testCommitFailurePolicy_stop() throws ConfigurationException + { + CassandraDaemon daemon = new CassandraDaemon(); + daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy + StorageService.instance.registerDaemon(daemon); + + // Need storage service active so stop policy can shutdown gossip + StorageService.instance.initServer(); + Assert.assertTrue(Gossiper.instance.isEnabled()); + + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop); + CommitLog.handleCommitError("Test stop error", new Throwable()); + Assert.assertFalse(Gossiper.instance.isEnabled()); + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + } + } + + @Test + public void testCommitFailurePolicy_die() + { + CassandraDaemon daemon = new CassandraDaemon(); + daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy + StorageService.instance.registerDaemon(daemon); + + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die); + CommitLog.handleCommitError("Testing die policy", new Throwable()); + Assert.assertTrue(killerForTests.wasKilled()); + Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + } + } + + @Test + public void testCommitFailurePolicy_mustDieIfNotStartedUp() + { + //startup was not completed successfuly (since method completeSetup() was not called) + CassandraDaemon daemon = new CassandraDaemon(); + StorageService.instance.registerDaemon(daemon); + + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + //even though policy is ignore, JVM must die because Daemon has not finished initializing + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore); + CommitLog.handleCommitError("Testing die policy", new Throwable()); + Assert.assertTrue(killerForTests.wasKilled()); + Assert.assertTrue(killerForTests.wasKilledQuietly()); //killed quietly due to startup failure + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + } + } + + @Test + public void testCommitLogFailureBeforeInitialization_mustKillJVM() throws Exception + { + //startup was not completed successfuly (since method completeSetup() was not called) + CassandraDaemon daemon = new CassandraDaemon(); + StorageService.instance.registerDaemon(daemon); + + //let's make the commit log directory non-writable + File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation()); + commitLogDir.setWritable(false); + + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore); + + //now let's create a commit log segment manager and wait for it to fail - new CommitLogSegmentManager(); ++ new CommitLogSegmentManager(CommitLog.instance); + + //busy wait since commitlogsegmentmanager spawns another thread + int retries = 0; + while (!killerForTests.wasKilled() && retries++ < 5) + Thread.sleep(10); + + //since failure was before CassandraDaemon startup, the JVM must be killed + Assert.assertTrue(killerForTests.wasKilled()); + Assert.assertTrue(killerForTests.wasKilledQuietly()); //killed quietly due to startup failure + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + commitLogDir.setWritable(true); + } + } + + @Test + public void testCommitLogFailureAfterInitialization_mustRespectFailurePolicy() throws Exception + { + //startup was not completed successfuly (since method completeSetup() was not called) + CassandraDaemon daemon = new CassandraDaemon(); + daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy + StorageService.instance.registerDaemon(daemon); + + //let's make the commit log directory non-writable + File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation()); + commitLogDir.setWritable(false); + + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore); + + //now let's create a commit log segment manager and wait for it to fail - new CommitLogSegmentManager(); ++ new CommitLogSegmentManager(CommitLog.instance); + + //wait commit log segment manager thread to execute + Thread.sleep(50); + + //error policy is set to IGNORE, so JVM must not be killed if error ocurs after startup + Assert.assertFalse(killerForTests.wasKilled()); + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + commitLogDir.setWritable(true); + } + } + + @Test - public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException ++ public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException { - CommitLog.instance.resetUnsafe(); + CommitLog.instance.resetUnsafe(true); boolean prev = DatabaseDescriptor.isAutoSnapshot(); DatabaseDescriptor.setAutoSnapshot(false); - ColumnFamilyStore cfs1 = Keyspace.open("Keyspace1").getColumnFamilyStore("Standard1"); - ColumnFamilyStore cfs2 = Keyspace.open("Keyspace1").getColumnFamilyStore("Standard2"); + ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1"); + ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2"); - final Mutation rm1 = new Mutation("Keyspace1", bytes("k")); + final Mutation rm1 = new Mutation(KEYSPACE1, bytes("k")); rm1.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0); rm1.apply(); cfs1.truncateBlocking();