Cleanup, scrub and upgrade may unmark compacting early (CASSANDRA-10274) If an error occurred during cleanup, scrub or upgrade (or any parallelAllSSTableOperation), the caller was immediately notified of the problem, and the method exited, executing the finally block that unmarked all of the sstables as compacting.
Since the operations happen in parallel, many may still be running or waiting to run, and so another operation may operate over the same sstables, breaking the required mutual exclusivity. This patch ensures the method is not exited until all operations have completed, at which point the caller is notified of any exceptions. patch by benedict; reviewed by marcus for CASSANDRA-10274 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d769fcb3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d769fcb3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d769fcb3 Branch: refs/heads/cassandra-3.0 Commit: d769fcb397b6c5937561194b9e8f9dd596ffcd18 Parents: 0c0f1ff Author: Benedict Elliott Smith <bened...@apache.org> Authored: Mon Sep 7 12:23:57 2015 +0100 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Tue Sep 8 13:39:56 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 6 +++ .../db/compaction/CompactionManager.java | 51 +++++--------------- .../org/apache/cassandra/utils/FBUtilities.java | 19 ++++++-- .../org/apache/cassandra/utils/Throwables.java | 16 ++++++ 4 files changed, 51 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d769fcb3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5dffb9b..fdba8ed 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,4 @@ +<<<<<<< HEAD 2.2.2 * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209) * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222) @@ -6,6 +7,11 @@ * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks (CASSANDRA-10199) Merged from 2.1: +======= +2.1.10 + * Scrub, Cleanup and Upgrade do not unmark compacting 
until all operations + have completed, regardless of the occurence of exceptions (CASSANDRA-10274) +>>>>>>> 04e789b... Cleanup, scrub and upgrade may unmark compacting early (CASSANDRA-10274) * Fix handling of streaming EOF (CASSANDRA-10206) * Only check KeyCache when it is enabled * Change streaming_socket_timeout_in_ms default to 1 hour (CASSANDRA-8611) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d769fcb3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 5e1b31c..495c5ab 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -41,8 +41,6 @@ import javax.management.ObjectName; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; -import com.google.common.base.Predicate; -import com.google.common.base.Throwables; import com.google.common.collect.*; import com.google.common.util.concurrent.*; import org.slf4j.Logger; @@ -71,13 +69,7 @@ import org.apache.cassandra.metrics.CompactionMetrics; import org.apache.cassandra.repair.Validator; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.CloseableIterator; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.MerkleTree; -import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.WrappedRunnable; -import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.Refs; @@ -246,6 +238,7 @@ public class CompactionManager 
implements CompactionManagerMBean @SuppressWarnings("resource") private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, OperationType operationType) throws ExecutionException, InterruptedException { + List<LifecycleTransaction> transactions = new ArrayList<>(); try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType);) { Iterable<SSTableReader> sstables = Lists.newArrayList(operation.filterSSTables(compacting)); @@ -255,7 +248,7 @@ public class CompactionManager implements CompactionManagerMBean return AllSSTableOpStatus.SUCCESSFUL; } - List<Pair<LifecycleTransaction,Future<Object>>> futures = new ArrayList<>(); + List<Future<Object>> futures = new ArrayList<>(); for (final SSTableReader sstable : sstables) { @@ -266,7 +259,8 @@ public class CompactionManager implements CompactionManagerMBean } final LifecycleTransaction txn = compacting.split(singleton(sstable)); - futures.add(Pair.create(txn,executor.submit(new Callable<Object>() + transactions.add(txn); + futures.add(executor.submit(new Callable<Object>() { @Override public Object call() throws Exception @@ -274,39 +268,20 @@ public class CompactionManager implements CompactionManagerMBean operation.execute(txn); return this; } - }))); + })); } assert compacting.originals().isEmpty(); - - //Collect all exceptions - Exception exception = null; - - for (Pair<LifecycleTransaction, Future<Object>> f : futures) - { - try - { - f.right.get(); - } - catch (InterruptedException | ExecutionException e) - { - if (exception == null) - exception = new Exception(); - - exception.addSuppressed(e); - } - finally - { - f.left.close(); - } - } - - if (exception != null) - Throwables.propagate(exception); - + FBUtilities.waitOnFutures(futures); return AllSSTableOpStatus.SUCCESSFUL; } + finally + { + Throwable fail = Throwables.close(null, transactions); + if (fail != null) + logger.error("Failed to cleanup lifecycle transactions {}", fail); + } } 
private static interface OneSSTableOperation http://git-wip-us.apache.org/repos/asf/cassandra/blob/d769fcb3/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index ce118b9..b41bdab 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -334,10 +334,23 @@ public class FBUtilities return System.currentTimeMillis() * 1000; } - public static void waitOnFutures(Iterable<Future<?>> futures) + public static <T> List<T> waitOnFutures(Iterable<? extends Future<? extends T>> futures) { - for (Future f : futures) - waitOnFuture(f); + List<T> results = new ArrayList<>(); + Throwable fail = null; + for (Future<? extends T> f : futures) + { + try + { + results.add(f.get()); + } + catch (InterruptedException | ExecutionException e) + { + fail = Throwables.merge(fail, e); + } + } + Throwables.maybeFail(fail); + return results; } public static <T> T waitOnFuture(Future<T> future) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d769fcb3/src/java/org/apache/cassandra/utils/Throwables.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java index 0a2bd28..a895f31 100644 --- a/src/java/org/apache/cassandra/utils/Throwables.java +++ b/src/java/org/apache/cassandra/utils/Throwables.java @@ -34,4 +34,20 @@ public class Throwables if (fail != null) com.google.common.base.Throwables.propagate(fail); } + + public static Throwable close(Throwable accumulate, Iterable<? extends AutoCloseable> closeables) + { + for (AutoCloseable closeable : closeables) + { + try + { + closeable.close(); + } + catch (Throwable t) + { + accumulate = merge(accumulate, t); + } + } + return accumulate; + } }