Author: jbellis
Date: Tue Jan 25 20:12:15 2011
New Revision: 1063435

URL: http://svn.apache.org/viewvc?rev=1063435&view=rev
Log:
clean out forward headers from message between loops
patch by ivancso and jbellis; reviewed by tjake for CASSANDRA-2051

Modified:
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1063435&r1=1063434&r2=1063435&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
 Tue Jan 25 20:12:15 2011
@@ -226,31 +226,29 @@ public class StorageProxy implements Sto
         {
             String dataCenter = entry.getKey();
 
-            // Grab a set of all the messages bound for this dataCenter and 
create an iterator over this set.
-            Map<Message, Collection<InetAddress>> messagesForDataCenter = 
entry.getValue().asMap();
-
-            for (Map.Entry<Message, Collection<InetAddress>> messages: 
messagesForDataCenter.entrySet())
+            // send the messages corresponding to this datacenter
+            for (Map.Entry<Message, Collection<InetAddress>> messages: 
entry.getValue().asMap().entrySet())
             {
                 Message message = messages.getKey();
-                Iterator<InetAddress> iter = messages.getValue().iterator();
-                assert iter.hasNext();
-
-                // First endpoint in list is the destination for this group
-                InetAddress target = iter.next();
+                // a single message object is used for unhinted writes, so 
clean out any forwards
+                // from previous loop iterations
+                message.removeHeader(RowMutation.FORWARD_HEADER);
 
-                // Add all the other destinations that are bound for the same 
dataCenter as a header in the primary message.
-                while (iter.hasNext())
+                if (dataCenter.equals(localDataCenter))
                 {
-                    InetAddress destination = iter.next();
-
-                    if (dataCenter.equals(localDataCenter))
-                    {
-                        // direct write to local DC
-                        assert message.getHeader(RowMutation.FORWARD_HEADER) 
== null;
+                    // direct writes to local DC
+                    for (InetAddress destination : messages.getValue())
                         MessagingService.instance().sendOneWay(message, 
destination);
-                    }
-                    else
+                }
+                else
+                {
+                    // Non-local DC. First endpoint in list is the destination 
for this group
+                    Iterator<InetAddress> iter = 
messages.getValue().iterator();
+                    InetAddress target = iter.next();
+                    // Add all the other destinations of the same message as a 
header in the primary message.
+                    while (iter.hasNext())
                     {
+                        InetAddress destination = iter.next();
                         // group all nodes in this DC as forward headers on 
the primary message
                         ByteArrayOutputStream bos = new 
ByteArrayOutputStream();
                         DataOutputStream dos = new DataOutputStream(bos);
@@ -263,9 +261,9 @@ public class StorageProxy implements Sto
                         dos.write(destination.getAddress());
                         message.setHeader(RowMutation.FORWARD_HEADER, 
bos.toByteArray());
                     }
+                    // send the combined message + forward headers
+                    MessagingService.instance().sendOneWay(message, target);
                 }
-            
-                MessagingService.instance().sendOneWay(message, target);
             }
         }
     }


Reply via email to