Merge commit 'bd6654733dded3513c2c7acf96df2c364b0c043e' into cassandra-2.2 * commit 'bd6654733dded3513c2c7acf96df2c364b0c043e': Disable passing control to post-flush after flush failure to prevent data loss.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6dc1745e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6dc1745e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6dc1745e Branch: refs/heads/cassandra-2.2 Commit: 6dc1745edd8d3861d853ee56f49ac67633a753b0 Parents: 0398521 bd66547 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Fri Aug 5 15:36:29 2016 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Aug 5 15:37:11 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../apache/cassandra/db/ColumnFamilyStore.java | 67 +++++--- .../apache/cassandra/cql3/OutOfSpaceBase.java | 95 +++++++++++ .../cassandra/cql3/OutOfSpaceDieTest.java | 68 ++++++++ .../cassandra/cql3/OutOfSpaceIgnoreTest.java | 60 +++++++ .../cassandra/cql3/OutOfSpaceStopTest.java | 63 ++++++++ .../apache/cassandra/cql3/OutOfSpaceTest.java | 157 ------------------- 7 files changed, 336 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6dc1745e/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 87228d3,1275631..7fcf373 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,55 -1,13 +1,58 @@@ +2.2.8 + * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345) + * Revert CASSANDRA-11427 (CASSANDRA-12351) + * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465) + * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979) + * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214) + * Synchronize ThriftServer::stop() (CASSANDRA-12105) + * Use dedicated thread for JMX notifications (CASSANDRA-12146) + * NPE when trying to remove purgable tombstones from result (CASSANDRA-12143) + * Improve streaming synchronization and fault tolerance (CASSANDRA-11414) + * MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973) + * Don't write shadowed range tombstone (CASSANDRA-12030) +Merged from 2.1: ++======= + 2.1.16 + * Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828) * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040) * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850) - * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907) * Improve digest calculation in the presence of overlapping tombstones (CASSANDRA-11349) + * Fix filtering on clustering columns when 2i is used (CASSANDRA-11907) + * Account for partition deletions in tombstone histogram (CASSANDRA-12112) -2.1.15 - * Account for partition deletions in tombstone histogram (CASSANDRA-12112) +2.2.7 + * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755) + * Validate bloom_filter_fp_chance against lowest supported + value when the table is created (CASSANDRA-11920) + * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013) + * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038) + * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984) + * Persist local metadata earlier in startup sequence (CASSANDRA-11742) + * Run CommitLog tests with different compression settings (CASSANDRA-9039) + * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664) + * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587) + * Fix possible race condition in CommitLog.recover (CASSANDRA-11743) + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708) + * Possible memory leak in NIODataInputStream (CASSANDRA-11867) + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753) + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395) + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626) + * Exit JVM if JMX server fails to startup (CASSANDRA-11540) + * Produce a heap dump when exiting on OOM (CASSANDRA-9861) + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427) + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510) + * JSON datetime formatting needs timezone (CASSANDRA-11137) + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502) + * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660) + * Add missing files to debian packages (CASSANDRA-11642) + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621) + * cqlsh: COPY FROM should use regular inserts for single statement batches and + report errors correctly if workers processes crash on initialization (CASSANDRA-11474) + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553) + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988) +Merged from 2.1: * Avoid stalling paxos when the paxos state expires (CASSANDRA-12043) * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854) * Don't try to get sstables for non-repairing column families (CASSANDRA-12077) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6dc1745e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index d0cb200,6e82745..0835a28 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -891,12 -873,20 +895,30 @@@ public class ColumnFamilyStore implemen { synchronized (data) { + if (previousFlushFailure != null) + throw new IllegalStateException("A flush previously failed with the error below. To prevent data loss, " + + "no flushes can be carried out until the node is restarted.", + previousFlushFailure); logFlush(); Flush flush = new Flush(false); - flushExecutor.execute(flush); + ListenableFutureTask<?> flushTask = ListenableFutureTask.create(flush, null); + flushExecutor.submit(flushTask); - ListenableFutureTask<?> task = ListenableFutureTask.create(flush.postFlush, null); + ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(flush.postFlush); postFlushExecutor.submit(task); - return task; + + @SuppressWarnings("unchecked") - ListenableFuture<?> future = Futures.allAsList(flushTask, task); ++ ListenableFuture<ReplayPosition> future = ++ // If either of the two tasks errors out, resulting future must also error out. ++ // Combine the two futures and only return post-flush result after both have completed. ++ Futures.transform(Futures.allAsList(flushTask, task), ++ new Function<List<Object>, ReplayPosition>() ++ { ++ public ReplayPosition apply(List<Object> input) ++ { ++ return (ReplayPosition) input.get(1); ++ } ++ }); + return future; } } @@@ -999,13 -978,9 +1021,12 @@@ final boolean flushSecondaryIndexes; final OpOrder.Barrier writeBarrier; final CountDownLatch latch = new CountDownLatch(1); - final ReplayPosition lastReplayPosition; + final ReplayPosition commitLogUpperBound; + final List<Memtable> memtables; + final List<SSTableReader> readers; - volatile FSWriteError flushFailure = null; - private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition) + private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition commitLogUpperBound, + List<Memtable> memtables, List<SSTableReader> readers) { this.writeBarrier = writeBarrier; this.flushSecondaryIndexes = flushSecondaryIndexes; @@@ -1049,23 -1019,14 +1070,17 @@@ throw new IllegalStateException(); } - // If a flush errored out but the error was ignored, make sure we don't discard the commit log. - if (flushFailure == null) - // must check lastReplayPosition != null because Flush may find that all memtables are clean - // and so not set a lastReplayPosition - if (lastReplayPosition != null) ++ CommitLog.instance.discardCompletedSegments(metadata.cfId, commitLogUpperBound); ++ for (int i = 0 ; i < memtables.size() ; i++) { - CommitLog.instance.discardCompletedSegments(metadata.cfId, commitLogUpperBound); - for (int i = 0 ; i < memtables.size() ; i++) - { - Memtable memtable = memtables.get(i); - SSTableReader reader = readers.get(i); - memtable.cfs.data.permitCompactionOfFlushed(reader); - memtable.cfs.compactionStrategyWrapper.replaceFlushed(memtable, reader); - } - CommitLog.instance.discardCompletedSegments(metadata.cfId, lastReplayPosition); ++ Memtable memtable = memtables.get(i); ++ SSTableReader reader = readers.get(i); ++ memtable.cfs.data.permitCompactionOfFlushed(reader); ++ memtable.cfs.compactionStrategyWrapper.replaceFlushed(memtable, reader); } - metric.pendingFlushes.dec(); + - if (flushFailure != null) - throw flushFailure; + return commitLogUpperBound; } } @@@ -1162,11 -1131,13 +1177,15 @@@ for (Memtable memtable : memtables) { // flush the memtable - MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable()); + SSTableReader reader = memtable.flush(); + memtable.cfs.data.replaceFlushed(memtable, reader); reclaim(memtable); + readers.add(reader); } + + // signal the post-flush we've done our work + // Note: This should not be done in case of error. Read more below. + postFlush.latch.countDown(); } catch (FSWriteError e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6dc1745e/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java index 0000000,c0023dc..826d6e6 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java +++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java @@@ -1,0 -1,87 +1,95 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.cql3; + + import static junit.framework.Assert.fail; + + import java.io.IOError; + import java.util.UUID; + import java.util.concurrent.ExecutionException; + ++import org.junit.After; + import org.junit.Assert; + + import org.apache.cassandra.db.BlacklistedDirectories; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.Directories.DataDirectory; + import org.apache.cassandra.db.commitlog.CommitLog; + import org.apache.cassandra.db.commitlog.CommitLogSegment; + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.io.FSWriteError; + + /** + * Test that exceptions during flush are treated according to the disk failure policy. + * We cannot recover after a failed flush due to postFlushExecutor being stuck, so each test needs to run separately. + */ + public class OutOfSpaceBase extends CQLTester + { + public void makeTable() throws Throwable + { + createTable("CREATE TABLE %s (a text, b text, c text, PRIMARY KEY (a, b));"); + + for (int i = 0; i < 10; i++) + execute("INSERT INTO %s (a, b, c) VALUES ('key', 'column" + i + "', null);"); + } + + public void markDirectoriesUnwriteable() + { + ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()); + try + { + for ( ; ; ) + { + DataDirectory dir = cfs.directories.getWriteableLocation(1); + BlacklistedDirectories.maybeMarkUnwritable(cfs.directories.getLocationForDisk(dir)); + } + } + catch (IOError e) + { + // Expected -- marked all directories as unwritable + } + } + + public void flushAndExpectError() throws InterruptedException, ExecutionException + { + try + { + Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable()).forceFlush().get(); + fail("FSWriteError expected."); + } + catch (ExecutionException e) + { + // Correct path. + Assert.assertTrue(e.getCause() instanceof FSWriteError); + } + + // Make sure commit log wasn't discarded. + UUID cfid = currentTableMetadata().cfId; + for (CommitLogSegment segment : CommitLog.instance.allocator.getActiveSegments()) + if (segment.getDirtyCFIDs().contains(cfid)) + return; + fail("Expected commit log to remain dirty for the affected table."); + } ++ ++ ++ @After ++ public void afterTest() throws Throwable ++ { ++ // Override CQLTester's afterTest method; clean-up will fail due to flush failing. ++ } + }