Author: jbellis
Date: Thu Dec 8 20:31:23 2011
New Revision: 1212088

URL: http://svn.apache.org/viewvc?rev=1212088&view=rev
Log:
multi-dc replication optimization supporting CL > ONE
patch by Vijay and jbellis for CASSANDRA-3577

Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1212088&r1=1212087&r2=1212088&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Thu Dec 8 20:31:23 2011 @@ -1,4 +1,5 @@ 1.1-dev + * multi-dc replication optimization supporting CL > ONE (CASSANDRA-3577) * add command to stop compactions (CASSANDRA-1740, 3566) * multithreaded streaming (CASSANDRA-3494) * removed in-tree redhat spec (CASSANDRA-3567) Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1212088&r1=1212087&r2=1212088&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Thu Dec 8 20:31:23 2011 @@ -18,18 +18,18 @@ package org.apache.cassandra.db; +import java.io.DataInputStream; import java.io.IOException; import java.net.InetAddress; -import java.net.UnknownHostException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.io.util.FastByteArrayInputStream; +import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.utils.FBUtilities; - public class RowMutationVerbHandler implements IVerbHandler { @@ -45,7 +45,7 @@ public class 
RowMutationVerbHandler impl // Check if there were any forwarding headers in this message byte[] forwardBytes = message.getHeader(RowMutation.FORWARD_HEADER); - if (forwardBytes != null) + if (forwardBytes != null && message.getVersion() >= MessagingService.VERSION_11) forwardToLocalNodes(message, forwardBytes); rm.apply(); @@ -62,32 +62,26 @@ public class RowMutationVerbHandler impl } } - private void forwardToLocalNodes(Message message, byte[] forwardBytes) throws UnknownHostException + /** + * Older version (< 1.0) will not send this message at all, hence we don't + * need to check the version of the data. + */ + private void forwardToLocalNodes(Message message, byte[] forwardBytes) throws IOException { + DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(forwardBytes)); + int size = dis.readInt(); + // remove fwds from message to avoid infinite loop Message messageCopy = message.withHeaderRemoved(RowMutation.FORWARD_HEADER); - - int bytesPerInetAddress = FBUtilities.getBroadcastAddress().getAddress().length; - assert forwardBytes.length >= bytesPerInetAddress; - assert forwardBytes.length % bytesPerInetAddress == 0; - - int offset = 0; - byte[] addressBytes = new byte[bytesPerInetAddress]; - - // Send a message to each of the addresses on our Forward List - while (offset < forwardBytes.length) + for (int i = 0; i < size; i++) { - System.arraycopy(forwardBytes, offset, addressBytes, 0, bytesPerInetAddress); - InetAddress address = InetAddress.getByAddress(addressBytes); - + // Send a message to each of the addresses on our Forward List + InetAddress address = CompactEndpointSerializationHelper.deserialize(dis); + String id = dis.readUTF(); if (logger_.isDebugEnabled()) - logger_.debug("Forwarding message to " + address); - - // Send the original message to the address specified by the FORWARD_HINT + logger_.debug("Forwarding message to " + address + " with= ID: " + id); // Let the response go back to the coordinator - 
MessagingService.instance().sendOneWay(messageCopy, address); - - offset += bytesPerInetAddress; + MessagingService.instance().sendOneWay(messageCopy, id, address); } } } Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1212088&r1=1212087&r2=1212088&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu Dec 8 20:31:23 2011 @@ -341,8 +341,14 @@ public final class MessagingService impl return verbHandlers_.get(type); } - private void addCallback(IMessageCallback cb, String messageId, Message message, InetAddress to, long timeout) + public String addCallback(IMessageCallback cb, Message message, InetAddress to) { + return addCallback(cb, message, to, DEFAULT_CALLBACK_TIMEOUT); + } + + public String addCallback(IMessageCallback cb, Message message, InetAddress to, long timeout) + { + String messageId = nextId(); CallbackInfo previous; // If HH is enabled and this is a mutation message => store the message to track for potential hints. @@ -352,6 +358,7 @@ public final class MessagingService impl previous = callbacks.put(messageId, new CallbackInfo(to, cb), timeout); assert previous == null; + return messageId; } private static AtomicInteger idGen = new AtomicInteger(0); @@ -384,8 +391,7 @@ public final class MessagingService impl */ public String sendRR(Message message, InetAddress to, IMessageCallback cb, long timeout) { - String id = nextId(); - addCallback(cb, id, message, to, timeout); + String id = addCallback(cb, message, to, timeout); sendOneWay(message, id, to); return id; } @@ -426,7 +432,7 @@ public final class MessagingService impl * @param message messages to be sent. 
* @param to endpoint to which the message needs to be sent */ - private void sendOneWay(Message message, String id, InetAddress to) + public void sendOneWay(Message message, String id, InetAddress to) { if (logger_.isTraceEnabled()) logger_.trace(FBUtilities.getBroadcastAddress() + " sending " + message.getVerb() + " to " + id + "@" + to); Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1212088&r1=1212087&r2=1212088&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Dec 8 20:31:23 2011 @@ -400,33 +400,41 @@ public class StorageProxy implements Sto // a single message object is used for unhinted writes, so clean out any forwards // from previous loop iterations message = message.withHeaderRemoved(RowMutation.FORWARD_HEADER); + Iterator<InetAddress> iter = messages.getValue().iterator(); + InetAddress target = iter.next(); - if (dataCenter.equals(localDataCenter)) + // direct writes to local DC or old Cassadra versions + if (dataCenter.equals(localDataCenter) || Gossiper.instance.getVersion(target) < MessagingService.VERSION_11) { - // direct writes to local DC or old Cassadra versions - for (InetAddress destination : messages.getValue()) - MessagingService.instance().sendRR(message, destination, handler); - } - else - { - // Non-local DC. First endpoint in list is the destination for this group - Iterator<InetAddress> iter = messages.getValue().iterator(); - InetAddress target = iter.next(); - // Add all the other destinations of the same message as a header in the primary message. 
- if (iter.hasNext()) - { - FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - while (iter.hasNext()) - { - InetAddress destination = iter.next(); - dos.write(destination.getAddress()); - } - message = message.withHeaderAdded(RowMutation.FORWARD_HEADER, bos.toByteArray()); - } - // send the combined message + forward headers + // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid + // creating a second iterator since we already have a perfectly good one MessagingService.instance().sendRR(message, target, handler); + while (iter.hasNext()) + { + target = iter.next(); + MessagingService.instance().sendRR(message, target, handler); + } + continue; + } + + // Add all the other destinations of the same message as a FORWARD_HEADER entry + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + dos.writeInt(messages.getValue().size() - 1); + while (iter.hasNext()) + { + InetAddress destination = iter.next(); + CompactEndpointSerializationHelper.serialize(destination, dos); + String id = MessagingService.instance().addCallback(handler, message, destination); + dos.writeUTF(id); + if (logger.isDebugEnabled()) + logger.debug("Adding FWD message to: " + destination + " with ID " + id); } + message = message.withHeaderAdded(RowMutation.FORWARD_HEADER, bos.toByteArray()); + // send the combined message + forward headers + String id = MessagingService.instance().sendRR(message, target, handler); + if (logger.isDebugEnabled()) + logger.debug("Sending message to: " + target + " with ID " + id); } } }