Updated Branches: refs/heads/trunk a57981650 -> 53f7c328a
fix merge from #6132 patch by jbellis; reviewed by brandonwilliams for CASSANDRA-6154 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53f7c328 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53f7c328 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53f7c328 Branch: refs/heads/trunk Commit: 53f7c328ae4aca0affaed4f0c73678011a4e152a Parents: a579816 Author: Jonathan Ellis <jbel...@apache.org> Authored: Mon Oct 7 16:47:41 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Mon Oct 7 16:47:47 2013 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/net/MessagingService.java | 15 ++++++++++----- .../org/apache/cassandra/service/StorageProxy.java | 10 ++++------ 2 files changed, 14 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f7c328/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 1776361..ff8a2c7 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -563,14 +563,16 @@ public final class MessagingService implements MessagingServiceMBean return idGen.incrementAndGet(); } - /* - * @see #sendRR(Message message, InetAddress to, IAsyncCallback cb, long timeout) - */ public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb) { return sendRR(message, to, cb, message.getTimeout()); } + public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout) + { + return sendRR(message, to, cb, timeout, null); + } + /** * Send a message to a given endpoint. This method specifies a callback * which is invoked with the actual response. @@ -584,11 +586,14 @@ public final class MessagingService implements MessagingServiceMBean * suggest that a timeout occurred to the invoker of the send(). * suggest that a timeout occurred to the invoker of the send(). * @param timeout the timeout used for expiration + * @param consistencyLevel the consistency level, for mutations; must be null otherwise * @return an reference to message id used to match with the result */ - public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout) + public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout, ConsistencyLevel consistencyLevel) { - int id = addCallback(cb, message, to, timeout); + int id = consistencyLevel == null + ? addCallback(cb, message, to, timeout) + : addCallback(cb, message, to, timeout, consistencyLevel); sendOneWay(message, id, to); return id; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f7c328/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 157079f..102d8b5 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -681,7 +681,7 @@ public class StorageProxy implements StorageProxyMBean { MessageOut<RowMutation> message = rm.createMessage(); for (InetAddress target : endpoints) - MessagingService.instance().sendRR(message, target, handler); + MessagingService.instance().sendRR(message, target, handler, message.getTimeout(), ConsistencyLevel.ONE); } } @@ -866,7 +866,7 @@ public class StorageProxy implements StorageProxyMBean // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0) if (localDataCenter.equals(dc)) { - MessagingService.instance().sendRR(message, destination, responseHandler); + MessagingService.instance().sendRR(message, destination, responseHandler, message.getTimeout(), consistency_level); } else { @@ -982,8 +982,7 @@ public class StorageProxy implements StorageProxyMBean } message = message.withParameter(RowMutation.FORWARD_TO, out.getData()); // send the combined message + forward headers - int id = MessagingService.instance().addCallback(handler, message, target, message.getTimeout(), handler.consistencyLevel); - MessagingService.instance().sendOneWay(message, id, target); + int id = MessagingService.instance().sendRR(message, target, handler, message.getTimeout(), handler.consistencyLevel); logger.trace("Sending message to {}@{}", id, target); } catch (IOException e) @@ -1044,8 +1043,7 @@ public class StorageProxy implements StorageProxyMBean Tracing.trace("Enqueuing counter update to {}", endpoint); MessageOut<CounterMutation> message = cm.makeMutationMessage(); - int id = MessagingService.instance().addCallback(responseHandler, message, endpoint, message.getTimeout(), cm.consistency()); - MessagingService.instance().sendOneWay(message, id, endpoint); + MessagingService.instance().sendRR(message, endpoint, responseHandler, message.getTimeout(), cm.consistency()); return responseHandler; } }