Author: jbellis Date: Mon Jul 25 18:41:30 2011 New Revision: 1150837 URL: http://svn.apache.org/viewvc?rev=1150837&view=rev Log: add ability to drop local reads/writes that are going to timeout patch by jbellis; reviewed by brandonwilliams for CASSANDRA-2943
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1150837&r1=1150836&r2=1150837&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon Jul 25 18:41:30 2011 @@ -1,3 +1,8 @@ +0.8.3 + * add ability to drop local reads/writes that are going to timeout + (CASSANDRA-2943) + + 0.8.2 * CQL: - include only one row per unique key for IN queries (CASSANDRA-2717) Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1150837&r1=1150836&r2=1150837&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Mon Jul 25 18:41:30 2011 @@ -349,7 +349,7 @@ public class StorageProxy implements Sto { if (logger.isDebugEnabled()) logger.debug("insert writing local " + rm.toString(true)); - Runnable runnable = new WrappedRunnable() + Runnable runnable = new DroppableRunnable(StorageService.Verb.MUTATION) { public void runMayThrow() throws IOException { @@ -431,7 +431,7 @@ public class StorageProxy implements Sto if (logger.isDebugEnabled()) logger.debug("insert writing local & replicate " + mutation.toString(true)); - Runnable runnable = new WrappedRunnable() + Runnable runnable = new DroppableRunnable(StorageService.Verb.MUTATION) { public void runMayThrow() throws IOException { @@ -447,7 +447,7 @@ public class StorageProxy implements Sto { // We do the replication on another stage because it involves a read (see CM.makeReplicationMutation) // and we want to avoid blocking too much the MUTATION stage - StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new WrappedRunnable() + StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new DroppableRunnable(StorageService.Verb.READ) { public void runMayThrow() throws IOException { @@ -620,7 +620,7 @@ public class StorageProxy implements Sto return rows; } - static class LocalReadRunnable extends WrappedRunnable + static class LocalReadRunnable extends DroppableRunnable { private final ReadCommand command; private final ReadCallback<Row> handler; @@ -628,6 +628,7 @@ public class StorageProxy implements Sto LocalReadRunnable(ReadCommand command, ReadCallback<Row> handler) { + super(StorageService.Verb.READ); this.command = command; this.handler = handler; } @@ -1082,4 +1083,35 @@ public class StorageProxy implements Sto { public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException; } + + private static abstract class DroppableRunnable implements Runnable + { + private final long constructionTime = System.currentTimeMillis(); + private final StorageService.Verb verb; + + public DroppableRunnable(StorageService.Verb verb) + { + this.verb = verb; + } + + public final void run() + { + if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout()) + { + MessagingService.instance().incrementDroppedMessages(verb); + return; + } + + try + { + runMayThrow(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + abstract protected void runMayThrow() throws Exception; + } }