Author: brandonwilliams Date: Fri Dec 10 17:35:14 2010 New Revision: 1044450
URL: http://svn.apache.org/viewvc?rev=1044450&view=rev Log: correct ordering of drain operations so CL.recover is no longer necessary. Patch by jbellis and brandonwilliams, reviewed by jbellis for CASSANDRA-1408 Modified: cassandra/branches/cassandra-0.6/CHANGES.txt cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java Modified: cassandra/branches/cassandra-0.6/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1044450&r1=1044449&r2=1044450&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.6/CHANGES.txt Fri Dec 10 17:35:14 2010 @@ -16,6 +16,8 @@ * reduce fat client timeout (CASSANDRA-1730) * cleanup smallest CFs first to increase free temp space for larger ones (CASSANDRA-1811) + * correct ordering of drain operations so CL.recover is no longer necessary + (CASSANDRA-1408) 0.6.8 Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1044450&r1=1044449&r2=1044450&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Dec 10 17:35:14 2010 @@ -71,27 +71,27 @@ public class ColumnFamilyStore implement * For BinaryMemtable that's about all that happens. For live Memtables there are two other things * that switchMemtable does (which should be the only caller of submitFlush in this case). * First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete - * and it's been added as an SSTableReader to ssTables_. Second, it adds an entry to commitLogUpdater + * and it's been added as an SSTableReader to ssTables_. Second, it adds an entry to postFlushExecutor * that waits for the flush to complete, then calls onMemtableFlush. This allows multiple flushes * to happen simultaneously on multicore systems, while still calling onMF in the correct order, * which is necessary for replay in case of a restart since CommitLog assumes that when onMF is * called, all data up to the given context has been persisted to SSTables. */ - private static ExecutorService flushSorter_ + private static final ExecutorService flushSorter = new JMXEnabledThreadPoolExecutor(1, Runtime.getRuntime().availableProcessors(), Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors()), new NamedThreadFactory("FLUSH-SORTER-POOL")); - private static ExecutorService flushWriter_ + private static final ExecutorService flushWriter = new JMXEnabledThreadPoolExecutor(1, DatabaseDescriptor.getAllDataFileLocations().length, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getAllDataFileLocations().length), new NamedThreadFactory("FLUSH-WRITER-POOL")); - private static ExecutorService commitLogUpdater_ = new JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER"); + public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER"); private static final int KEY_RANGE_FILE_BUFFER_SIZE = 256 * 1024; @@ -480,7 +480,7 @@ public class ColumnFamilyStore implement memtable_ = new Memtable(this); // a second executor that makes sure the onMemtableFlushes get called in the right order, // while keeping the wait-for-flush (future.get) out of anything latency-sensitive. - return commitLogUpdater_.submit(new WrappedRunnable() + return postFlushExecutor.submit(new WrappedRunnable() { public void runMayThrow() throws InterruptedException, IOException { @@ -747,7 +747,7 @@ public class ColumnFamilyStore implement { logger_.info("Enqueuing flush of " + flushable); final Condition condition = new SimpleCondition(); - flushable.flushAndSignal(condition, flushSorter_, flushWriter_); + flushable.flushAndSignal(condition, flushSorter, flushWriter); return condition; } Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=1044450&r1=1044449&r2=1044450&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java Fri Dec 10 17:35:14 2010 @@ -1606,6 +1606,10 @@ public class StorageService implements I MessagingService.shutdown(); setMode("Draining: emptying MessageService pools", false); MessagingService.waitFor(); + + setMode("Draining: clearing mutation stage", false); + mutationStage.shutdown(); + mutationStage.awaitTermination(3600, TimeUnit.SECONDS); // lets flush. setMode("Draining: flushing column families", false); @@ -1614,18 +1618,11 @@ public class StorageService implements I f.get(); - setMode("Draining: replaying commit log", false); + ColumnFamilyStore.postFlushExecutor.shutdown(); + ColumnFamilyStore.postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS); CommitLog.instance().forceNewSegment(); // want to make sure that any segments deleted as a result of flushing are gone. DeletionService.waitFor(); - CommitLog.recover(); - - // commit log recovery just sends work to the mutation stage. (there could have already been work there anyway. - // Either way, we need to let this one drain naturally, and then we're finished. - setMode("Draining: clearing mutation stage", false); - mutationStage.shutdown(); - while (!mutationStage.isTerminated()) - mutationStage.awaitTermination(5, TimeUnit.SECONDS); setMode("Node is drained", true); }