Author: jbellis Date: Fri Aug 26 03:54:32 2011 New Revision: 1161983 URL: http://svn.apache.org/viewvc?rev=1161983&view=rev Log: Add timeouts to client request schedulers patch by Stu Hood; reviewed by Melvin Wang for CASSANDRA-3079
Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1161983&r1=1161982&r2=1161983&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Fri Aug 26 03:54:32 2011 @@ -42,6 +42,7 @@ * Add "install" command to cassandra.bat (CASSANDRA-292) * clean up KSMetadata, CFMetadata from unnecessary Thrift<->Avro conversion methods (CASSANDRA-3032) + * Add timeouts to client request schedulers (CASSANDRA-3079) 0.8.5 Modified: cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java?rev=1161983&r1=1161982&r2=1161983&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java Fri Aug 26 03:54:32 2011 @@ -20,6 +20,8 @@ package org.apache.cassandra.scheduler; * */ +import java.util.concurrent.TimeoutException; + /** * Implementors of IRequestScheduler must provide a constructor taking a RequestSchedulerOptions object. */ @@ -30,8 +32,9 @@ public interface IRequestScheduler * * @param t Thread handing the request * @param id Scheduling parameter, an id to distinguish profiles (users/keyspace) + * @param timeout The max time in milliseconds to spend blocking for a slot */ - public void queue(Thread t, String id); + public void queue(Thread t, String id, long timeoutMS) throws TimeoutException; /** * A convenience method for indicating when a particular request has completed Modified: cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java?rev=1161983&r1=1161982&r2=1161983&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java Fri Aug 26 03:54:32 2011 @@ -34,7 +34,7 @@ public class NoScheduler implements IReq public NoScheduler() {} - public void queue(Thread t, String id) {} + public void queue(Thread t, String id, long timeoutMS) {} public void release() {} } Modified: cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java?rev=1161983&r1=1161982&r2=1161983&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java Fri Aug 26 03:54:32 2011 @@ -23,6 +23,7 @@ package org.apache.cassandra.scheduler; import java.util.Map; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,14 +79,14 @@ public class RoundRobinScheduler impleme logger.info("Started the RoundRobin Request Scheduler"); } - public void queue(Thread t, String id) + public void queue(Thread t, String id, long timeoutMS) throws TimeoutException { WeightedQueue weightedQueue = getWeightedQueue(id); try { queueSize.release(); - weightedQueue.put(t); + weightedQueue.put(t, timeoutMS); } catch (InterruptedException e) { Modified: cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java?rev=1161983&r1=1161982&r2=1161983&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java Fri Aug 26 03:54:32 2011 @@ -22,6 +22,8 @@ package org.apache.cassandra.scheduler; */ import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; import java.lang.management.ManagementFactory; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -56,9 +58,9 @@ class WeightedQueue implements WeightedQ } } - public void put(Thread t) throws InterruptedException + public void put(Thread t, long timeoutMS) throws InterruptedException, TimeoutException { - queue.put(new WeightedQueue.Entry(t)); + queue.offer(new WeightedQueue.Entry(t), timeoutMS, TimeUnit.MILLISECONDS); } public Thread poll() Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1161983&r1=1161982&r2=1161983&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Fri Aug 26 03:54:32 2011 @@ -122,9 +122,9 @@ public class CassandraServer implements List<Row> rows; try { + schedule(DatabaseDescriptor.getRpcTimeout()); try { - schedule(); rows = StorageProxy.read(commands, consistency_level); } finally @@ -625,23 +625,23 @@ public class CassandraServer implements private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations) throws UnavailableException, TimedOutException, InvalidRequestException { ThriftValidation.validateConsistencyLevel(state().getKeyspace(), consistency_level); + if (mutations.isEmpty()) + return; try { - schedule(); - + schedule(DatabaseDescriptor.getRpcTimeout()); try { - if (!mutations.isEmpty()) - StorageProxy.mutate(mutations, consistency_level); + StorageProxy.mutate(mutations, consistency_level); } - catch (TimeoutException e) + finally { - throw new TimedOutException(); + release(); } } - finally + catch (TimeoutException e) { - release(); + throw new TimedOutException(); } } @@ -686,9 +686,9 @@ public class CassandraServer implements { bounds = new Bounds(p.getToken(range.start_key), p.getToken(range.end_key)); } + schedule(DatabaseDescriptor.getRpcTimeout()); try { - schedule(); rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, bounds, range.count), consistency_level); } finally @@ -829,9 +829,9 @@ public class CassandraServer implements /** * Schedule the current thread for access to the required services */ - private void schedule() + private void schedule(long timeoutMS) throws TimeoutException { - requestScheduler.queue(Thread.currentThread(), state().getSchedulingValue()); + requestScheduler.queue(Thread.currentThread(), state().getSchedulingValue(), timeoutMS); } /** @@ -1085,8 +1085,15 @@ public class CassandraServer implements state().hasColumnFamilyAccess(cfname, Permission.WRITE); try { - schedule(); - StorageProxy.truncateBlocking(state().getKeyspace(), cfname); + schedule(DatabaseDescriptor.getRpcTimeout()); + try + { + StorageProxy.truncateBlocking(state().getKeyspace(), cfname); + } + finally + { + release(); + } } catch (TimeoutException e) { @@ -1096,10 +1103,6 @@ public class CassandraServer implements { throw (UnavailableException) new UnavailableException().initCause(e); } - finally - { - release(); - } } public void set_keyspace(String keyspace) throws InvalidRequestException, TException