Author: jbellis Date: Tue Jan 25 20:12:15 2011 New Revision: 1063435 URL: http://svn.apache.org/viewvc?rev=1063435&view=rev Log: clean out forward headers from message between loops patch by ivancso and jbellis; reviewed by tjake for CASSANDRA-2051
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1063435&r1=1063434&r2=1063435&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Tue Jan 25 20:12:15 2011 @@ -226,31 +226,29 @@ public class StorageProxy implements Sto { String dataCenter = entry.getKey(); - // Grab a set of all the messages bound for this dataCenter and create an iterator over this set. - Map<Message, Collection<InetAddress>> messagesForDataCenter = entry.getValue().asMap(); - - for (Map.Entry<Message, Collection<InetAddress>> messages: messagesForDataCenter.entrySet()) + // send the messages corresponding to this datacenter + for (Map.Entry<Message, Collection<InetAddress>> messages: entry.getValue().asMap().entrySet()) { Message message = messages.getKey(); - Iterator<InetAddress> iter = messages.getValue().iterator(); - assert iter.hasNext(); - - // First endpoint in list is the destination for this group - InetAddress target = iter.next(); + // a single message object is used for unhinted writes, so clean out any forwards + // from previous loop iterations + message.removeHeader(RowMutation.FORWARD_HEADER); - // Add all the other destinations that are bound for the same dataCenter as a header in the primary message. - while (iter.hasNext()) + if (dataCenter.equals(localDataCenter)) { - InetAddress destination = iter.next(); - - if (dataCenter.equals(localDataCenter)) - { - // direct write to local DC - assert message.getHeader(RowMutation.FORWARD_HEADER) == null; + // direct writes to local DC + for (InetAddress destination : messages.getValue()) MessagingService.instance().sendOneWay(message, destination); - } - else + } + else + { + // Non-local DC. First endpoint in list is the destination for this group + Iterator<InetAddress> iter = messages.getValue().iterator(); + InetAddress target = iter.next(); + // Add all the other destinations of the same message as a header in the primary message. + while (iter.hasNext()) { + InetAddress destination = iter.next(); // group all nodes in this DC as forward headers on the primary message ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); @@ -263,9 +261,9 @@ public class StorageProxy implements Sto dos.write(destination.getAddress()); message.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray()); } + // send the combined message + forward headers + MessagingService.instance().sendOneWay(message, target); } - - MessagingService.instance().sendOneWay(message, target); } } }