Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/695065e2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/695065e2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/695065e2 Branch: refs/heads/cassandra-3.0 Commit: 695065e27a16c30019f34fc4c626a1841616d037 Parents: 45d0176 be6e6ea Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Fri Oct 7 16:51:10 2016 +0800 Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com> Committed: Fri Oct 7 16:52:01 2016 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../DebuggableScheduledThreadPoolExecutor.java | 2 +- .../concurrent/ScheduledExecutors.java | 4 - .../db/compaction/CompactionManager.java | 127 +++++++++++-------- .../db/lifecycle/LifecycleTransaction.java | 14 +- .../io/sstable/format/SSTableReader.java | 15 ++- .../apache/cassandra/net/MessagingService.java | 3 +- .../cassandra/service/StorageService.java | 12 +- .../org/apache/cassandra/utils/ExpiringMap.java | 4 +- 9 files changed, 116 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/695065e2/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 827a208,54425fa..894113a --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,19 -1,10 +1,20 @@@ -2.2.9 +3.0.10 + * Make stress use client mode to avoid checking commit log size on startup (CASSANDRA-12478) + * Fix exceptions with new vnode allocation (CASSANDRA-12715) + * Unify drain and shutdown processes (CASSANDRA-12509) + * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706) + * Fix failure in LogTransactionTest (CASSANDRA-12632) + * Fix potentially incomplete non-frozen UDT values when querying with the + full primary key specified (CASSANDRA-12605) + * Skip writing MV mutations to 
commitlog on mutation.applyUnsafe() (CASSANDRA-11670) + * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060) + * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516) + * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472) + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499) +Merged from 2.2: + * Fix leak errors and execution rejected exceptions when draining (CASSANDRA-12457) * Fix merkle tree depth calculation (CASSANDRA-12580) * Make Collections deserialization more robust (CASSANDRA-12618) - - -2.2.8 * Fix exceptions when enabling gossip on nodes that haven't joined the ring (CASSANDRA-12253) * Fix authentication problem when invoking clqsh copy from a SOURCE command (CASSANDRA-12642) * Decrement pending range calculator jobs counter in finally block http://git-wip-us.apache.org/repos/asf/cassandra/blob/695065e2/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/695065e2/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 4d1757e,626bd27..478b896 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -171,17 -164,15 +171,15 @@@ public class CompactionManager implemen logger.trace("Scheduling a background task check for {}.{} with {}", cfs.keyspace.getName(), cfs.name, - cfs.getCompactionStrategy().getName()); + cfs.getCompactionStrategyManager().getName()); - List<Future<?>> futures = new ArrayList<>(); - // we must schedule it at 
least once, otherwise compaction will stop for a CF until next flush - if (executor.isShutdown()) + + List<Future<?>> futures = new ArrayList<>(1); + Future<?> fut = executor.submitIfRunning(new BackgroundCompactionCandidate(cfs), "background task"); + if (!fut.isCancelled()) { - logger.info("Executor has shut down, not submitting background task"); - return Collections.emptyList(); + compactingCF.add(cfs); + futures.add(fut); } - compactingCF.add(cfs); - futures.add(executor.submit(new BackgroundCompactionCandidate(cfs))); - return futures; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/695065e2/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java index 91515aa,a95c4a8..582c9d8 --- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java @@@ -200,14 -155,10 +206,16 @@@ public class LifecycleTransaction exten public Throwable doCommit(Throwable accumulate) { assert staged.isEmpty() : "must be no actions introduced between prepareToCommit and a commit"; - logger.trace("Committing update:{}, obsolete:{}", staged.update, staged.obsolete); + + if (logger.isTraceEnabled()) + logger.trace("Committing transaction over {} staged: {}, logged: {}", originals, staged, logged); + // accumulate must be null if we have been used correctly, so fail immediately if it is not + maybeFail(accumulate); + + // transaction log commit failure means we must abort; safe commit is not possible + maybeFail(log.commit(null)); + // this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done // we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size // and notification status for the obsolete and new files @@@ -226,12 -175,10 +234,12 @@@ 
public Throwable doAbort(Throwable accumulate) { if (logger.isTraceEnabled()) - logger.trace("Aborting transaction over {}, with ({},{}) logged and ({},{}) staged", originals, logged.update, logged.obsolete, staged.update, staged.obsolete); + logger.trace("Aborting transaction over {} staged: {}, logged: {}", originals, staged, logged); + accumulate = abortObsoletion(obsoletions, accumulate); + if (logged.isEmpty() && staged.isEmpty()) - return accumulate; + return log.abort(accumulate); // mark obsolete all readers that are not versions of those present in the original set Iterable<SSTableReader> obsolete = filterOut(concatUniq(staged.update, logged.update), originals); http://git-wip-us.apache.org/repos/asf/cassandra/blob/695065e2/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 9f2663e,fddf058..9f31af1 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@@ -2049,12 -2028,16 +2049,12 @@@ public abstract class SSTableReader ext private Runnable runOnClose; private boolean isReplaced = false; - // a reference to our shared per-Descriptor.Type tidy instance, that + // a reference to our shared tidy instance, that // we will release when we are ourselves released - private Ref<DescriptorTypeTidy> typeRef; - - // a convenience stashing of the shared per-descriptor-type tidy instance itself - // and the per-logical-sstable globally shared state that it is linked to - private DescriptorTypeTidy type; + private Ref<GlobalTidy> globalRef; private GlobalTidy global; - private boolean setup; + private volatile boolean setup; void setup(SSTableReader reader, boolean trackHotness) { @@@ -2108,7 -2102,10 +2118,10 @@@ dfile.close(); if (ifile != null) ifile.close(); - typeRef.release(); + globalRef.release(); + + if 
(logger.isTraceEnabled()) + logger.trace("Async instance tidier for {}, completed", descriptor); } }); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/695065e2/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/695065e2/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageService.java index 0be5d92,db86294..24ad73d --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -4013,143 -3933,88 +4013,149 @@@ public class StorageService extends Not */ public synchronized void drain() throws IOException, InterruptedException, ExecutionException { - inShutdownHook = true; + drain(false); + } + protected synchronized void drain(boolean isFinalShutdown) throws IOException, InterruptedException, ExecutionException + { ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION); + ExecutorService viewMutationStage = StageManager.getStage(Stage.VIEW_MUTATION); ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION); - if (mutationStage.isTerminated() && counterMutationStage.isTerminated()) + + if (mutationStage.isTerminated() + && counterMutationStage.isTerminated() + && viewMutationStage.isTerminated()) { - logger.warn("Cannot drain node (did it already happen?)"); + if (!isFinalShutdown) + logger.warn("Cannot drain node (did it already happen?)"); return; } - setMode(Mode.DRAINING, "starting drain process", true); - shutdownClientServers(); - ScheduledExecutors.optionalTasks.shutdown(); - Gossiper.instance.stop(); - setMode(Mode.DRAINING, "shutting down MessageService", false); - MessagingService.instance().shutdown(); + assert !isShutdown; + isShutdown = true; - setMode(Mode.DRAINING, "clearing 
mutation stage", false); - counterMutationStage.shutdown(); - mutationStage.shutdown(); - counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS); - mutationStage.awaitTermination(3600, TimeUnit.SECONDS); + try + { + setMode(Mode.DRAINING, "starting drain process", !isFinalShutdown); - StorageProxy.instance.verifyNoHintsInProgress(); + BatchlogManager.instance.shutdown(); + HintsService.instance.pauseDispatch(); - setMode(Mode.DRAINING, "flushing column families", false); - // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty - totalCFs = 0; - for (Keyspace keyspace : Keyspace.nonSystem()) - totalCFs += keyspace.getColumnFamilyStores().size(); - remainingCFs = totalCFs; - // flush - List<Future<?>> flushes = new ArrayList<>(); - for (Keyspace keyspace : Keyspace.nonSystem()) - { - for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) - flushes.add(cfs.forceFlush()); - } - // wait for the flushes. - // TODO this is a godawful way to track progress, since they flush in parallel. a long one could - // thus make several short ones "instant" if we wait for them later. 
- for (Future f : flushes) - { - FBUtilities.waitOnFuture(f); - remainingCFs--; - } + if (daemon != null) + shutdownClientServers(); + ScheduledExecutors.optionalTasks.shutdown(); + Gossiper.instance.stop(); - BatchlogManager.shutdown(); + if (!isFinalShutdown) + setMode(Mode.DRAINING, "shutting down MessageService", false); + + // In-progress writes originating here could generate hints to be written, so shut down MessagingService + // before mutation stage, so we can get all the hints saved before shutting down + MessagingService.instance().shutdown(); + + if (!isFinalShutdown) + setMode(Mode.DRAINING, "clearing mutation stage", false); + viewMutationStage.shutdown(); + counterMutationStage.shutdown(); + mutationStage.shutdown(); + viewMutationStage.awaitTermination(3600, TimeUnit.SECONDS); + counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS); + mutationStage.awaitTermination(3600, TimeUnit.SECONDS); + + StorageProxy.instance.verifyNoHintsInProgress(); + + if (!isFinalShutdown) + setMode(Mode.DRAINING, "flushing column families", false); + + // disable autocompaction - we don't want to start any new compactions while we are draining + for (Keyspace keyspace : Keyspace.all()) + for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) + cfs.disableAutoCompaction(); + + // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty + totalCFs = 0; + for (Keyspace keyspace : Keyspace.nonSystem()) + totalCFs += keyspace.getColumnFamilyStores().size(); + remainingCFs = totalCFs; + // flush + List<Future<?>> flushes = new ArrayList<>(); + for (Keyspace keyspace : Keyspace.nonSystem()) + { + for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) + flushes.add(cfs.forceFlush()); + } + // wait for the flushes. + // TODO this is a godawful way to track progress, since they flush in parallel. a long one could + // thus make several short ones "instant" if we wait for them later. 
+ for (Future f : flushes) + { + try + { + FBUtilities.waitOnFuture(f); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + // don't let this stop us from shutting down the commitlog and other thread pools + logger.warn("Caught exception while waiting for memtable flushes during shutdown hook", t); + } - // Interrupt on going compaction and shutdown to prevent further compaction - CompactionManager.instance.forceShutdown(); + remainingCFs--; + } - // flush the system ones after all the rest are done, just in case flushing modifies any system state - // like CASSANDRA-5151. don't bother with progress tracking since system data is tiny. + - // Flush the system tables after all other tables are flushed, just in case flushing modifies any system state - // like CASSANDRA-5151. Don't bother with progress tracking since system data is tiny. - // Flush system tables after stopping the batchlog manager and compactions since they both modify - // system tables (for example compactions can obsolete sstables and the tidiers in SSTableReader update - // system tables, see SSTableReader.GlobalTidy) - flushes.clear(); - for (Keyspace keyspace : Keyspace.system()) - { - for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) - flushes.add(cfs.forceFlush()); - } - FBUtilities.waitOnFutures(flushes); ++ // Interrupt ongoing compactions and shutdown CM to prevent further compactions. ++ CompactionManager.instance.forceShutdown(); ++ // Flush the system tables after all other tables are flushed, just in case flushing modifies any system state ++ // like CASSANDRA-5151. Don't bother with progress tracking since system data is tiny. 
++ // Flush system tables after stopping compactions since they modify ++ // system tables (for example compactions can obsolete sstables and the tidiers in SSTableReader update ++ // system tables, see SSTableReader.GlobalTidy) + flushes.clear(); + for (Keyspace keyspace : Keyspace.system()) + { + for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) + flushes.add(cfs.forceFlush()); + } + FBUtilities.waitOnFutures(flushes); + + HintsService.instance.shutdownBlocking(); - // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure - // there are no segments to replay, so we force the recycling of any remaining (should be at most one) - CommitLog.instance.forceRecycleAllSegments(); + // Interrupt ongoing compactions and shutdown CM to prevent further compactions. + CompactionManager.instance.forceShutdown(); - CommitLog.instance.shutdownBlocking(); + // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure + // there are no segments to replay, so we force the recycling of any remaining (should be at most one) + CommitLog.instance.forceRecycleAllSegments(); - // wait for miscellaneous tasks like sstable and commitlog segment deletion - ScheduledExecutors.nonPeriodicTasks.shutdown(); - if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES)) - logger.warn("Failed to wait for non periodic tasks to shutdown"); + CommitLog.instance.shutdownBlocking(); - ColumnFamilyStore.shutdownPostFlushExecutor(); + // wait for miscellaneous tasks like sstable and commitlog segment deletion + ScheduledExecutors.nonPeriodicTasks.shutdown(); + if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES)) - logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown"); ++ logger.warn("Failed to wait for non periodic tasks to shutdown"); - setMode(Mode.DRAINED, true); + ColumnFamilyStore.shutdownPostFlushExecutor(); 
+ setMode(Mode.DRAINED, !isFinalShutdown); + } + catch (Throwable t) + { + logger.error("Caught an exception while draining ", t); + } + } + + /** + * Some services are shut down during draining and we should not attempt to start them again. + * + * @param service - the name of the service we are trying to start. + * @throws IllegalStateException - an exception that nodetool is able to convert into a message to display to the user + */ + synchronized void checkServiceAllowedToStart(String service) + { + if (isDraining()) // when draining isShutdown is also true, so we check first to return a more accurate message + throw new IllegalStateException(String.format("Unable to start %s because the node is draining.", service)); + + if (isShutdown()) // do not rely on operationMode in case it gets changed to decommissioned or other + throw new IllegalStateException(String.format("Unable to start %s because the node was drained.", service)); } // Never ever do this at home. Used by tests.