Avoids having replicate on write tasks stacking up at CL.ONE

patch by slebresne; reviewed by jbellis for CASSANDRA-2889


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2ca2fb3f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2ca2fb3f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2ca2fb3f

Branch: refs/heads/trunk
Commit: 2ca2fb3fdc1636e2d3d7feb446f66f6ed8043cf4
Parents: 2af8591
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Fri Apr 27 19:39:31 2012 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Fri Apr 27 19:39:31 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/concurrent/StageManager.java  |   14 +++++++++++++-
 2 files changed, 14 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ca2fb3f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 91a8fbe..918c146 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
  * Move CfDef and KsDef validation out of thrift (CASSANDRA-4037)
  * Expose repairing by a user provided range (CASSANDRA-3912)
  * Add way to force the cassandra-cli to refresh it's schema (CASSANDRA-4052)
+ * Avoids having replicate on write tasks stacking up at CL.ONE 
(CASSANDRA-2889)
 Merged from 1.0:
  * Fix super columns bug where cache is not updated (CASSANDRA-4190)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ca2fb3f/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java 
b/src/java/org/apache/cassandra/concurrent/StageManager.java
index c57b593..4bcb75d 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -37,13 +37,15 @@ public class StageManager
 
     public static final long KEEPALIVE = 60; // seconds to keep "extra" 
threads alive for when idle
 
+    public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * 
Runtime.getRuntime().availableProcessors();
+
     static
     {
         stages.put(Stage.MUTATION, 
multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
         stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, 
getConcurrentReaders()));
         stages.put(Stage.REQUEST_RESPONSE, 
multiThreadedStage(Stage.REQUEST_RESPONSE, 
Runtime.getRuntime().availableProcessors()));
         stages.put(Stage.INTERNAL_RESPONSE, 
multiThreadedStage(Stage.INTERNAL_RESPONSE, 
Runtime.getRuntime().availableProcessors()));
-        stages.put(Stage.REPLICATE_ON_WRITE, 
multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, 
getConcurrentReplicators()));
+        stages.put(Stage.REPLICATE_ON_WRITE, 
multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, 
getConcurrentReplicators(), MAX_REPLICATE_ON_WRITE_TASKS));
         // the rest are all single-threaded
         stages.put(Stage.STREAM, new 
JMXEnabledThreadPoolExecutor(Stage.STREAM));
         stages.put(Stage.GOSSIP, new 
JMXEnabledThreadPoolExecutor(Stage.GOSSIP));
@@ -73,6 +75,16 @@ public class StageManager
                                                      stage.getJmxType());
     }
 
+    private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage 
stage, int numThreads, int maxTasksBeforeBlock)
+    {
+        return new JMXConfigurableThreadPoolExecutor(numThreads,
+                                                     KEEPALIVE,
+                                                     TimeUnit.SECONDS,
+                                                     new 
LinkedBlockingQueue<Runnable>(maxTasksBeforeBlock),
+                                                     new 
NamedThreadFactory(stage.getJmxName()),
+                                                     stage.getJmxType());
+    }
+
     /**
      * Retrieve a stage from the StageManager
      * @param stage name of the stage to be retrieved.

Reply via email to