Author: jbellis
Date: Thu Dec  8 20:31:23 2011
New Revision: 1212088

URL: http://svn.apache.org/viewvc?rev=1212088&view=rev
Log:
multi-dc replication optimization supporting CL > ONE
patch by Vijay and jbellis for CASSANDRA-3577

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1212088&r1=1212087&r2=1212088&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Dec  8 20:31:23 2011
@@ -1,4 +1,5 @@
 1.1-dev
+ * multi-dc replication optimization supporting CL > ONE (CASSANDRA-3577)
  * add command to stop compactions (CASSANDRA-1740, 3566)
  * multithreaded streaming (CASSANDRA-3494)
  * removed in-tree redhat spec (CASSANDRA-3567)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1212088&r1=1212087&r2=1212088&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Thu Dec  8 20:31:23 2011
@@ -18,18 +18,18 @@
 
 package org.apache.cassandra.db;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.UnknownHostException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
-
 
 public class RowMutationVerbHandler implements IVerbHandler
 {
@@ -45,7 +45,7 @@ public class RowMutationVerbHandler impl
 
             // Check if there were any forwarding headers in this message
             byte[] forwardBytes = 
message.getHeader(RowMutation.FORWARD_HEADER);
-            if (forwardBytes != null)
+            if (forwardBytes != null && message.getVersion() >= 
MessagingService.VERSION_11)
                 forwardToLocalNodes(message, forwardBytes);
 
             rm.apply();
@@ -62,32 +62,26 @@ public class RowMutationVerbHandler impl
         }
     }  
     
-    private void forwardToLocalNodes(Message message, byte[] forwardBytes) 
throws UnknownHostException
+    /**
+     * Older version (< 1.0) will not send this message at all, hence we don't
+     * need to check the version of the data.
+     */
+    private void forwardToLocalNodes(Message message, byte[] forwardBytes) 
throws IOException
     {
+        DataInputStream dis = new DataInputStream(new 
FastByteArrayInputStream(forwardBytes));
+        int size = dis.readInt();
+        
         // remove fwds from message to avoid infinite loop
         Message messageCopy = 
message.withHeaderRemoved(RowMutation.FORWARD_HEADER);
-
-        int bytesPerInetAddress = 
FBUtilities.getBroadcastAddress().getAddress().length;
-        assert forwardBytes.length >= bytesPerInetAddress;
-        assert forwardBytes.length % bytesPerInetAddress == 0;
-
-        int offset = 0;
-        byte[] addressBytes = new byte[bytesPerInetAddress];
-
-        // Send a message to each of the addresses on our Forward List
-        while (offset < forwardBytes.length)
+        for (int i = 0; i < size; i++)
         {
-            System.arraycopy(forwardBytes, offset, addressBytes, 0, 
bytesPerInetAddress);
-            InetAddress address = InetAddress.getByAddress(addressBytes);
-
+            // Send a message to each of the addresses on our Forward List
+            InetAddress address = 
CompactEndpointSerializationHelper.deserialize(dis);
+            String id = dis.readUTF();
             if (logger_.isDebugEnabled())
-                logger_.debug("Forwarding message to " + address);
-
-            // Send the original message to the address specified by the 
FORWARD_HINT
+                logger_.debug("Forwarding message to " + address + " with= ID: 
" + id);
             // Let the response go back to the coordinator
-            MessagingService.instance().sendOneWay(messageCopy, address);
-
-            offset += bytesPerInetAddress;
+            MessagingService.instance().sendOneWay(messageCopy, id, address);
         }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1212088&r1=1212087&r2=1212088&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu Dec  8 20:31:23 2011
@@ -341,8 +341,14 @@ public final class MessagingService impl
         return verbHandlers_.get(type);
     }
 
-    private void addCallback(IMessageCallback cb, String messageId, Message 
message, InetAddress to, long timeout)
+    public String addCallback(IMessageCallback cb, Message message, 
InetAddress to)
     {
+        return addCallback(cb, message, to, DEFAULT_CALLBACK_TIMEOUT);
+    }
+    
+    public String addCallback(IMessageCallback cb, Message message, 
InetAddress to, long timeout)
+    {
+        String messageId = nextId();
         CallbackInfo previous;
 
         // If HH is enabled and this is a mutation message => store the 
message to track for potential hints.
@@ -352,6 +358,7 @@ public final class MessagingService impl
             previous = callbacks.put(messageId, new CallbackInfo(to, cb), 
timeout);
 
         assert previous == null;
+        return messageId;
     }
 
     private static AtomicInteger idGen = new AtomicInteger(0);
@@ -384,8 +391,7 @@ public final class MessagingService impl
      */
     public String sendRR(Message message, InetAddress to, IMessageCallback cb, 
long timeout)
     {
-        String id = nextId();
-        addCallback(cb, id, message, to, timeout);
+        String id = addCallback(cb, message, to, timeout);
         sendOneWay(message, id, to);
         return id;
     }
@@ -426,7 +432,7 @@ public final class MessagingService impl
      * @param message messages to be sent.
      * @param to endpoint to which the message needs to be sent
      */
-    private void sendOneWay(Message message, String id, InetAddress to)
+    public void sendOneWay(Message message, String id, InetAddress to)
     {
         if (logger_.isTraceEnabled())
             logger_.trace(FBUtilities.getBroadcastAddress() + " sending " + 
message.getVerb() + " to " + id + "@" + to);

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1212088&r1=1212087&r2=1212088&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Dec  8 20:31:23 2011
@@ -400,33 +400,41 @@ public class StorageProxy implements Sto
                 // a single message object is used for unhinted writes, so 
clean out any forwards
                 // from previous loop iterations
                 message = 
message.withHeaderRemoved(RowMutation.FORWARD_HEADER);
+                Iterator<InetAddress> iter = messages.getValue().iterator();
+                InetAddress target = iter.next();
 
-                if (dataCenter.equals(localDataCenter))
+                // direct writes to local DC or old Cassadra versions
+                if (dataCenter.equals(localDataCenter) || 
Gossiper.instance.getVersion(target) < MessagingService.VERSION_11)
                 {
-                    // direct writes to local DC or old Cassadra versions
-                    for (InetAddress destination : messages.getValue())
-                        MessagingService.instance().sendRR(message, 
destination, handler);
-                }
-                else
-                {
-                    // Non-local DC. First endpoint in list is the destination 
for this group
-                    Iterator<InetAddress> iter = 
messages.getValue().iterator();
-                    InetAddress target = iter.next();
-                    // Add all the other destinations of the same message as a 
header in the primary message.
-                    if (iter.hasNext())
-                    {
-                        FastByteArrayOutputStream bos = new 
FastByteArrayOutputStream();
-                        DataOutputStream dos = new DataOutputStream(bos);
-                        while (iter.hasNext())
-                        {
-                            InetAddress destination = iter.next();
-                            dos.write(destination.getAddress());
-                        }
-                        message = 
message.withHeaderAdded(RowMutation.FORWARD_HEADER, bos.toByteArray());
-                    }
-                    // send the combined message + forward headers
+                    // yes, the loop and non-loop code here are the same; this 
is clunky but we want to avoid
+                    // creating a second iterator since we already have a 
perfectly good one
                     MessagingService.instance().sendRR(message, target, 
handler);
+                    while (iter.hasNext())
+                    {
+                        target = iter.next();
+                        MessagingService.instance().sendRR(message, target, 
handler);
+                    }
+                    continue;
+                }
+
+                // Add all the other destinations of the same message as a 
FORWARD_HEADER entry
+                FastByteArrayOutputStream bos = new 
FastByteArrayOutputStream();
+                DataOutputStream dos = new DataOutputStream(bos);
+                dos.writeInt(messages.getValue().size() - 1);
+                while (iter.hasNext())
+                {
+                    InetAddress destination = iter.next();
+                    CompactEndpointSerializationHelper.serialize(destination, 
dos);
+                    String id = 
MessagingService.instance().addCallback(handler, message, destination);
+                    dos.writeUTF(id);
+                    if (logger.isDebugEnabled())
+                        logger.debug("Adding FWD message to: " + destination + 
" with ID " + id);
                 }
+                message = message.withHeaderAdded(RowMutation.FORWARD_HEADER, 
bos.toByteArray());
+                // send the combined message + forward headers
+                String id = MessagingService.instance().sendRR(message, 
target, handler);
+                if (logger.isDebugEnabled())
+                    logger.debug("Sending message to: " + target + " with ID " 
+ id);
             }
         }
     }


Reply via email to