Merge commit 'bd6654733dded3513c2c7acf96df2c364b0c043e' into cassandra-2.2

* commit 'bd6654733dded3513c2c7acf96df2c364b0c043e':
  Disable passing control to post-flush after flush failure to prevent data loss.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6dc1745e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6dc1745e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6dc1745e

Branch: refs/heads/cassandra-2.2
Commit: 6dc1745edd8d3861d853ee56f49ac67633a753b0
Parents: 0398521 bd66547
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Fri Aug 5 15:36:29 2016 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Fri Aug 5 15:37:11 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  67 +++++---
 .../apache/cassandra/cql3/OutOfSpaceBase.java   |  95 +++++++++++
 .../cassandra/cql3/OutOfSpaceDieTest.java       |  68 ++++++++
 .../cassandra/cql3/OutOfSpaceIgnoreTest.java    |  60 +++++++
 .../cassandra/cql3/OutOfSpaceStopTest.java      |  63 ++++++++
 .../apache/cassandra/cql3/OutOfSpaceTest.java   | 157 -------------------
 7 files changed, 336 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6dc1745e/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 87228d3,1275631..7fcf373
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,55 -1,13 +1,58 @@@
 +2.2.8
 + * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
 + * Revert CASSANDRA-11427 (CASSANDRA-12351)
 + * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
 + * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
 + * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
 + * Synchronize ThriftServer::stop() (CASSANDRA-12105)
 + * Use dedicated thread for JMX notifications (CASSANDRA-12146)
 + * NPE when trying to remove purgable tombstones from result (CASSANDRA-12143)
 + * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
 + * MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
 + * Don't write shadowed range tombstone (CASSANDRA-12030)
 +Merged from 2.1:
++=======
+ 2.1.16
+  * Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828)
   * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
   * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)
 - * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
   * Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349)
 + * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
 + * Account for partition deletions in tombstone histogram (CASSANDRA-12112)
  
  
 -2.1.15
 - * Account for partition deletions in tombstone histogram (CASSANDRA-12112)
 +2.2.7
 + * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
 + * Validate bloom_filter_fp_chance against lowest supported
 +   value when the table is created (CASSANDRA-11920)
 + * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013)
 + * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038)
 + * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
 + * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
 + * Run CommitLog tests with different compression settings (CASSANDRA-9039)
 + * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
 + * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
 + * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
 + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
 + * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
 + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
 + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
 + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
 + * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
 + * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
 + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
 + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510)
 + * JSON datetime formatting needs timezone (CASSANDRA-11137)
 + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
 + * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660)
 + * Add missing files to debian packages (CASSANDRA-11642)
 + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621)
 + * cqlsh: COPY FROM should use regular inserts for single statement batches and
 +   report errors correctly if workers processes crash on initialization (CASSANDRA-11474)
 + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
 + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
 +Merged from 2.1:
   * Avoid stalling paxos when the paxos state expires (CASSANDRA-12043)
   * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
   * Don't try to get sstables for non-repairing column families (CASSANDRA-12077)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6dc1745e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d0cb200,6e82745..0835a28
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -891,12 -873,20 +895,30 @@@ public class ColumnFamilyStore implemen
      {
          synchronized (data)
          {
+             if (previousFlushFailure != null)
+                 throw new IllegalStateException("A flush previously failed with the error below. To prevent data loss, "
+                                               + "no flushes can be carried out until the node is restarted.",
+                                                 previousFlushFailure);
              logFlush();
              Flush flush = new Flush(false);
-             flushExecutor.execute(flush);
+             ListenableFutureTask<?> flushTask = ListenableFutureTask.create(flush, null);
+             flushExecutor.submit(flushTask);
 -            ListenableFutureTask<?> task = ListenableFutureTask.create(flush.postFlush, null);
 +            ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(flush.postFlush);
              postFlushExecutor.submit(task);
-             return task;
+ 
+             @SuppressWarnings("unchecked")
 -            ListenableFuture<?> future = Futures.allAsList(flushTask, task);
++            ListenableFuture<ReplayPosition> future = 
++                    // If either of the two tasks errors out, resulting future must also error out.
++                    // Combine the two futures and only return post-flush result after both have completed.
++                    Futures.transform(Futures.allAsList(flushTask, task),
++                                      new Function<List<Object>, ReplayPosition>()
++                                      {
++                                          public ReplayPosition apply(List<Object> input)
++                                          {
++                                              return (ReplayPosition) input.get(1);
++                                          }
++                                      });
+             return future;
          }
      }
  
@@@ -999,13 -978,9 +1021,12 @@@
          final boolean flushSecondaryIndexes;
          final OpOrder.Barrier writeBarrier;
          final CountDownLatch latch = new CountDownLatch(1);
 -        final ReplayPosition lastReplayPosition;
 +        final ReplayPosition commitLogUpperBound;
 +        final List<Memtable> memtables;
 +        final List<SSTableReader> readers;
-         volatile FSWriteError flushFailure = null;
  
 -        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition)
 +        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition commitLogUpperBound,
 +                          List<Memtable> memtables, List<SSTableReader> readers)
          {
              this.writeBarrier = writeBarrier;
              this.flushSecondaryIndexes = flushSecondaryIndexes;
@@@ -1049,23 -1019,14 +1070,17 @@@
                  throw new IllegalStateException();
              }
  
-             // If a flush errored out but the error was ignored, make sure we don't discard the commit log.
-             if (flushFailure == null)
 -            // must check lastReplayPosition != null because Flush may find that all memtables are clean
 -            // and so not set a lastReplayPosition
 -            if (lastReplayPosition != null)
++            CommitLog.instance.discardCompletedSegments(metadata.cfId, commitLogUpperBound);
++            for (int i = 0 ; i < memtables.size() ; i++)
              {
-                 CommitLog.instance.discardCompletedSegments(metadata.cfId, commitLogUpperBound);
-                 for (int i = 0 ; i < memtables.size() ; i++)
-                 {
-                     Memtable memtable = memtables.get(i);
-                     SSTableReader reader = readers.get(i);
-                     memtable.cfs.data.permitCompactionOfFlushed(reader);
-                     memtable.cfs.compactionStrategyWrapper.replaceFlushed(memtable, reader);
-                 }
 -                CommitLog.instance.discardCompletedSegments(metadata.cfId, lastReplayPosition);
++                Memtable memtable = memtables.get(i);
++                SSTableReader reader = readers.get(i);
++                memtable.cfs.data.permitCompactionOfFlushed(reader);
++                memtable.cfs.compactionStrategyWrapper.replaceFlushed(memtable, reader);
              }
 -
              metric.pendingFlushes.dec();
 +
-             if (flushFailure != null)
-                 throw flushFailure;
 +            return commitLogUpperBound;
          }
      }
  
@@@ -1162,11 -1131,13 +1177,15 @@@
                  for (Memtable memtable : memtables)
                  {
                      // flush the memtable
 -                    MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable());
 +                    SSTableReader reader = memtable.flush();
 +                    memtable.cfs.data.replaceFlushed(memtable, reader);
                      reclaim(memtable);
 +                    readers.add(reader);
                  }
+ 
+                 // signal the post-flush we've done our work
+                 // Note: This should not be done in case of error. Read more below.
+                 postFlush.latch.countDown();
              }
              catch (FSWriteError e)
              {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6dc1745e/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java
index 0000000,c0023dc..826d6e6
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java
+++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java
@@@ -1,0 -1,87 +1,95 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.cql3;
+ 
+ import static junit.framework.Assert.fail;
+ 
+ import java.io.IOError;
+ import java.util.UUID;
+ import java.util.concurrent.ExecutionException;
+ 
++import org.junit.After;
+ import org.junit.Assert;
+ 
+ import org.apache.cassandra.db.BlacklistedDirectories;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Directories.DataDirectory;
+ import org.apache.cassandra.db.commitlog.CommitLog;
+ import org.apache.cassandra.db.commitlog.CommitLogSegment;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.io.FSWriteError;
+ 
+ /**
+  * Test that exceptions during flush are treated according to the disk failure policy.
+  * We cannot recover after a failed flush due to postFlushExecutor being stuck, so each test needs to run separately.
+  */
+ public class OutOfSpaceBase extends CQLTester
+ {
+     public void makeTable() throws Throwable
+     {
+         createTable("CREATE TABLE %s (a text, b text, c text, PRIMARY KEY (a, b));");
+ 
+         for (int i = 0; i < 10; i++)
+             execute("INSERT INTO %s (a, b, c) VALUES ('key', 'column" + i + "', null);");
+     }
+ 
+     public void markDirectoriesUnwriteable()
+     {
+         ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable());
+         try
+         {
+             for ( ; ; )
+             {
+                 DataDirectory dir = cfs.directories.getWriteableLocation(1);
+                 BlacklistedDirectories.maybeMarkUnwritable(cfs.directories.getLocationForDisk(dir));
+             }
+         }
+         catch (IOError e)
+         {
+             // Expected -- marked all directories as unwritable
+         }
+     }
+ 
+     public void flushAndExpectError() throws InterruptedException, ExecutionException
+     {
+         try
+         {
+             Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable()).forceFlush().get();
+             fail("FSWriteError expected.");
+         }
+         catch (ExecutionException e)
+         {
+             // Correct path.
+             Assert.assertTrue(e.getCause() instanceof FSWriteError);
+         }
+ 
+         // Make sure commit log wasn't discarded.
+         UUID cfid = currentTableMetadata().cfId;
+         for (CommitLogSegment segment : CommitLog.instance.allocator.getActiveSegments())
+             if (segment.getDirtyCFIDs().contains(cfid))
+                 return;
+         fail("Expected commit log to remain dirty for the affected table.");
+     }
++
++
++    @After
++    public void afterTest() throws Throwable
++    {
++        // Override CQLTester's afterTest method; clean-up will fail due to flush failing.
++    }
+ }

Reply via email to