Disable passing control to post-flush after flush failure to prevent
data loss.

patch by Branimir Lambov; reviewed by Sylvain Lebresne for
CASSANDRA-11828

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bd665473
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bd665473
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bd665473

Branch: refs/heads/cassandra-3.0
Commit: bd6654733dded3513c2c7acf96df2c364b0c043e
Parents: bc0d1da
Author: Branimir Lambov <branimir.lam...@datastax.com>
Authored: Wed Aug 3 11:32:48 2016 +0300
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Fri Aug 5 15:35:25 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  45 ++++--
 .../apache/cassandra/cql3/OutOfSpaceBase.java   |  87 ++++++++++
 .../cassandra/cql3/OutOfSpaceDieTest.java       |  68 ++++++++
 .../cassandra/cql3/OutOfSpaceIgnoreTest.java    |  60 +++++++
 .../cassandra/cql3/OutOfSpaceStopTest.java      |  63 ++++++++
 .../apache/cassandra/cql3/OutOfSpaceTest.java   | 157 -------------------
 7 files changed, 311 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8ecc787..1275631 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.16
+ * Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828)
  * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
  * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)
  * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b64d5de..6e82745 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -99,6 +99,10 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                                                                                
               new NamedThreadFactory("MemtablePostFlush"),
                                                                                
               "internal");
 
+    // If a flush fails with an error the post-flush is never allowed to continue. This stores the error that caused it
+    // to be able to show an error on following flushes instead of blindly continuing.
+    private static volatile FSWriteError previousFlushFailure = null;
+
     private static final ExecutorService reclaimExecutor = new 
JMXEnabledThreadPoolExecutor(1,
                                                                                
             StageManager.KEEPALIVE,
                                                                                
             TimeUnit.SECONDS,
@@ -869,12 +873,20 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     {
         synchronized (data)
         {
+            if (previousFlushFailure != null)
+                throw new IllegalStateException("A flush previously failed 
with the error below. To prevent data loss, "
+                                              + "no flushes can be carried out 
until the node is restarted.",
+                                                previousFlushFailure);
             logFlush();
             Flush flush = new Flush(false);
-            flushExecutor.execute(flush);
+            ListenableFutureTask<?> flushTask = 
ListenableFutureTask.create(flush, null);
+            flushExecutor.submit(flushTask);
             ListenableFutureTask<?> task = 
ListenableFutureTask.create(flush.postFlush, null);
             postFlushExecutor.submit(task);
-            return task;
+
+            @SuppressWarnings("unchecked")
+            ListenableFuture<?> future = Futures.allAsList(flushTask, task);
+            return future;
         }
     }
 
@@ -967,7 +979,6 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         final OpOrder.Barrier writeBarrier;
         final CountDownLatch latch = new CountDownLatch(1);
         final ReplayPosition lastReplayPosition;
-        volatile FSWriteError flushFailure = null;
 
         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier 
writeBarrier, ReplayPosition lastReplayPosition)
         {
@@ -1010,16 +1021,12 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
             // must check lastReplayPosition != null because Flush may find that all memtables are clean
             // and so not set a lastReplayPosition
-            // If a flush errored out but the error was ignored, make sure we don't discard the commit log.
-            if (lastReplayPosition != null && flushFailure == null)
+            if (lastReplayPosition != null)
             {
                 CommitLog.instance.discardCompletedSegments(metadata.cfId, 
lastReplayPosition);
             }
 
             metric.pendingFlushes.dec();
-
-            if (flushFailure != null)
-                throw flushFailure;
         }
     }
 
@@ -1127,16 +1134,28 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                     
MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable());
                     reclaim(memtable);
                 }
+
+                // signal the post-flush we've done our work
+                // Note: This should not be done in case of error. Read more below.
+                postFlush.latch.countDown();
             }
             catch (FSWriteError e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
-                // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
-                postFlush.flushFailure = e;
+                // The call above may kill the process or the transports, or ignore the error.
+                // In any case we should not be passing on control to post-flush as a subsequent succeeding flush
+                // could mask the error and:
+                //   - let the commit log discard unpersisted data, resulting in data loss
+                //   - let truncations proceed, with the possibility of resurrecting the unflushed data
+                //   - let snapshots succeed with incomplete data
+
+                // Not passing control on means that all flushes from the moment of failure cannot complete
+                // (including snapshots).
+                // If the disk failure policy is ignore, this will cause memtables and the commit log to grow
+                // unboundedly until the node eventually fails.
+                previousFlushFailure = e;
+                throw e;
             }
-
-            // signal the post-flush we've done our work
-            postFlush.latch.countDown();
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java 
b/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java
new file mode 100644
index 0000000..c0023dc
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import static junit.framework.Assert.fail;
+
+import java.io.IOError;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Assert;
+
+import org.apache.cassandra.db.BlacklistedDirectories;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories.DataDirectory;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogSegment;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.io.FSWriteError;
+
+/**
+ * Test that exceptions during flush are treated according to the disk failure policy.
+ * We cannot recover after a failed flush due to postFlushExecutor being stuck, so each test needs to run separately.
+ */
+public class OutOfSpaceBase extends CQLTester
+{
+    public void makeTable() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a text, b text, c text, PRIMARY KEY (a, 
b));");
+
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s (a, b, c) VALUES ('key', 'column" + i + 
"', null);");
+    }
+
+    public void markDirectoriesUnwriteable()
+    {
+        ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(currentTable());
+        try
+        {
+            for ( ; ; )
+            {
+                DataDirectory dir = cfs.directories.getWriteableLocation(1);
+                
BlacklistedDirectories.maybeMarkUnwritable(cfs.directories.getLocationForDisk(dir));
+            }
+        }
+        catch (IOError e)
+        {
+            // Expected -- marked all directories as unwritable
+        }
+    }
+
+    public void flushAndExpectError() throws InterruptedException, 
ExecutionException
+    {
+        try
+        {
+            
Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable()).forceFlush().get();
+            fail("FSWriteError expected.");
+        }
+        catch (ExecutionException e)
+        {
+            // Correct path.
+            Assert.assertTrue(e.getCause() instanceof FSWriteError);
+        }
+
+        // Make sure commit log wasn't discarded.
+        UUID cfid = currentTableMetadata().cfId;
+        for (CommitLogSegment segment : 
CommitLog.instance.allocator.getActiveSegments())
+            if (segment.getDirtyCFIDs().contains(cfid))
+                return;
+        fail("Expected commit log to remain dirty for the affected table.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/test/unit/org/apache/cassandra/cql3/OutOfSpaceDieTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceDieTest.java 
b/test/unit/org/apache/cassandra/cql3/OutOfSpaceDieTest.java
new file mode 100644
index 0000000..46d71e4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceDieTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import static junit.framework.Assert.fail;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config.DiskFailurePolicy;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+
+/**
+ * Test that exceptions during flush are treated according to the disk failure 
policy.
+ */
+public class OutOfSpaceDieTest extends OutOfSpaceBase
+{
+    @Test
+    public void testFlushUnwriteableDie() throws Throwable
+    {
+        makeTable();
+        markDirectoriesUnwriteable();
+
+        KillerForTests killerForTests = new KillerForTests();
+        JVMStabilityInspector.Killer originalKiller = 
JVMStabilityInspector.replaceKiller(killerForTests);
+        DiskFailurePolicy oldPolicy = 
DatabaseDescriptor.getDiskFailurePolicy();
+        try
+        {
+            DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.die);
+            flushAndExpectError();
+            Assert.assertTrue(killerForTests.wasKilled());
+            Assert.assertFalse(killerForTests.wasKilledQuietly()); //only 
killed quietly on startup failure
+        }
+        finally
+        {
+            DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
+            JVMStabilityInspector.replaceKiller(originalKiller);
+        }
+
+        makeTable();
+        try
+        {
+            flush();
+            fail("Subsequent flushes expected to fail.");
+        }
+        catch (RuntimeException e)
+        {
+            // correct path
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/test/unit/org/apache/cassandra/cql3/OutOfSpaceIgnoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceIgnoreTest.java 
b/test/unit/org/apache/cassandra/cql3/OutOfSpaceIgnoreTest.java
new file mode 100644
index 0000000..854de80
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceIgnoreTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import static junit.framework.Assert.fail;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config.DiskFailurePolicy;
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+/**
+ * Test that exceptions during flush are treated according to the disk failure 
policy.
+ */
+public class OutOfSpaceIgnoreTest extends OutOfSpaceBase
+{
+    @Test
+    public void testFlushUnwriteableIgnore() throws Throwable
+    {
+        makeTable();
+        markDirectoriesUnwriteable();
+
+        DiskFailurePolicy oldPolicy = 
DatabaseDescriptor.getDiskFailurePolicy();
+        try
+        {
+            DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.ignore);
+            flushAndExpectError();
+        }
+        finally
+        {
+            DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
+        }
+
+        makeTable();
+        try
+        {
+            flush();
+            fail("Subsequent flushes expected to fail.");
+        }
+        catch (RuntimeException e)
+        {
+            // correct path
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/test/unit/org/apache/cassandra/cql3/OutOfSpaceStopTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceStopTest.java 
b/test/unit/org/apache/cassandra/cql3/OutOfSpaceStopTest.java
new file mode 100644
index 0000000..b48df56
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceStopTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import static junit.framework.Assert.fail;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config.DiskFailurePolicy;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.Gossiper;
+
+/**
+ * Test that exceptions during flush are treated according to the disk failure 
policy.
+ */
+public class OutOfSpaceStopTest extends OutOfSpaceBase
+{
+    @Test
+    public void testFlushUnwriteableStop() throws Throwable
+    {
+        makeTable();
+        markDirectoriesUnwriteable();
+
+        DiskFailurePolicy oldPolicy = 
DatabaseDescriptor.getDiskFailurePolicy();
+        try
+        {
+            DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.stop);
+            flushAndExpectError();
+            Assert.assertFalse(Gossiper.instance.isEnabled());
+        }
+        finally
+        {
+            DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
+        }
+
+        makeTable();
+        try
+        {
+            flush();
+            fail("Subsequent flushes expected to fail.");
+        }
+        catch (RuntimeException e)
+        {
+            // correct path
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java 
b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
deleted file mode 100644
index 8304aff..0000000
--- a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.cql3;
-
-import static junit.framework.Assert.fail;
-
-import java.io.IOError;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.cassandra.config.Config.DiskFailurePolicy;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.BlacklistedDirectories;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Directories.DataDirectory;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.CommitLogSegment;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.KillerForTests;
-
-/**
- * Test that TombstoneOverwhelmingException gets thrown when it should be and 
doesn't when it shouldn't be.
- */
-public class OutOfSpaceTest extends CQLTester
-{
-    @Test
-    public void testFlushUnwriteableDie() throws Throwable
-    {
-        makeTable();
-        markDirectoriesUnwriteable();
-
-        KillerForTests killerForTests = new KillerForTests();
-        JVMStabilityInspector.Killer originalKiller = 
JVMStabilityInspector.replaceKiller(killerForTests);
-        DiskFailurePolicy oldPolicy = 
DatabaseDescriptor.getDiskFailurePolicy();
-        try
-        {
-            DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.die);
-            flushAndExpectError();
-            Assert.assertTrue(killerForTests.wasKilled());
-            Assert.assertFalse(killerForTests.wasKilledQuietly()); //only 
killed quietly on startup failure
-        }
-        finally
-        {
-            DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
-            JVMStabilityInspector.replaceKiller(originalKiller);
-        }
-    }
-
-    @Test
-    public void testFlushUnwriteableStop() throws Throwable
-    {
-        makeTable();
-        markDirectoriesUnwriteable();
-
-        DiskFailurePolicy oldPolicy = 
DatabaseDescriptor.getDiskFailurePolicy();
-        try
-        {
-            DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.stop);
-            flushAndExpectError();
-            Assert.assertFalse(Gossiper.instance.isEnabled());
-        }
-        finally
-        {
-            DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
-        }
-    }
-
-    @Test
-    public void testFlushUnwriteableIgnore() throws Throwable
-    {
-        makeTable();
-        markDirectoriesUnwriteable();
-
-        DiskFailurePolicy oldPolicy = 
DatabaseDescriptor.getDiskFailurePolicy();
-        try
-        {
-            DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.ignore);
-            flushAndExpectError();
-        }
-        finally
-        {
-            DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
-        }
-
-        // Next flush should succeed.
-        makeTable();
-        flush();
-    }
-
-    public void makeTable() throws Throwable
-    {
-        createTable("CREATE TABLE %s (a text, b text, c text, PRIMARY KEY (a, 
b));");
-
-        // insert exactly the amount of tombstones that shouldn't trigger an 
exception
-        for (int i = 0; i < 10; i++)
-            execute("INSERT INTO %s (a, b, c) VALUES ('key', 'column" + i + 
"', null);");
-    }
-
-    public void markDirectoriesUnwriteable()
-    {
-        ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(currentTable());
-        try
-        {
-            for ( ; ; )
-            {
-                DataDirectory dir = cfs.directories.getWriteableLocation(1);
-                
BlacklistedDirectories.maybeMarkUnwritable(cfs.directories.getLocationForDisk(dir));
-            }
-        }
-        catch (IOError e)
-        {
-            // Expected -- marked all directories as unwritable
-        }
-    }
-
-    public void flushAndExpectError() throws InterruptedException, 
ExecutionException
-    {
-        try
-        {
-            
Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable()).forceFlush().get();
-            fail("FSWriteError expected.");
-        }
-        catch (ExecutionException e)
-        {
-            // Correct path.
-            Assert.assertTrue(e.getCause() instanceof FSWriteError);
-        }
-
-        // Make sure commit log wasn't discarded.
-        UUID cfid = currentTableMetadata().cfId;
-        for (CommitLogSegment segment : 
CommitLog.instance.allocator.getActiveSegments())
-            if (segment.getDirtyCFIDs().contains(cfid))
-                return;
-        fail("Expected commit log to remain dirty for the affected table.");
-    }
-}

Reply via email to