git commit: Avoids having replicate on write tasks stacking up at CL.ONE
Updated Branches: refs/heads/cassandra-1.1 2af8591bd - 2ca2fb3fd Avoids having replicate on write tasks stacking up at CL.ONE patch by slebresne; reviewed by jbellis for CASSANDRA-2889 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2ca2fb3f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2ca2fb3f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2ca2fb3f Branch: refs/heads/cassandra-1.1 Commit: 2ca2fb3fdc1636e2d3d7feb446f66f6ed8043cf4 Parents: 2af8591 Author: Sylvain Lebresne sylv...@datastax.com Authored: Fri Apr 27 19:39:31 2012 +0200 Committer: Sylvain Lebresne sylv...@datastax.com Committed: Fri Apr 27 19:39:31 2012 +0200 -- CHANGES.txt|1 + .../apache/cassandra/concurrent/StageManager.java | 14 +- 2 files changed, 14 insertions(+), 1 deletions(-) -- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ca2fb3f/CHANGES.txt -- diff --git a/CHANGES.txt b/CHANGES.txt index 91a8fbe..918c146 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -19,6 +19,7 @@ * Move CfDef and KsDef validation out of thrift (CASSANDRA-4037) * Expose repairing by a user provided range (CASSANDRA-3912) * Add way to force the cassandra-cli to refresh it's schema (CASSANDRA-4052) + * Avoids having replicate on write tasks stacking up at CL.ONE (CASSANDRA-2889) Merged from 1.0: * Fix super columns bug where cache is not updated (CASSANDRA-4190) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ca2fb3f/src/java/org/apache/cassandra/concurrent/StageManager.java -- diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java index c57b593..4bcb75d 100644 --- a/src/java/org/apache/cassandra/concurrent/StageManager.java +++ b/src/java/org/apache/cassandra/concurrent/StageManager.java @@ -37,13 +37,15 @@ public class StageManager public static final long KEEPALIVE = 60; // seconds to keep extra threads alive for when idle +public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * Runtime.getRuntime().availableProcessors(); + static { stages.put(Stage.MUTATION, multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters())); stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, getConcurrentReaders())); stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, Runtime.getRuntime().availableProcessors())); stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, Runtime.getRuntime().availableProcessors())); -stages.put(Stage.REPLICATE_ON_WRITE, multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, getConcurrentReplicators())); +stages.put(Stage.REPLICATE_ON_WRITE, multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, getConcurrentReplicators(), MAX_REPLICATE_ON_WRITE_TASKS)); // the rest are all single-threaded stages.put(Stage.STREAM, new JMXEnabledThreadPoolExecutor(Stage.STREAM)); stages.put(Stage.GOSSIP, new JMXEnabledThreadPoolExecutor(Stage.GOSSIP)); @@ -73,6 +75,16 @@ public class StageManager stage.getJmxType()); } +private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock) +{ +return new JMXConfigurableThreadPoolExecutor(numThreads, + KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueueRunnable(maxTasksBeforeBlock), + new NamedThreadFactory(stage.getJmxName()), + stage.getJmxType()); +} + /** * Retrieve a stage from the StageManager * @param stage name of the stage to be retrieved.
[4/6] git commit: Avoids having replicate on write tasks stacking up at CL.ONE
Avoids having replicate on write tasks stacking up at CL.ONE patch by slebresne; reviewed by jbellis for CASSANDRA-2889 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2ca2fb3f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2ca2fb3f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2ca2fb3f Branch: refs/heads/trunk Commit: 2ca2fb3fdc1636e2d3d7feb446f66f6ed8043cf4 Parents: 2af8591 Author: Sylvain Lebresne sylv...@datastax.com Authored: Fri Apr 27 19:39:31 2012 +0200 Committer: Sylvain Lebresne sylv...@datastax.com Committed: Fri Apr 27 19:39:31 2012 +0200 -- CHANGES.txt|1 + .../apache/cassandra/concurrent/StageManager.java | 14 +- 2 files changed, 14 insertions(+), 1 deletions(-) -- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ca2fb3f/CHANGES.txt -- diff --git a/CHANGES.txt b/CHANGES.txt index 91a8fbe..918c146 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -19,6 +19,7 @@ * Move CfDef and KsDef validation out of thrift (CASSANDRA-4037) * Expose repairing by a user provided range (CASSANDRA-3912) * Add way to force the cassandra-cli to refresh it's schema (CASSANDRA-4052) + * Avoids having replicate on write tasks stacking up at CL.ONE (CASSANDRA-2889) Merged from 1.0: * Fix super columns bug where cache is not updated (CASSANDRA-4190) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ca2fb3f/src/java/org/apache/cassandra/concurrent/StageManager.java -- diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java index c57b593..4bcb75d 100644 --- a/src/java/org/apache/cassandra/concurrent/StageManager.java +++ b/src/java/org/apache/cassandra/concurrent/StageManager.java @@ -37,13 +37,15 @@ public class StageManager public static final long KEEPALIVE = 60; // seconds to keep extra threads alive for when idle +public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * Runtime.getRuntime().availableProcessors(); + static { stages.put(Stage.MUTATION, multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters())); stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, getConcurrentReaders())); stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, Runtime.getRuntime().availableProcessors())); stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, Runtime.getRuntime().availableProcessors())); -stages.put(Stage.REPLICATE_ON_WRITE, multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, getConcurrentReplicators())); +stages.put(Stage.REPLICATE_ON_WRITE, multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, getConcurrentReplicators(), MAX_REPLICATE_ON_WRITE_TASKS)); // the rest are all single-threaded stages.put(Stage.STREAM, new JMXEnabledThreadPoolExecutor(Stage.STREAM)); stages.put(Stage.GOSSIP, new JMXEnabledThreadPoolExecutor(Stage.GOSSIP)); @@ -73,6 +75,16 @@ public class StageManager stage.getJmxType()); } +private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock) +{ +return new JMXConfigurableThreadPoolExecutor(numThreads, + KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueueRunnable(maxTasksBeforeBlock), + new NamedThreadFactory(stage.getJmxName()), + stage.getJmxType()); +} + /** * Retrieve a stage from the StageManager * @param stage name of the stage to be retrieved.