Updated Branches: refs/heads/cassandra-2.0 55b5605b7 -> 9381b8d56
Add commit_failure_policy. Patch by belliottsmith, reviewed by marcuse for CASSANDRA-6364 CASSANDRA-6364 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9381b8d5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9381b8d5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9381b8d5 Branch: refs/heads/cassandra-2.0 Commit: 9381b8d569ae17cf2760bca266b5253e4bcd6ac2 Parents: 55b5605 Author: Marcus Eriksson <marc...@apache.org> Authored: Tue Feb 11 13:13:37 2014 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue Feb 11 13:13:37 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 2 +- conf/cassandra.yaml | 8 +++++ .../org/apache/cassandra/config/Config.java | 8 +++++ .../cassandra/config/DatabaseDescriptor.java | 11 ++++++ .../BatchCommitLogExecutorService.java | 17 +++++++-- .../cassandra/db/commitlog/CommitLog.java | 24 +++++++++++++ .../db/commitlog/CommitLogAllocator.java | 37 +++++++++++++------- .../PeriodicCommitLogExecutorService.java | 26 ++++++++++++-- .../org/apache/cassandra/io/util/FileUtils.java | 20 ++--------- .../cassandra/service/StorageService.java | 19 ++++++++++ .../org/apache/cassandra/db/CommitLogTest.java | 32 +++++++++++++++++ 11 files changed, 169 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 93552ef..a8114a3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,13 +9,13 @@ * Account for range/row tombstones in tombstone drop time histogram (CASSANDRA-6522) * Stop CommitLogSegment.close() from calling sync() (CASSANDRA-6652) + * Make commitlog failure handling configurable (CASSANDRA-6364) Merged from 1.2: * Fix upgradesstables NPE for non-CF-based 
indexes (CASSANDRA-6645) * Fix partition and range deletes not triggering flush (CASSANDRA-6655) * Fix mean cells and mean row size per sstable calculations (CASSANDRA-6667) * Compact hints after partial replay to clean out tombstones (CASSANDRA-6666) - 2.0.5 * Reduce garbage generated by bloom filter lookups (CASSANDRA-6609) * Add ks.cf names to tombstone logging (CASSANDRA-6597) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index db924bb..bfe60c4 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -121,6 +121,14 @@ commitlog_directory: /var/lib/cassandra/commitlog # ignore: ignore fatal errors and let requests fail, as in pre-1.2 Cassandra disk_failure_policy: stop +# policy for commit disk failures: +# stop: shut down gossip and Thrift, leaving the node effectively dead, but +# can still be inspected via JMX. +# stop_commit: shut down the commit log, letting writes collect but +# continuing to service reads, as in pre-2.0.5 Cassandra +# ignore: ignore fatal errors and let the batches fail +commit_failure_policy: stop + # Maximum size of the key cache in memory.
# # Each key cache hit saves 1 seek and each row cache hit saves 2 seeks at the http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index a4e4e92..2fa49f3 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -45,6 +45,7 @@ public class Config public DiskAccessMode disk_access_mode = DiskAccessMode.auto; public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore; + public CommitFailurePolicy commit_failure_policy = CommitFailurePolicy.stop; /* initial token in the ring */ public String initial_token; @@ -230,6 +231,13 @@ public class Config ignore, } + public static enum CommitFailurePolicy + { + stop, + stop_commit, + ignore, + } + public static enum RequestSchedulerId { keyspace http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index bd5db69..e1a95ab 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -194,6 +194,7 @@ public class DatabaseDescriptor } logger.info("disk_failure_policy is " + conf.disk_failure_policy); + logger.info("commit_failure_policy is " + conf.commit_failure_policy); /* Authentication and authorization backend, implementing IAuthenticator and IAuthorizer */ if (conf.authenticator != null) @@ -1082,6 +1083,16 @@ public class DatabaseDescriptor return conf.disk_failure_policy; } + public static void setCommitFailurePolicy(Config.CommitFailurePolicy policy) + { 
+ conf.commit_failure_policy = policy; + } + + public static Config.CommitFailurePolicy getCommitFailurePolicy() + { + return conf.commit_failure_policy; + } + public static boolean isSnapshotBeforeCompaction() { return conf.snapshot_before_compaction; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java index d985f1f..9c2e2ac 100644 --- a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java +++ b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java @@ -20,12 +20,17 @@ package org.apache.cassandra.db.commitlog; import java.util.ArrayList; import java.util.concurrent.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.FSError; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService { + private final BlockingQueue<CheaterFutureTask> queue; private final Thread appendingThread; private volatile boolean run = true; @@ -44,8 +49,16 @@ class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService { while (run) { - if (processWithSyncBatch()) - completedTaskCount++; + try + { + if (processWithSyncBatch()) + completedTaskCount++; + } + catch (Throwable t) + { + if (!CommitLog.handleCommitError("Failed to persist commits to disk", t)) + return; + } } } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index e9507da..4bab83f 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -21,10 +21,13 @@ import java.io.*; import java.lang.management.ManagementFactory; import java.util.*; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.management.ObjectName; +import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,9 +35,11 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; +import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.metrics.CommitLogMetrics; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; /* @@ -363,4 +368,23 @@ public class CommitLog implements CommitLogMBean return null; } } + + static boolean handleCommitError(String message, Throwable t) + { + switch (DatabaseDescriptor.getCommitFailurePolicy()) + { + case stop: + StorageService.instance.stopTransports(); + case stop_commit: + logger.error(String.format("%s. 
Commit disk failure policy is %s; terminating thread", message, DatabaseDescriptor.getCommitFailurePolicy()), t); + return false; + case ignore: + logger.error(message, t); + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + return true; + default: + throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy()); + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java index 706cf9e..575e3c3 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java @@ -36,6 +36,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; @@ -88,22 +89,32 @@ public class CommitLogAllocator { while (run) { - Runnable r = queue.poll(TICK_CYCLE_TIME, TimeUnit.MILLISECONDS); - - if (r != null) - { - r.run(); - } - else + try { - // no job, so we're clear to check to see if we're out of segments - // and ready a new one if needed. has the effect of ensuring there's - // almost always a segment available when it's needed. 
- if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments)) + + Runnable r = queue.poll(TICK_CYCLE_TIME, TimeUnit.MILLISECONDS); + + if (r != null) { - logger.debug("No segments in reserve; creating a fresh one"); - createFreshSegment(); + r.run(); } + else + { + // no job, so we're clear to check to see if we're out of segments + // and ready a new one if needed. has the effect of ensuring there's + // almost always a segment available when it's needed. + if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments)) + { + logger.debug("No segments in reserve; creating a fresh one"); + createFreshSegment(); + } + } + + } + catch (Throwable t) + { + if (!CommitLog.handleCommitError("Failed to allocate new commit log segments", t)) + return; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java index 30f33b6..00507c2 100644 --- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java +++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java @@ -25,9 +25,12 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; import com.google.common.util.concurrent.Uninterruptibles; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class PeriodicCommitLogExecutorService implements ICommitLogExecutorService { + private final BlockingQueue<Runnable> queue; protected volatile long completedTaskCount = 0; private final Thread appendingThread; @@ -69,8 +72,27 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService { while (run) { - 
FBUtilities.waitOnFuture(submit(syncer)); - Uninterruptibles.sleepUninterruptibly(DatabaseDescriptor.getCommitLogSyncPeriod(), TimeUnit.MILLISECONDS); + try + { + FBUtilities.waitOnFuture(submit(syncer)); + Uninterruptibles.sleepUninterruptibly(DatabaseDescriptor.getCommitLogSyncPeriod(), TimeUnit.MILLISECONDS); + } + catch (Throwable t) + { + if (!CommitLog.handleCommitError("Failed to persist commits to disk", t)) + { + PeriodicCommitLogExecutorService.this.run = false; + try + { + appendingThread.join(); + } + catch (InterruptedException e) + { + throw new IllegalStateException(); + } + return; + } + } } } }, "PERIODIC-COMMIT-LOG-SYNCER").start(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/io/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java index 0d8538e..e091465 100644 --- a/src/java/org/apache/cassandra/io/util/FileUtils.java +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java @@ -31,6 +31,7 @@ import java.util.Arrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.BlacklistedDirectories; import org.apache.cassandra.db.Keyspace; @@ -412,23 +413,7 @@ public class FileUtils switch (DatabaseDescriptor.getDiskFailurePolicy()) { case stop: - if (StorageService.instance.isInitialized()) - { - logger.error("Stopping gossiper"); - StorageService.instance.stopGossiping(); - } - - if (StorageService.instance.isRPCServerRunning()) - { - logger.error("Stopping RPC server"); - StorageService.instance.stopRPCServer(); - } - - if (StorageService.instance.isNativeTransportRunning()) - { - logger.error("Stopping native transport"); - StorageService.instance.stopNativeTransport(); - } + 
StorageService.instance.stopTransports(); break; case best_effort: // for both read and write errors mark the path as unwritable. @@ -447,4 +432,5 @@ public class FileUtils throw new IllegalStateException(); } } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index e181c44..09b93d7 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -357,6 +357,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return daemon.nativeServer.isRunning(); } + public void stopTransports() + { + if (isInitialized()) + { + logger.error("Stopping gossiper"); + stopGossiping(); + } + if (isRPCServerRunning()) + { + logger.error("Stopping RPC server"); + stopRPCServer(); + } + if (isNativeTransportRunning()) + { + logger.error("Stopping native transport"); + stopNativeTransport(); + } + } + private void shutdownClientServers() { stopRPCServer(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/test/unit/org/apache/cassandra/db/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java index 8e5f418..036ce15 100644 --- a/test/unit/org/apache/cassandra/db/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java @@ -22,17 +22,21 @@ package org.apache.cassandra.db; import java.io.*; import java.nio.ByteBuffer; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.zip.CRC32; import java.util.zip.Checksum; +import com.google.common.util.concurrent.Uninterruptibles; import org.junit.Assert; import 
org.junit.Test; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.CommitLogDescriptor; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageService; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; @@ -225,4 +229,32 @@ public class CommitLogTest extends SchemaLoader String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log"; Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion()); } + + @Test + public void testCommitFailurePolicy_stop() + { + File commitDir = new File(DatabaseDescriptor.getCommitLogLocation()); + + try + { + + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop); + commitDir.setWritable(false); + RowMutation rm = new RowMutation("Keyspace1", bytes("k")); + rm.add("Standard1", bytes("c1"), ByteBuffer.allocate(100), 0); + + // Add one mutation while the commit log directory is read-only; the resulting failure should trigger the 'stop' policy + CommitLog.instance.add(rm); + Uninterruptibles.sleepUninterruptibly((int) DatabaseDescriptor.getCommitLogSyncBatchWindow(), TimeUnit.MILLISECONDS); + Assert.assertFalse(StorageService.instance.isRPCServerRunning()); + Assert.assertFalse(StorageService.instance.isNativeTransportRunning()); + Assert.assertFalse(StorageService.instance.isInitialized()); + + } + finally + { + commitDir.setWritable(true); + } + } + }