Author: jbellis Date: Tue Jan 4 21:34:11 2011 New Revision: 1055187 URL: http://svn.apache.org/viewvc?rev=1055187&view=rev Log: fix batch mutations post-#1530 patch by tjake; reviewed by jbellis for CASSANDRA-1931
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1055187&r1=1055186&r2=1055187&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Tue Jan 4 21:34:11 2011 @@ -123,7 +123,7 @@ public class StorageProxy implements Sto responseHandlers.add(responseHandler); // Multimap that holds onto all the messages and addresses meant for a specific datacenter - Multimap<String, Pair<Message, InetAddress>> dcMessages = HashMultimap.create(hintedEndpoints.size(), 10); + Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size()); Message unhintedMessage = null; for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet()) @@ -150,7 +150,16 @@ public class StorageProxy implements Sto } if (logger.isDebugEnabled()) logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination); - dcMessages.put(dc, new Pair<Message, InetAddress>(unhintedMessage, destination)); + + + Multimap<Message, InetAddress> messages = dcMessages.get(dc); + if (messages == null) + { + messages = HashMultimap.create(); + dcMessages.put(dc, messages); + } + + messages.put(unhintedMessage, destination); } } else @@ -167,7 +176,16 @@ public class StorageProxy implements Sto } } responseHandler.addHintCallback(hintedMessage, destination); - dcMessages.put(dc, new Pair<Message, InetAddress>(hintedMessage, destination)); + + Multimap<Message, InetAddress> messages = dcMessages.get(dc); + + if (messages == null) + { + messages = HashMultimap.create(); + dcMessages.put(dc, messages); + } + + messages.put(hintedMessage, destination); } } @@ -194,53 +212,55 @@ public class StorageProxy implements Sto /** * for each datacenter, send a message to one node to relay the write to other replicas */ - private static void sendMessages(String localDataCenter, Multimap<String, Pair<Message, InetAddress>> dcMessages) + private static void sendMessages(String localDataCenter, Map<String, Multimap<Message, InetAddress>> dcMessages) throws IOException { - for (Map.Entry<String, Collection<Pair<Message, InetAddress>>> entry : dcMessages.asMap().entrySet()) + for (Map.Entry<String, Multimap<Message, InetAddress>> entry: dcMessages.entrySet()) { String dataCenter = entry.getKey(); // Grab a set of all the messages bound for this dataCenter and create an iterator over this set. - Collection<Pair<Message, InetAddress>> messagesForDataCenter = entry.getValue(); - Iterator<Pair<Message, InetAddress>> iter = messagesForDataCenter.iterator(); - assert iter.hasNext(); - - // First endpoint in list is the destination for this group - Pair<Message, InetAddress> messageAndDestination = iter.next(); - - Message primaryMessage = messageAndDestination.left; - InetAddress target = messageAndDestination.right; + Map<Message, Collection<InetAddress>> messagesForDataCenter = entry.getValue().asMap(); - // Add all the other destinations that are bound for the same dataCenter as a header in the primary message. - while (iter.hasNext()) + for (Map.Entry<Message, Collection<InetAddress>> messages: messagesForDataCenter.entrySet()) { - messageAndDestination = iter.next(); - assert messageAndDestination.left == primaryMessage; + Message message = messages.getKey(); + Iterator<InetAddress> iter = messages.getValue().iterator(); + assert iter.hasNext(); + + // First endpoint in list is the destination for this group + InetAddress target = iter.next(); + - if (dataCenter.equals(localDataCenter)) + // Add all the other destinations that are bound for the same dataCenter as a header in the primary message. + while (iter.hasNext()) { - // direct write to local DC - assert primaryMessage.getHeader(RowMutation.FORWARD_HEADER) == null; - MessagingService.instance().sendOneWay(primaryMessage, target); - } - else - { - // group all nodes in this DC as forward headers on the primary message - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - - // append to older addresses - byte[] previousHints = primaryMessage.getHeader(RowMutation.FORWARD_HEADER); - if (previousHints != null) - dos.write(previousHints); + InetAddress destination = iter.next(); + + if (dataCenter.equals(localDataCenter)) + { + // direct write to local DC + assert message.getHeader(RowMutation.FORWARD_HEADER) == null; + MessagingService.instance().sendOneWay(message, target); + } + else + { + // group all nodes in this DC as forward headers on the primary message + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + + // append to older addresses + byte[] previousHints = message.getHeader(RowMutation.FORWARD_HEADER); + if (previousHints != null) + dos.write(previousHints); - dos.write(messageAndDestination.right.getAddress()); - primaryMessage.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray()); + dos.write(destination.getAddress()); + message.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray()); + } } + + MessagingService.instance().sendOneWay(message, target); } - - MessagingService.instance().sendOneWay(primaryMessage, target); } }