Author: jbellis
Date: Tue Sep 7 16:27:19 2010
New Revision: 993417
URL: http://svn.apache.org/viewvc?rev=993417&view=rev
Log:
correct ordering of drain operations so CL.recover is no longer necessary.
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1408
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=993417&r1=993416&r2=993417&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Sep 7 16:27:19 2010
@@ -59,6 +59,8 @@ dev
* apply reversed flag during collation from different data sources
(CASSANDRA-1450)
* make failure to remove comitlog segment non-fatal (CASSANDRA-1348)
+ * correct ordering of drain operations so CL.recover is no longer
+ necessary (CASSANDRA-1408)
0.7-beta1
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=993417&r1=993416&r2=993417&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue
Sep 7 16:27:19 2010
@@ -88,21 +88,21 @@ public class ColumnFamilyStore implement
* which is necessary for replay in case of a restart since CommitLog
assumes that when onMF is
* called, all data up to the given context has been persisted to SSTables.
*/
- private static ExecutorService flushSorter
+ private static final ExecutorService flushSorter
= new JMXEnabledThreadPoolExecutor(1,
Runtime.getRuntime().availableProcessors(),
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors()),
new
NamedThreadFactory("FLUSH-SORTER-POOL"));
- private static ExecutorService flushWriter
+ private static final ExecutorService flushWriter
= new JMXEnabledThreadPoolExecutor(1,
DatabaseDescriptor.getFlushWriters(),
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushWriters()),
new
NamedThreadFactory("FLUSH-WRITER-POOL"));
- private static ExecutorService postFlushExecutor = new
JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
+ public static final ExecutorService postFlushExecutor = new
JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
private Set<Memtable> memtablesPendingFlush = new
ConcurrentSkipListSet<Memtable>();
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=993417&r1=993416&r2=993417&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Tue Sep 7 16:27:19 2010
@@ -1669,25 +1669,22 @@ public class StorageService implements I
setMode("Draining: emptying MessageService pools", false);
MessagingService.waitFor();
+ setMode("Draining: clearing mutation stage", false);
+ mutationStage.shutdown();
+ mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
+
// lets flush.
setMode("Draining: flushing column families", false);
for (String tableName : DatabaseDescriptor.getNonSystemTables())
for (Future f : Table.open(tableName).flush())
f.get();
-
- setMode("Draining: replaying commit log", false);
+ ColumnFamilyStore.postFlushExecutor.shutdown();
+ ColumnFamilyStore.postFlushExecutor.awaitTermination(60,
TimeUnit.SECONDS);
+
// want to make sure that any segments deleted as a result of flushing
are gone.
DeletionService.waitFor();
- CommitLog.recover();
-
- // commit log recovery just sends work to the mutation stage. (there
could have already been work there anyway.
- // Either way, we need to let this one drain naturally, and then we're
finished.
- setMode("Draining: clearing mutation stage", false);
- mutationStage.shutdown();
- while (!mutationStage.isTerminated())
- mutationStage.awaitTermination(5, TimeUnit.SECONDS);
-
+
setMode("Node is drained", true);
}