Updated Branches:
  refs/heads/trunk fa7d87de8 -> 3d1cfd1c9

send dc-local replicas inline instead of putting them in a Map
patch by jbellis; reviewed by dbrosius for CASSANDRA-5538


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d0dc5972
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d0dc5972
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d0dc5972

Branch: refs/heads/trunk
Commit: d0dc597261b0b9949ecd26150cd30ef89ecc7a21
Parents: be0523a
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Thu May 30 20:09:30 2013 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Thu May 30 20:09:30 2013 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/service/StorageProxy.java |   68 +++++++-------
 1 files changed, 34 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0dc5972/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 7e288ab..3da923b 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -603,7 +603,9 @@ public class StorageProxy implements StorageProxyMBean
         }
         else
         {
-            sendMessagesToOneDC(rm.createMessage(), endpoints, true, handler);
+            MessageOut<RowMutation> message = rm.createMessage();
+            for (InetAddress target : endpoints)
+                MessagingService.instance().sendRR(message, target, handler);
         }
     }
 
@@ -749,8 +751,10 @@ public class StorageProxy implements StorageProxyMBean
                                              ConsistencyLevel consistency_level)
     throws OverloadedException
     {
-        // replicas grouped by datacenter
+        // extra-datacenter replicas, grouped by dc
         Map<String, Collection<InetAddress>> dcGroups = null;
+        // only need to create a Message for non-local writes
+        MessageOut<RowMutation> message = null;
 
         for (InetAddress destination : targets)
         {
@@ -774,17 +778,28 @@ public class StorageProxy implements StorageProxyMBean
                 else
                 {
                     // belongs on a different server
+                    if (message == null)
+                        message = rm.createMessage();
+
                     String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
-                    Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
-                    if (messages == null)
+                    // direct writes to local DC or old Cassandra versions
+                    // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
+                    if (localDataCenter.equals(dc) || MessagingService.instance().getVersion(destination) < MessagingService.VERSION_20)
                     {
-                        messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
-                        if (dcGroups == null)
-                            dcGroups = new HashMap<String, Collection<InetAddress>>();
-                        dcGroups.put(dc, messages);
+                        MessagingService.instance().sendRR(message, destination, responseHandler);
+                    }
+                    else
+                    {
+                        Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
+                        if (messages == null)
+                        {
+                            messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
+                            if (dcGroups == null)
+                                dcGroups = new HashMap<String, Collection<InetAddress>>();
+                            dcGroups.put(dc, messages);
+                        }
+                        messages.add(destination);
                     }
-
-                    messages.add(destination);
                 }
             }
             else
@@ -799,16 +814,16 @@ public class StorageProxy implements StorageProxyMBean
 
         if (dcGroups != null)
         {
-            MessageOut<RowMutation> message = rm.createMessage();
             // for each datacenter, send the message to one node to relay the write to other replicas
-            for (Map.Entry<String, Collection<InetAddress>> entry: dcGroups.entrySet())
+            if (message == null)
+                message = rm.createMessage();
+
+            for (Collection<InetAddress> dcTargets : dcGroups.values())
             {
-                boolean isLocalDC = entry.getKey().equals(localDataCenter);
-                Collection<InetAddress> dcTargets = entry.getValue();
-                // a single message object is used for unhinted writes, so clean out any forwards
-                // from previous loop iterations
+                // clean out any forwards from previous loop iterations
                 message = message.withHeaderRemoved(RowMutation.FORWARD_TO);
-                sendMessagesToOneDC(message, dcTargets, isLocalDC, responseHandler);
+
+                sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
             }
         }
     }
@@ -860,27 +875,12 @@ public class StorageProxy implements StorageProxyMBean
         totalHints.incrementAndGet();
     }
 
-    private static void sendMessagesToOneDC(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler)
+    private static void sendMessagesToNonlocalDC(MessageOut message, Collection<InetAddress> targets, AbstractWriteResponseHandler handler)
     {
         Iterator<InetAddress> iter = targets.iterator();
         InetAddress target = iter.next();
 
-        // direct writes to local DC or old Cassandra versions
-        // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
-        if (localDC || MessagingService.instance().getVersion(target) < MessagingService.VERSION_20)
-        {
-            // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid
-            // creating a second iterator since we already have a perfectly good one
-            MessagingService.instance().sendRR(message, target, handler);
-            while (iter.hasNext())
-            {
-                target = iter.next();
-                MessagingService.instance().sendRR(message, target, handler);
-            }
-            return;
-        }
-
-        // Add all the other destinations of the same message as a FORWARD_HEADER entry
+        // Add the other destinations of the same message as a FORWARD_HEADER entry
         DataOutputBuffer out = new DataOutputBuffer();
         try
         {

Reply via email to