Author: gdusbabek
Date: Thu May 6 21:14:59 2010
New Revision: 941933

URL: http://svn.apache.org/viewvc?rev=941933&view=rev
Log:
make concurrent_reads, concurrent_writes configurable at runtime via JMX. Patch by Roger Schildmeijer, reviewed by Gary Dusbabek. CASSANDRA-1060

Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java Modified: cassandra/branches/cassandra-0.6/CHANGES.txt cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java Modified: cassandra/branches/cassandra-0.6/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=941933&r1=941932&r2=941933&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu May 6 21:14:59 2010 @@ -7,6 +7,8 @@ as a tiebreaker (CASSANDRA-1039) * Add option to turn off Hinted Handoff (CASSANDRA-894) * fix windows startup (CASSANDRA-948) + * make concurrent_reads, concurrent_writes configurable at runtime via JMX + (CASSANDRA-1060) 0.6.1 Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java?rev=941933&view=auto ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java (added) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java Thu May 6 21:14:59 2010 @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.concurrent; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +public class JMXConfigurableThreadPoolExecutor extends JMXEnabledThreadPoolExecutor implements JMXConfigurableThreadPoolExecutorMBean +{ + + public JMXConfigurableThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue<Runnable> workQueue, + NamedThreadFactory threadFactory) + { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + +} \ No newline at end of file Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java?rev=941933&view=auto ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java (added) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java Thu May 6 21:14:59 2010 @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.concurrent; + +public interface JMXConfigurableThreadPoolExecutorMBean extends JMXEnabledThreadPoolExecutorMBean +{ + + void setCorePoolSize(int n); + + int getCorePoolSize(); + +} \ No newline at end of file Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=941933&r1=941932&r2=941933&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java Thu May 6 21:14:59 2010 @@ -50,8 +50,8 @@ public class StageManager static { - stages.put(MUTATION_STAGE, multiThreadedStage(MUTATION_STAGE, getConcurrentWriters())); - stages.put(READ_STAGE, multiThreadedStage(READ_STAGE, getConcurrentReaders())); + stages.put(MUTATION_STAGE, multiThreadedConfigurableStage(MUTATION_STAGE, getConcurrentWriters())); + stages.put(READ_STAGE, multiThreadedConfigurableStage(READ_STAGE, getConcurrentReaders())); stages.put(RESPONSE_STAGE, 
multiThreadedStage("RESPONSE-STAGE", Math.max(2, Runtime.getRuntime().availableProcessors()))); // the rest are all single-threaded stages.put(STREAM_STAGE, new JMXEnabledThreadPoolExecutor(STREAM_STAGE)); @@ -73,6 +73,18 @@ public class StageManager new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getStageQueueSize()), new NamedThreadFactory(name)); } + + private static ThreadPoolExecutor multiThreadedConfigurableStage(String name, int numThreads) + { + assert numThreads > 1 : "multi-threaded stages must have at least 2 threads"; + + return new JMXConfigurableThreadPoolExecutor(numThreads, + numThreads, + Integer.MAX_VALUE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getStageQueueSize()), + new NamedThreadFactory(name)); + } /** * Retrieve a stage from the StageManager