Disable passing control to post-flush after flush failure to prevent data loss.
patch by Branimir Lambov; reviewed by Sylvain Lebresne for CASSANDRA-11828 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bd665473 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bd665473 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bd665473 Branch: refs/heads/cassandra-3.0 Commit: bd6654733dded3513c2c7acf96df2c364b0c043e Parents: bc0d1da Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Wed Aug 3 11:32:48 2016 +0300 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Aug 5 15:35:25 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 45 ++++-- .../apache/cassandra/cql3/OutOfSpaceBase.java | 87 ++++++++++ .../cassandra/cql3/OutOfSpaceDieTest.java | 68 ++++++++ .../cassandra/cql3/OutOfSpaceIgnoreTest.java | 60 +++++++ .../cassandra/cql3/OutOfSpaceStopTest.java | 63 ++++++++ .../apache/cassandra/cql3/OutOfSpaceTest.java | 157 ------------------- 7 files changed, 311 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8ecc787..1275631 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.16 + * Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828) * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040) * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850) * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index b64d5de..6e82745 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -99,6 +99,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean new NamedThreadFactory("MemtablePostFlush"), "internal"); + // If a flush fails with an error the post-flush is never allowed to continue. This stores the error that caused it + // to be able to show an error on following flushes instead of blindly continuing. + private static volatile FSWriteError previousFlushFailure = null; + private static final ExecutorService reclaimExecutor = new JMXEnabledThreadPoolExecutor(1, StageManager.KEEPALIVE, TimeUnit.SECONDS, @@ -869,12 +873,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { synchronized (data) { + if (previousFlushFailure != null) + throw new IllegalStateException("A flush previously failed with the error below. To prevent data loss, " + + "no flushes can be carried out until the node is restarted.", + previousFlushFailure); logFlush(); Flush flush = new Flush(false); - flushExecutor.execute(flush); + ListenableFutureTask<?> flushTask = ListenableFutureTask.create(flush, null); + flushExecutor.submit(flushTask); ListenableFutureTask<?> task = ListenableFutureTask.create(flush.postFlush, null); postFlushExecutor.submit(task); - return task; + + @SuppressWarnings("unchecked") + ListenableFuture<?> future = Futures.allAsList(flushTask, task); + return future; } } @@ -967,7 +979,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean final OpOrder.Barrier writeBarrier; final CountDownLatch latch = new CountDownLatch(1); final ReplayPosition lastReplayPosition; - volatile FSWriteError flushFailure = null; private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition) { @@ -1010,16 +1021,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // must check lastReplayPosition != null because Flush may find that all memtables are clean // and so not set a lastReplayPosition - // If a flush errored out but the error was ignored, make sure we don't discard the commit log. - if (lastReplayPosition != null && flushFailure == null) + if (lastReplayPosition != null) { CommitLog.instance.discardCompletedSegments(metadata.cfId, lastReplayPosition); } metric.pendingFlushes.dec(); - - if (flushFailure != null) - throw flushFailure; } } @@ -1127,16 +1134,28 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable()); reclaim(memtable); } + + // signal the post-flush we've done our work + // Note: This should not be done in case of error. Read more below. + postFlush.latch.countDown(); } catch (FSWriteError e) { JVMStabilityInspector.inspectThrowable(e); - // If we weren't killed, try to continue work but do not allow CommitLog to be discarded. - postFlush.flushFailure = e; + // The call above may kill the process or the transports, or ignore the error. + // In any case we should not be passing on control to post-flush as a subsequent succeeding flush + // could mask the error and: + // - let the commit log discard unpersisted data, resulting in data loss + // - let truncations proceed, with the possibility of resurrecting the unflushed data + // - let snapshots succeed with incomplete data + + // Not passing control on means that all flushes from the moment of failure cannot complete + // (including snapshots). + // If the disk failure policy is ignore, this will cause memtables and the commit log to grow + // unboundedly until the node eventually fails. + previousFlushFailure = e; + throw e; } - - // signal the post-flush we've done our work - postFlush.latch.countDown(); } private void reclaim(final Memtable memtable) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java new file mode 100644 index 0000000..c0023dc --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3; + +import static junit.framework.Assert.fail; + +import java.io.IOError; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +import org.junit.Assert; + +import org.apache.cassandra.db.BlacklistedDirectories; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories.DataDirectory; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.commitlog.CommitLogSegment; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.io.FSWriteError; + +/** + * Test that exceptions during flush are treated according to the disk failure policy. + * We cannot recover after a failed flush due to postFlushExecutor being stuck, so each test needs to run separately. + */ +public class OutOfSpaceBase extends CQLTester +{ + public void makeTable() throws Throwable + { + createTable("CREATE TABLE %s (a text, b text, c text, PRIMARY KEY (a, b));"); + + for (int i = 0; i < 10; i++) + execute("INSERT INTO %s (a, b, c) VALUES ('key', 'column" + i + "', null);"); + } + + public void markDirectoriesUnwriteable() + { + ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()); + try + { + for ( ; ; ) + { + DataDirectory dir = cfs.directories.getWriteableLocation(1); + BlacklistedDirectories.maybeMarkUnwritable(cfs.directories.getLocationForDisk(dir)); + } + } + catch (IOError e) + { + // Expected -- marked all directories as unwritable + } + } + + public void flushAndExpectError() throws InterruptedException, ExecutionException + { + try + { + Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable()).forceFlush().get(); + fail("FSWriteError expected."); + } + catch (ExecutionException e) + { + // Correct path. + Assert.assertTrue(e.getCause() instanceof FSWriteError); + } + + // Make sure commit log wasn't discarded. + UUID cfid = currentTableMetadata().cfId; + for (CommitLogSegment segment : CommitLog.instance.allocator.getActiveSegments()) + if (segment.getDirtyCFIDs().contains(cfid)) + return; + fail("Expected commit log to remain dirty for the affected table."); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/test/unit/org/apache/cassandra/cql3/OutOfSpaceDieTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceDieTest.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceDieTest.java new file mode 100644 index 0000000..46d71e4 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceDieTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3; + +import static junit.framework.Assert.fail; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.config.Config.DiskFailurePolicy; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.KillerForTests; + +/** + * Test that exceptions during flush are treated according to the disk failure policy. + */ +public class OutOfSpaceDieTest extends OutOfSpaceBase +{ + @Test + public void testFlushUnwriteableDie() throws Throwable + { + makeTable(); + markDirectoriesUnwriteable(); + + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy(); + try + { + DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.die); + flushAndExpectError(); + Assert.assertTrue(killerForTests.wasKilled()); + Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure + } + finally + { + DatabaseDescriptor.setDiskFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + } + + makeTable(); + try + { + flush(); + fail("Subsequent flushes expected to fail."); + } + catch (RuntimeException e) + { + // correct path + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/test/unit/org/apache/cassandra/cql3/OutOfSpaceIgnoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceIgnoreTest.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceIgnoreTest.java new file mode 100644 index 0000000..854de80 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceIgnoreTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3; + +import static junit.framework.Assert.fail; + +import org.junit.Test; + +import org.apache.cassandra.config.Config.DiskFailurePolicy; +import org.apache.cassandra.config.DatabaseDescriptor; + +/** + * Test that exceptions during flush are treated according to the disk failure policy. + */ +public class OutOfSpaceIgnoreTest extends OutOfSpaceBase +{ + @Test + public void testFlushUnwriteableIgnore() throws Throwable + { + makeTable(); + markDirectoriesUnwriteable(); + + DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy(); + try + { + DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.ignore); + flushAndExpectError(); + } + finally + { + DatabaseDescriptor.setDiskFailurePolicy(oldPolicy); + } + + makeTable(); + try + { + flush(); + fail("Subsequent flushes expected to fail."); + } + catch (RuntimeException e) + { + // correct path + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/test/unit/org/apache/cassandra/cql3/OutOfSpaceStopTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceStopTest.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceStopTest.java new file mode 100644 index 0000000..b48df56 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceStopTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3; + +import static junit.framework.Assert.fail; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.config.Config.DiskFailurePolicy; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.gms.Gossiper; + +/** + * Test that exceptions during flush are treated according to the disk failure policy. + */ +public class OutOfSpaceStopTest extends OutOfSpaceBase +{ + @Test + public void testFlushUnwriteableStop() throws Throwable + { + makeTable(); + markDirectoriesUnwriteable(); + + DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy(); + try + { + DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.stop); + flushAndExpectError(); + Assert.assertFalse(Gossiper.instance.isEnabled()); + } + finally + { + DatabaseDescriptor.setDiskFailurePolicy(oldPolicy); + } + + makeTable(); + try + { + flush(); + fail("Subsequent flushes expected to fail."); + } + catch (RuntimeException e) + { + // correct path + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java deleted file mode 100644 index 8304aff..0000000 --- a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.cql3; - -import static junit.framework.Assert.fail; - -import java.io.IOError; -import java.util.UUID; -import java.util.concurrent.ExecutionException; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.cassandra.config.Config.DiskFailurePolicy; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.BlacklistedDirectories; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Directories.DataDirectory; -import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.db.commitlog.CommitLogSegment; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.KillerForTests; - -/** - * Test that TombstoneOverwhelmingException gets thrown when it should be and doesn't when it shouldn't be. - */ -public class OutOfSpaceTest extends CQLTester -{ - @Test - public void testFlushUnwriteableDie() throws Throwable - { - makeTable(); - markDirectoriesUnwriteable(); - - KillerForTests killerForTests = new KillerForTests(); - JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); - DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy(); - try - { - DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.die); - flushAndExpectError(); - Assert.assertTrue(killerForTests.wasKilled()); - Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure - } - finally - { - DatabaseDescriptor.setDiskFailurePolicy(oldPolicy); - JVMStabilityInspector.replaceKiller(originalKiller); - } - } - - @Test - public void testFlushUnwriteableStop() throws Throwable - { - makeTable(); - markDirectoriesUnwriteable(); - - DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy(); - try - { - DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.stop); - flushAndExpectError(); - Assert.assertFalse(Gossiper.instance.isEnabled()); - } - finally - { - DatabaseDescriptor.setDiskFailurePolicy(oldPolicy); - } - } - - @Test - public void testFlushUnwriteableIgnore() throws Throwable - { - makeTable(); - markDirectoriesUnwriteable(); - - DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy(); - try - { - DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.ignore); - flushAndExpectError(); - } - finally - { - DatabaseDescriptor.setDiskFailurePolicy(oldPolicy); - } - - // Next flush should succeed. - makeTable(); - flush(); - } - - public void makeTable() throws Throwable - { - createTable("CREATE TABLE %s (a text, b text, c text, PRIMARY KEY (a, b));"); - - // insert exactly the amount of tombstones that shouldn't trigger an exception - for (int i = 0; i < 10; i++) - execute("INSERT INTO %s (a, b, c) VALUES ('key', 'column" + i + "', null);"); - } - - public void markDirectoriesUnwriteable() - { - ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()); - try - { - for ( ; ; ) - { - DataDirectory dir = cfs.directories.getWriteableLocation(1); - BlacklistedDirectories.maybeMarkUnwritable(cfs.directories.getLocationForDisk(dir)); - } - } - catch (IOError e) - { - // Expected -- marked all directories as unwritable - } - } - - public void flushAndExpectError() throws InterruptedException, ExecutionException - { - try - { - Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable()).forceFlush().get(); - fail("FSWriteError expected."); - } - catch (ExecutionException e) - { - // Correct path. - Assert.assertTrue(e.getCause() instanceof FSWriteError); - } - - // Make sure commit log wasn't discarded. - UUID cfid = currentTableMetadata().cfId; - for (CommitLogSegment segment : CommitLog.instance.allocator.getActiveSegments()) - if (segment.getDirtyCFIDs().contains(cfid)) - return; - fail("Expected commit log to remain dirty for the affected table."); - } -}