Author: brandonwilliams
Date: Fri Dec 10 17:35:14 2010
New Revision: 1044450

URL: http://svn.apache.org/viewvc?rev=1044450&view=rev
Log:
correct ordering of drain operations so CL.recover is no longer necessary.  
Patch by jbellis and brandonwilliams, reviewed by jbellis for CASSANDRA-1408

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1044450&r1=1044449&r2=1044450&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Fri Dec 10 17:35:14 2010
@@ -16,6 +16,8 @@
  * reduce fat client timeout (CASSANDRA-1730)
  * cleanup smallest CFs first to increase free temp space for larger ones
    (CASSANDRA-1811)
+ * correct ordering of drain operations so CL.recover is no longer necessary
+   (CASSANDRA-1408)
 
 
 0.6.8

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1044450&r1=1044449&r2=1044450&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Dec 10 17:35:14 2010
@@ -71,27 +71,27 @@ public class ColumnFamilyStore implement
      * For BinaryMemtable that's about all that happens.  For live Memtables there are two other things
      * that switchMemtable does (which should be the only caller of submitFlush in this case).
      * First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete
-     * and it's been added as an SSTableReader to ssTables_.  Second, it adds an entry to commitLogUpdater
+     * and it's been added as an SSTableReader to ssTables_.  Second, it adds an entry to postFlushExecutor
      * that waits for the flush to complete, then calls onMemtableFlush.  This allows multiple flushes
      * to happen simultaneously on multicore systems, while still calling onMF in the correct order,
      * which is necessary for replay in case of a restart since CommitLog assumes that when onMF is
      * called, all data up to the given context has been persisted to SSTables.
      */
-    private static ExecutorService flushSorter_
+    private static final ExecutorService flushSorter
             = new JMXEnabledThreadPoolExecutor(1,
                                                Runtime.getRuntime().availableProcessors(),
                                                Integer.MAX_VALUE,
                                                TimeUnit.SECONDS,
                                                new LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors()),
                                                new NamedThreadFactory("FLUSH-SORTER-POOL"));
-    private static ExecutorService flushWriter_
+    private static final ExecutorService flushWriter
             = new JMXEnabledThreadPoolExecutor(1,
                                                DatabaseDescriptor.getAllDataFileLocations().length,
                                                Integer.MAX_VALUE,
                                                TimeUnit.SECONDS,
                                                new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getAllDataFileLocations().length),
                                                new NamedThreadFactory("FLUSH-WRITER-POOL"));
-    private static ExecutorService commitLogUpdater_ = new JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
+    public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
 
     private static final int KEY_RANGE_FILE_BUFFER_SIZE = 256 * 1024;
 
@@ -480,7 +480,7 @@ public class ColumnFamilyStore implement
             memtable_ = new Memtable(this);
             // a second executor that makes sure the onMemtableFlushes get called in the right order,
             // while keeping the wait-for-flush (future.get) out of anything latency-sensitive.
-            return commitLogUpdater_.submit(new WrappedRunnable()
+            return postFlushExecutor.submit(new WrappedRunnable()
             {
                 public void runMayThrow() throws InterruptedException, IOException
                 {
@@ -747,7 +747,7 @@ public class ColumnFamilyStore implement
     {
         logger_.info("Enqueuing flush of " + flushable);
         final Condition condition = new SimpleCondition();
-        flushable.flushAndSignal(condition, flushSorter_, flushWriter_);
+        flushable.flushAndSignal(condition, flushSorter, flushWriter);
         return condition;
     }
 

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=1044450&r1=1044449&r2=1044450&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java Fri Dec 10 17:35:14 2010
@@ -1606,6 +1606,10 @@ public class StorageService implements I
         MessagingService.shutdown();
         setMode("Draining: emptying MessageService pools", false);
         MessagingService.waitFor();
+
+        setMode("Draining: clearing mutation stage", false);
+        mutationStage.shutdown();
+        mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
        
         // lets flush.
         setMode("Draining: flushing column families", false);
@@ -1614,18 +1618,11 @@ public class StorageService implements I
                 f.get();
        
 
-        setMode("Draining: replaying commit log", false);
+        ColumnFamilyStore.postFlushExecutor.shutdown();
+        ColumnFamilyStore.postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);
         CommitLog.instance().forceNewSegment();
         // want to make sure that any segments deleted as a result of flushing are gone.
         DeletionService.waitFor();
-        CommitLog.recover();
-       
-        // commit log recovery just sends work to the mutation stage. (there could have already been work there anyway.  
-        // Either way, we need to let this one drain naturally, and then we're finished.
-        setMode("Draining: clearing mutation stage", false);
-        mutationStage.shutdown();
-        while (!mutationStage.isTerminated())
-            mutationStage.awaitTermination(5, TimeUnit.SECONDS);
        
         setMode("Node is drained", true);
     }


Reply via email to