Author: jbellis
Date: Tue Sep  7 16:27:19 2010
New Revision: 993417

URL: http://svn.apache.org/viewvc?rev=993417&view=rev
Log:
correct ordering of drain operations so CL.recover is no longer necessary.
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1408

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=993417&r1=993416&r2=993417&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Sep  7 16:27:19 2010
@@ -59,6 +59,8 @@ dev
  * apply reversed flag during collation from different data sources
    (CASSANDRA-1450)
  * make failure to remove comitlog segment non-fatal (CASSANDRA-1348)
+ * correct ordering of drain operations so CL.recover is no longer 
+   necessary (CASSANDRA-1408)
 
 
 0.7-beta1

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=993417&r1=993416&r2=993417&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Sep  7 16:27:19 2010
@@ -88,21 +88,21 @@ public class ColumnFamilyStore implement
      * which is necessary for replay in case of a restart since CommitLog assumes that when onMF is
      * called, all data up to the given context has been persisted to SSTables.
      */
-    private static ExecutorService flushSorter
+    private static final ExecutorService flushSorter
             = new JMXEnabledThreadPoolExecutor(1,
                                                Runtime.getRuntime().availableProcessors(),
                                                StageManager.KEEPALIVE,
                                                TimeUnit.SECONDS,
                                                new LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors()),
                                                new NamedThreadFactory("FLUSH-SORTER-POOL"));
-    private static ExecutorService flushWriter
+    private static final ExecutorService flushWriter
             = new JMXEnabledThreadPoolExecutor(1,
                                                DatabaseDescriptor.getFlushWriters(),
                                                StageManager.KEEPALIVE,
                                                TimeUnit.SECONDS,
                                                new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushWriters()),
                                                new NamedThreadFactory("FLUSH-WRITER-POOL"));
-    private static ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
+    public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
     
     private Set<Memtable> memtablesPendingFlush = new ConcurrentSkipListSet<Memtable>();
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=993417&r1=993416&r2=993417&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Sep  7 16:27:19 2010
@@ -1669,25 +1669,22 @@ public class StorageService implements I
         setMode("Draining: emptying MessageService pools", false);
         MessagingService.waitFor();
        
+        setMode("Draining: clearing mutation stage", false);
+        mutationStage.shutdown();
+        mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
+
         // lets flush.
         setMode("Draining: flushing column families", false);
         for (String tableName : DatabaseDescriptor.getNonSystemTables())
             for (Future f : Table.open(tableName).flush())
                 f.get();
-       
 
-        setMode("Draining: replaying commit log", false);
+        ColumnFamilyStore.postFlushExecutor.shutdown();
+        ColumnFamilyStore.postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);
+       
         // want to make sure that any segments deleted as a result of flushing are gone.
         DeletionService.waitFor();
-        CommitLog.recover();
-       
-        // commit log recovery just sends work to the mutation stage. (there could have already been work there anyway.  
-        // Either way, we need to let this one drain naturally, and then we're finished.
-        setMode("Draining: clearing mutation stage", false);
-        mutationStage.shutdown();
-        while (!mutationStage.isTerminated())
-            mutationStage.awaitTermination(5, TimeUnit.SECONDS);
-       
+
         setMode("Node is drained", true);
     }
 


Reply via email to