Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 79a16e5e9 -> 5115c106d refs/heads/cassandra-3.X c089b2697 -> 2e18adf25 refs/heads/trunk dbd783917 -> 723d055d8
Unify drain and shutdown processes Patch by Alex Petrov; reviewed by Stefania Alborghetti for CASSANDRA-12509 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5115c106 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5115c106 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5115c106 Branch: refs/heads/cassandra-3.0 Commit: 5115c106db198e684b47c614b237925c45c71da8 Parents: 79a16e5 Author: Alex Petrov <oleksandr.pet...@gmail.com> Authored: Fri Sep 30 10:00:38 2016 +0800 Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com> Committed: Tue Oct 4 09:51:51 2016 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/logback.xml | 3 +- .../DebuggableScheduledThreadPoolExecutor.java | 2 +- .../apache/cassandra/service/StorageProxy.java | 8 +- .../cassandra/service/StorageService.java | 285 ++++++++++--------- .../cassandra/service/StorageServiceMBean.java | 2 + .../org/apache/cassandra/tools/NodeProbe.java | 10 + .../cql3/validation/entities/UFTest.java | 4 +- 8 files changed, 178 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5115c106/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cbf9ab1..21eec4b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.10 + * Unify drain and shutdown processes (CASSANDRA-12509) * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706) * Fix failure in LogTransactionTest (CASSANDRA-12632) * Fix potentially incomplete non-frozen UDT values when querying with the http://git-wip-us.apache.org/repos/asf/cassandra/blob/5115c106/conf/logback.xml ---------------------------------------------------------------------- diff --git a/conf/logback.xml b/conf/logback.xml index a47740d..9f1e49a 100644 --- 
a/conf/logback.xml +++ b/conf/logback.xml @@ -24,7 +24,8 @@ appender reference in the root level section below. <configuration scan="true"> <jmxConfigurator /> - <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/> + + <!-- No shutdown hook; we run it ourselves in StorageService after shutdown --> <!-- SYSTEMLOG rolling file appender to system.log (INFO level) --> http://git-wip-us.apache.org/repos/asf/cassandra/blob/5115c106/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java index a722b87..49d8510 100644 --- a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java @@ -47,7 +47,7 @@ public class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolEx { if (executor.isShutdown()) { - if (!StorageService.instance.isInShutdownHook()) + if (!StorageService.instance.isShutdown()) throw new RejectedExecutionException("ScheduledThreadPoolExecutor has shut down."); //Give some notification to the caller the task isn't going to run http://git-wip-us.apache.org/repos/asf/cassandra/blob/5115c106/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 8a151f2..cffd63c 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -2302,7 +2302,13 @@ public class StorageProxy implements StorageProxyMBean public void setHintedHandoffEnabled(boolean b) { - 
DatabaseDescriptor.setHintedHandoffEnabled(b); + synchronized (StorageService.instance) + { + if (b) + StorageService.instance.checkServiceAllowedToStart("hinted handoff"); + + DatabaseDescriptor.setHintedHandoffEnabled(b); + } } public void enableHintsForDC(String dc) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5115c106/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 15a0146..0be5d92 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -44,6 +44,7 @@ import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.jmx.JMXConfiguratorMBean; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.Appender; +import ch.qos.logback.core.hook.DelayingShutdownHook; import org.apache.cassandra.auth.AuthKeyspace; import org.apache.cassandra.auth.AuthMigrationListener; import org.apache.cassandra.batchlog.BatchRemoveVerbHandler; @@ -132,13 +133,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public volatile VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(tokenMetadata.partitioner); private Thread drainOnShutdown = null; - private volatile boolean inShutdownHook = false; + private volatile boolean isShutdown = false; public static final StorageService instance = new StorageService(); + @Deprecated public boolean isInShutdownHook() { - return inShutdownHook; + return isShutdown(); + } + + public boolean isShutdown() + { + return isShutdown; } public Collection<Range<Token>> getLocalRanges(String keyspaceName) @@ -176,7 +183,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private double traceProbability = 0.0; private 
static enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED } - private Mode operationMode = Mode.STARTING; + private volatile Mode operationMode = Mode.STARTING; /* Used for tracking drain progress */ private volatile int totalCFs, remainingCFs; @@ -301,10 +308,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } // should only be called via JMX - public void startGossiping() + public synchronized void startGossiping() { if (!initialized) { + checkServiceAllowedToStart("gossip"); + logger.warn("Starting gossip by operator request"); Collection<Token> tokens = SystemKeyspace.getSavedTokens(); @@ -330,8 +339,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } // should only be called via JMX - public void startRPCServer() + public synchronized void startRPCServer() { + checkServiceAllowedToStart("thrift"); + if (daemon == null) { throw new IllegalStateException("No configured daemon"); @@ -358,8 +369,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return daemon.thriftServer.isRunning(); } - public void startNativeTransport() + public synchronized void startNativeTransport() { + checkServiceAllowedToStart("native transport"); + if (daemon == null) { throw new IllegalStateException("No configured daemon"); @@ -591,65 +604,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE drainOnShutdown = new Thread(new WrappedRunnable() { @Override - public void runMayThrow() throws InterruptedException + public void runMayThrow() throws InterruptedException, ExecutionException, IOException { - inShutdownHook = true; - ExecutorService viewMutationStage = StageManager.getStage(Stage.VIEW_MUTATION); - ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION); - ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION); - if (mutationStage.isShutdown() - && 
counterMutationStage.isShutdown() - && viewMutationStage.isShutdown()) - return; // drained already - - if (daemon != null) - shutdownClientServers(); - ScheduledExecutors.optionalTasks.shutdown(); - Gossiper.instance.stop(); - - // In-progress writes originating here could generate hints to be written, so shut down MessagingService - // before mutation stage, so we can get all the hints saved before shutting down - MessagingService.instance().shutdown(); - viewMutationStage.shutdown(); - BatchlogManager.instance.shutdown(); - HintsService.instance.pauseDispatch(); - counterMutationStage.shutdown(); - mutationStage.shutdown(); - viewMutationStage.awaitTermination(3600, TimeUnit.SECONDS); - counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS); - mutationStage.awaitTermination(3600, TimeUnit.SECONDS); - StorageProxy.instance.verifyNoHintsInProgress(); - - List<Future<?>> flushes = new ArrayList<>(); - for (Keyspace keyspace : Keyspace.all()) - { - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace.getName()); - if (!ksm.params.durableWrites) - for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) - flushes.add(cfs.forceFlush()); - } - try - { - FBUtilities.waitOnFutures(flushes); - } - catch (Throwable t) - { - JVMStabilityInspector.inspectThrowable(t); - // don't let this stop us from shutting down the commitlog and other thread pools - logger.warn("Caught exception while waiting for memtable flushes during shutdown hook", t); - } - - CommitLog.instance.shutdownBlocking(); + drain(true); if (FBUtilities.isWindows()) WindowsTimer.endTimerPeriod(DatabaseDescriptor.getWindowsTimerInterval()); - HintsService.instance.shutdownBlocking(); - - // wait for miscellaneous tasks like sstable and commitlog segment deletion - ScheduledExecutors.nonPeriodicTasks.shutdown(); - if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES)) - logger.warn("Miscellaneous task executor still busy after one minute; proceeding with 
shutdown"); + // Cleanup logback + DelayingShutdownHook logbackHook = new DelayingShutdownHook(); + logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory()); + logbackHook.run(); } }, "StorageServiceShutdownHook"); Runtime.getRuntime().addShutdownHook(drainOnShutdown); @@ -1079,7 +1044,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void rebuild(String sourceDc) { - // check on going rebuild + // check ongoing rebuild if (!isRebuilding.compareAndSet(false, true)) { throw new IllegalStateException("Node is still rebuilding. Check nodetool netstats."); @@ -4028,6 +3993,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return operationMode == Mode.JOINING; } + public boolean isDrained() + { + return operationMode == Mode.DRAINED; + } + + public boolean isDraining() + { + return operationMode == Mode.DRAINING; + } + public String getDrainProgress() { return String.format("Drained %s/%s ColumnFamilies", remainingCFs, totalCFs); @@ -4035,102 +4010,146 @@ public class StorageService extends NotificationBroadcasterSupport implements IE /** * Shuts node off to writes, empties memtables and the commit log. 
- * There are two differences between drain and the normal shutdown hook: - * - Drain waits for in-progress streaming to complete - * - Drain flushes *all* columnfamilies (shutdown hook only flushes non-durable CFs) */ public synchronized void drain() throws IOException, InterruptedException, ExecutionException { - inShutdownHook = true; - - BatchlogManager.instance.shutdown(); - - HintsService.instance.pauseDispatch(); + drain(false); + } + protected synchronized void drain(boolean isFinalShutdown) throws IOException, InterruptedException, ExecutionException + { ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION); ExecutorService viewMutationStage = StageManager.getStage(Stage.VIEW_MUTATION); ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION); + if (mutationStage.isTerminated() && counterMutationStage.isTerminated() && viewMutationStage.isTerminated()) { - logger.warn("Cannot drain node (did it already happen?)"); + if (!isFinalShutdown) + logger.warn("Cannot drain node (did it already happen?)"); return; } - setMode(Mode.DRAINING, "starting drain process", true); - shutdownClientServers(); - ScheduledExecutors.optionalTasks.shutdown(); - Gossiper.instance.stop(); - setMode(Mode.DRAINING, "shutting down MessageService", false); - MessagingService.instance().shutdown(); + assert !isShutdown; + isShutdown = true; + + try + { + setMode(Mode.DRAINING, "starting drain process", !isFinalShutdown); - setMode(Mode.DRAINING, "clearing mutation stage", false); - viewMutationStage.shutdown(); - counterMutationStage.shutdown(); - mutationStage.shutdown(); - viewMutationStage.awaitTermination(3600, TimeUnit.SECONDS); - counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS); - mutationStage.awaitTermination(3600, TimeUnit.SECONDS); + BatchlogManager.instance.shutdown(); + HintsService.instance.pauseDispatch(); - StorageProxy.instance.verifyNoHintsInProgress(); + if (daemon != null) + shutdownClientServers(); + 
ScheduledExecutors.optionalTasks.shutdown(); + Gossiper.instance.stop(); - setMode(Mode.DRAINING, "flushing column families", false); + if (!isFinalShutdown) + setMode(Mode.DRAINING, "shutting down MessageService", false); + + // In-progress writes originating here could generate hints to be written, so shut down MessagingService + // before mutation stage, so we can get all the hints saved before shutting down + MessagingService.instance().shutdown(); + + if (!isFinalShutdown) + setMode(Mode.DRAINING, "clearing mutation stage", false); + viewMutationStage.shutdown(); + counterMutationStage.shutdown(); + mutationStage.shutdown(); + viewMutationStage.awaitTermination(3600, TimeUnit.SECONDS); + counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS); + mutationStage.awaitTermination(3600, TimeUnit.SECONDS); + + StorageProxy.instance.verifyNoHintsInProgress(); + + if (!isFinalShutdown) + setMode(Mode.DRAINING, "flushing column families", false); + + // disable autocompaction - we don't want to start any new compactions while we are draining + for (Keyspace keyspace : Keyspace.all()) + for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) + cfs.disableAutoCompaction(); + + // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty + totalCFs = 0; + for (Keyspace keyspace : Keyspace.nonSystem()) + totalCFs += keyspace.getColumnFamilyStores().size(); + remainingCFs = totalCFs; + // flush + List<Future<?>> flushes = new ArrayList<>(); + for (Keyspace keyspace : Keyspace.nonSystem()) + { + for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) + flushes.add(cfs.forceFlush()); + } + // wait for the flushes. + // TODO this is a godawful way to track progress, since they flush in parallel. a long one could + // thus make several short ones "instant" if we wait for them later. 
+ for (Future f : flushes) + { + try + { + FBUtilities.waitOnFuture(f); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + // don't let this stop us from shutting down the commitlog and other thread pools + logger.warn("Caught exception while waiting for memtable flushes during shutdown hook", t); + } - // disable autocompaction - we don't want to start any new compactions while we are draining - for (Keyspace keyspace : Keyspace.all()) - for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) - cfs.disableAutoCompaction(); + remainingCFs--; + } + // flush the system ones after all the rest are done, just in case flushing modifies any system state + // like CASSANDRA-5151. don't bother with progress tracking since system data is tiny. + flushes.clear(); + for (Keyspace keyspace : Keyspace.system()) + { + for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) + flushes.add(cfs.forceFlush()); + } + FBUtilities.waitOnFutures(flushes); - // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty - totalCFs = 0; - for (Keyspace keyspace : Keyspace.nonSystem()) - totalCFs += keyspace.getColumnFamilyStores().size(); - remainingCFs = totalCFs; - // flush - List<Future<?>> flushes = new ArrayList<>(); - for (Keyspace keyspace : Keyspace.nonSystem()) - { - for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) - flushes.add(cfs.forceFlush()); - } - // wait for the flushes. - // TODO this is a godawful way to track progress, since they flush in parallel. a long one could - // thus make several short ones "instant" if we wait for them later. - for (Future f : flushes) - { - FBUtilities.waitOnFuture(f); - remainingCFs--; - } - // flush the system ones after all the rest are done, just in case flushing modifies any system state - // like CASSANDRA-5151. don't bother with progress tracking since system data is tiny. 
- flushes.clear(); - for (Keyspace keyspace : Keyspace.system()) - { - for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) - flushes.add(cfs.forceFlush()); - } - FBUtilities.waitOnFutures(flushes); + HintsService.instance.shutdownBlocking(); - HintsService.instance.shutdownBlocking(); + // Interrupt ongoing compactions and shutdown CM to prevent further compactions. + CompactionManager.instance.forceShutdown(); - // Interrupt on going compaction and shutdown to prevent further compaction - CompactionManager.instance.forceShutdown(); + // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure + // there are no segments to replay, so we force the recycling of any remaining (should be at most one) + CommitLog.instance.forceRecycleAllSegments(); - // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure - // there are no segments to replay, so we force the recycling of any remaining (should be at most one) - CommitLog.instance.forceRecycleAllSegments(); + CommitLog.instance.shutdownBlocking(); - CommitLog.instance.shutdownBlocking(); + // wait for miscellaneous tasks like sstable and commitlog segment deletion + ScheduledExecutors.nonPeriodicTasks.shutdown(); + if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES)) + logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown"); - // wait for miscellaneous tasks like sstable and commitlog segment deletion - ScheduledExecutors.nonPeriodicTasks.shutdown(); - if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES)) - logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown"); + ColumnFamilyStore.shutdownPostFlushExecutor(); + setMode(Mode.DRAINED, !isFinalShutdown); + } + catch (Throwable t) + { + logger.error("Caught an exception while draining ", t); + } + } - 
ColumnFamilyStore.shutdownPostFlushExecutor(); + /** + * Some services are shutdown during draining and we should not attempt to start them again. + * + * @param service - the name of the service we are trying to start. + * @throws IllegalStateException - an exception that nodetool is able to convert into a message to display to the user + */ + synchronized void checkServiceAllowedToStart(String service) + { + if (isDraining()) // when draining isShutdown is also true, so we check first to return a more accurate message + throw new IllegalStateException(String.format("Unable to start %s because the node is draining.", service)); - setMode(Mode.DRAINED, true); + if (isShutdown()) // do not rely on operationMode in case it gets changed to decommissioned or other + throw new IllegalStateException(String.format("Unable to start %s because the node was drained.", service)); } // Never ever do this at home. Used by tests. @@ -4536,8 +4555,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } - public void enableAutoCompaction(String ks, String... tables) throws IOException + public synchronized void enableAutoCompaction(String ks, String... 
tables) throws IOException { + checkServiceAllowedToStart("auto compaction"); + for (ColumnFamilyStore cfs : getValidColumnFamilies(true, true, ks, tables)) { cfs.enableAutoCompaction(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5115c106/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index e111bc4..f6e4209 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -489,6 +489,8 @@ public interface StorageServiceMBean extends NotificationEmitter // allows a node that have been started without joining the ring to join it public void joinRing() throws IOException; public boolean isJoined(); + public boolean isDrained(); + public boolean isDraining(); public void setStreamThroughputMbPerSec(int value); public int getStreamThroughputMbPerSec(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5115c106/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 80fab15..6a95e99 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -580,6 +580,16 @@ public class NodeProbe implements AutoCloseable return ssProxy.isJoined(); } + public boolean isDrained() + { + return ssProxy.isDrained(); + } + + public boolean isDraining() + { + return ssProxy.isDraining(); + } + public void joinRing() throws IOException { ssProxy.joinRing(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5115c106/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java index cc0e806..c52e178 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java @@ -2320,7 +2320,7 @@ public class UFTest extends CQLTester '}'}, {"org.apache.cassandra.service.StorageService", "try {" + - " org.apache.cassandra.service.StorageService v = org.apache.cassandra.service.StorageService.instance; v.isInShutdownHook(); return 0d;" + + " org.apache.cassandra.service.StorageService v = org.apache.cassandra.service.StorageService.instance; v.isShutdown(); return 0d;" + "} catch (Exception t) {" + " throw new RuntimeException(t);" + '}'}, @@ -2360,7 +2360,7 @@ public class UFTest extends CQLTester "RETURNS NULL ON NULL INPUT " + "RETURNS double " + "LANGUAGE javascript\n" + - "AS 'org.apache.cassandra.service.StorageService.instance.isInShutdownHook(); 0;';"); + "AS 'org.apache.cassandra.service.StorageService.instance.isShutdown(); 0;';"); execute("SELECT " + fName + "(dval) FROM %s WHERE key=1"); Assert.fail("Javascript security check failed"); }