Updated Branches: refs/heads/trunk fa7d87de8 -> 3d1cfd1c9
send dc-local replicas inline instead of putting them in a Map patch by jbellis; reviewed by dbrosius for CASSANDRA-5538 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d0dc5972 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d0dc5972 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d0dc5972 Branch: refs/heads/trunk Commit: d0dc597261b0b9949ecd26150cd30ef89ecc7a21 Parents: be0523a Author: Jonathan Ellis <jbel...@apache.org> Authored: Thu May 30 20:09:30 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Thu May 30 20:09:30 2013 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/service/StorageProxy.java | 68 +++++++------- 1 files changed, 34 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0dc5972/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 7e288ab..3da923b 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -603,7 +603,9 @@ public class StorageProxy implements StorageProxyMBean } else { - sendMessagesToOneDC(rm.createMessage(), endpoints, true, handler); + MessageOut<RowMutation> message = rm.createMessage(); + for (InetAddress target : endpoints) + MessagingService.instance().sendRR(message, target, handler); } } @@ -749,8 +751,10 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistency_level) throws OverloadedException { - // replicas grouped by datacenter + // extra-datacenter replicas, grouped by dc Map<String, Collection<InetAddress>> dcGroups = null; + // only need to create a Message for non-local writes + MessageOut<RowMutation> message = null; for (InetAddress destination : targets) { @@ -774,17 +778,28 @@ public class StorageProxy implements StorageProxyMBean else { // belongs on a different server + if (message == null) + message = rm.createMessage(); + String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination); - Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null; - if (messages == null) + // direct writes to local DC or old Cassandra versions + // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0) + if (localDataCenter.equals(dc) || MessagingService.instance().getVersion(destination) < MessagingService.VERSION_20) { - messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas - if (dcGroups == null) - dcGroups = new HashMap<String, Collection<InetAddress>>(); - dcGroups.put(dc, messages); + MessagingService.instance().sendRR(message, destination, responseHandler); + } + else + { + Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null; + if (messages == null) + { + messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas + if (dcGroups == null) + dcGroups = new HashMap<String, Collection<InetAddress>>(); + dcGroups.put(dc, messages); + } + messages.add(destination); } - - messages.add(destination); } } else @@ -799,16 +814,16 @@ public class StorageProxy implements StorageProxyMBean if (dcGroups != null) { - MessageOut<RowMutation> message = rm.createMessage(); // for each datacenter, send the message to one node to relay the write to other replicas - for (Map.Entry<String, Collection<InetAddress>> entry: dcGroups.entrySet()) + if (message == null) + message = rm.createMessage(); + + for (Collection<InetAddress> dcTargets : dcGroups.values()) { - boolean isLocalDC = entry.getKey().equals(localDataCenter); - Collection<InetAddress> dcTargets = entry.getValue(); - // a single message object is used for unhinted writes, so clean out any forwards - // from previous loop iterations + // clean out any forwards from previous loop iterations message = message.withHeaderRemoved(RowMutation.FORWARD_TO); - sendMessagesToOneDC(message, dcTargets, isLocalDC, responseHandler); + + sendMessagesToNonlocalDC(message, dcTargets, responseHandler); } } } @@ -860,27 +875,12 @@ public class StorageProxy implements StorageProxyMBean totalHints.incrementAndGet(); } - private static void sendMessagesToOneDC(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler) + private static void sendMessagesToNonlocalDC(MessageOut message, Collection<InetAddress> targets, AbstractWriteResponseHandler handler) { Iterator<InetAddress> iter = targets.iterator(); InetAddress target = iter.next(); - // direct writes to local DC or old Cassandra versions - // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0) - if (localDC || MessagingService.instance().getVersion(target) < MessagingService.VERSION_20) - { - // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid - // creating a second iterator since we already have a perfectly good one - MessagingService.instance().sendRR(message, target, handler); - while (iter.hasNext()) - { - target = iter.next(); - MessagingService.instance().sendRR(message, target, handler); - } - return; - } - - // Add all the other destinations of the same message as a FORWARD_HEADER entry + // Add the other destinations of the same message as a FORWARD_HEADER entry DataOutputBuffer out = new DataOutputBuffer(); try {