Author: jbellis
Date: Mon Jul 25 18:41:30 2011
New Revision: 1150837

URL: http://svn.apache.org/viewvc?rev=1150837&view=rev
Log:
add ability to drop local reads/writes that are going to timeout
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-2943

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1150837&r1=1150836&r2=1150837&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon Jul 25 18:41:30 2011
@@ -1,3 +1,8 @@
+0.8.3
+ * add ability to drop local reads/writes that are going to timeout
+   (CASSANDRA-2943)
+
+
 0.8.2
  * CQL: 
    - include only one row per unique key for IN queries (CASSANDRA-2717)

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1150837&r1=1150836&r2=1150837&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
 Mon Jul 25 18:41:30 2011
@@ -349,7 +349,7 @@ public class StorageProxy implements Sto
     {
         if (logger.isDebugEnabled())
             logger.debug("insert writing local " + rm.toString(true));
-        Runnable runnable = new WrappedRunnable()
+        Runnable runnable = new DroppableRunnable(StorageService.Verb.MUTATION)
         {
             public void runMayThrow() throws IOException
             {
@@ -431,7 +431,7 @@ public class StorageProxy implements Sto
         if (logger.isDebugEnabled())
             logger.debug("insert writing local & replicate " + 
mutation.toString(true));
 
-        Runnable runnable = new WrappedRunnable()
+        Runnable runnable = new DroppableRunnable(StorageService.Verb.MUTATION)
         {
             public void runMayThrow() throws IOException
             {
@@ -447,7 +447,7 @@ public class StorageProxy implements Sto
                 {
                     // We do the replication on another stage because it 
involves a read (see CM.makeReplicationMutation)
                     // and we want to avoid blocking too much the MUTATION 
stage
-                    
StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new WrappedRunnable()
+                    
StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new 
DroppableRunnable(StorageService.Verb.READ)
                     {
                         public void runMayThrow() throws IOException
                         {
@@ -620,7 +620,7 @@ public class StorageProxy implements Sto
         return rows;
     }
 
-    static class LocalReadRunnable extends WrappedRunnable
+    static class LocalReadRunnable extends DroppableRunnable
     {
         private final ReadCommand command;
         private final ReadCallback<Row> handler;
@@ -628,6 +628,7 @@ public class StorageProxy implements Sto
 
         LocalReadRunnable(ReadCommand command, ReadCallback<Row> handler)
         {
+            super(StorageService.Verb.READ);
             this.command = command;
             this.handler = handler;
         }
@@ -1082,4 +1083,35 @@ public class StorageProxy implements Sto
     {
         public void apply(IMutation mutation, Multimap<InetAddress, 
InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String 
localDataCenter, ConsistencyLevel consistency_level) throws IOException;
     }
+
+    private static abstract class DroppableRunnable implements Runnable
+    {
+        private final long constructionTime = System.currentTimeMillis();
+        private final StorageService.Verb verb;
+
+        public DroppableRunnable(StorageService.Verb verb)
+        {
+            this.verb = verb;
+        }
+
+        public final void run()
+        {
+            if (System.currentTimeMillis() > constructionTime + 
DatabaseDescriptor.getRpcTimeout())
+            {
+                MessagingService.instance().incrementDroppedMessages(verb);
+                return;
+            }
+
+            try
+            {
+                runMayThrow();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        abstract protected void runMayThrow() throws Exception;
+    }
 }


Reply via email to