Author: jbellis
Date: Fri Aug 26 03:54:32 2011
New Revision: 1161983

URL: http://svn.apache.org/viewvc?rev=1161983&view=rev
Log:
Add timeouts to client request schedulers
patch by Stu Hood; reviewed by Melvin Wang for CASSANDRA-3079

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
    cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java
    cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
    cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Aug 26 03:54:32 2011
@@ -42,6 +42,7 @@
  * Add "install" command to cassandra.bat (CASSANDRA-292)
  * clean up KSMetadata, CFMetadata from unnecessary
    Thrift<->Avro conversion methods (CASSANDRA-3032)
+ * Add timeouts to client request schedulers (CASSANDRA-3079)
 
 
 0.8.5

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java 
Fri Aug 26 03:54:32 2011
@@ -20,6 +20,8 @@ package org.apache.cassandra.scheduler;
  * 
  */
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * Implementors of IRequestScheduler must provide a constructor taking a 
RequestSchedulerOptions object.
  */
@@ -30,8 +32,9 @@ public interface IRequestScheduler
      * 
      * @param t Thread handing the request
      * @param id    Scheduling parameter, an id to distinguish profiles 
(users/keyspace)
+     * @param timeoutMS The max time in milliseconds to spend blocking for a 
slot
      */
-    public void queue(Thread t, String id);
+    public void queue(Thread t, String id, long timeoutMS) throws 
TimeoutException;
 
     /**
      * A convenience method for indicating when a particular request has 
completed

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java 
Fri Aug 26 03:54:32 2011
@@ -34,7 +34,7 @@ public class NoScheduler implements IReq
 
     public NoScheduler() {}
 
-    public void queue(Thread t, String id) {}
+    public void queue(Thread t, String id, long timeoutMS) {}
 
     public void release() {}
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
 Fri Aug 26 03:54:32 2011
@@ -23,6 +23,7 @@ package org.apache.cassandra.scheduler;
 
 import java.util.Map;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,14 +79,14 @@ public class RoundRobinScheduler impleme
         logger.info("Started the RoundRobin Request Scheduler");
     }
 
-    public void queue(Thread t, String id)
+    public void queue(Thread t, String id, long timeoutMS) throws 
TimeoutException
     {
         WeightedQueue weightedQueue = getWeightedQueue(id);
 
         try
         {
             queueSize.release();
-            weightedQueue.put(t);
+            weightedQueue.put(t, timeoutMS);
         }
         catch (InterruptedException e)
         {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java 
Fri Aug 26 03:54:32 2011
@@ -22,6 +22,8 @@ package org.apache.cassandra.scheduler;
  */
 
 import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
 import java.lang.management.ManagementFactory;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -56,9 +58,9 @@ class WeightedQueue implements WeightedQ
         }
     }
 
-    public void put(Thread t) throws InterruptedException
+    public void put(Thread t, long timeoutMS) throws InterruptedException, 
TimeoutException
     {
-        queue.put(new WeightedQueue.Entry(t));
+        queue.offer(new WeightedQueue.Entry(t), timeoutMS, 
TimeUnit.MILLISECONDS);
     }
 
     public Thread poll()

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java 
Fri Aug 26 03:54:32 2011
@@ -122,9 +122,9 @@ public class CassandraServer implements 
         List<Row> rows;
         try
         {
+            schedule(DatabaseDescriptor.getRpcTimeout());
             try
             {
-                schedule();
                 rows = StorageProxy.read(commands, consistency_level);
             }
             finally
@@ -625,23 +625,23 @@ public class CassandraServer implements 
     private void doInsert(ConsistencyLevel consistency_level, List<? extends 
IMutation> mutations) throws UnavailableException, TimedOutException, 
InvalidRequestException
     {
         ThriftValidation.validateConsistencyLevel(state().getKeyspace(), 
consistency_level);
+        if (mutations.isEmpty())
+            return;
         try
         {
-            schedule();
-
+            schedule(DatabaseDescriptor.getRpcTimeout());
             try
             {
-                if (!mutations.isEmpty())
-                    StorageProxy.mutate(mutations, consistency_level);
+                StorageProxy.mutate(mutations, consistency_level);
             }
-            catch (TimeoutException e)
+            finally
             {
-                throw new TimedOutException();
+                release();
             }
         }
-        finally
+        catch (TimeoutException e)
         {
-            release();
+            throw new TimedOutException();
         }
     }
 
@@ -686,9 +686,9 @@ public class CassandraServer implements 
             {
                 bounds = new Bounds(p.getToken(range.start_key), 
p.getToken(range.end_key));
             }
+            schedule(DatabaseDescriptor.getRpcTimeout());
             try
             {
-                schedule();
                 rows = StorageProxy.getRangeSlice(new 
RangeSliceCommand(keyspace, column_parent, predicate, bounds, range.count), 
consistency_level);
             }
             finally
@@ -829,9 +829,9 @@ public class CassandraServer implements 
     /**
      * Schedule the current thread for access to the required services
      */
-    private void schedule()
+    private void schedule(long timeoutMS) throws TimeoutException
     {
-        requestScheduler.queue(Thread.currentThread(), 
state().getSchedulingValue());
+        requestScheduler.queue(Thread.currentThread(), 
state().getSchedulingValue(), timeoutMS);
     }
 
     /**
@@ -1085,8 +1085,15 @@ public class CassandraServer implements 
         state().hasColumnFamilyAccess(cfname, Permission.WRITE);
         try
         {
-            schedule();
-            StorageProxy.truncateBlocking(state().getKeyspace(), cfname);
+            schedule(DatabaseDescriptor.getRpcTimeout());
+            try
+            {
+                StorageProxy.truncateBlocking(state().getKeyspace(), cfname);
+            }
+            finally
+            {
+                release();
+            }
         }
         catch (TimeoutException e)
         {
@@ -1096,10 +1103,6 @@ public class CassandraServer implements 
         {
             throw (UnavailableException) new 
UnavailableException().initCause(e);
         }
-        finally
-        {
-            release();
-        }
     }
 
     public void set_keyspace(String keyspace) throws InvalidRequestException, 
TException


Reply via email to