Author: jbellis
Date: Thu Dec 23 01:06:49 2010
New Revision: 1052123

URL: http://svn.apache.org/viewvc?rev=1052123&view=rev
Log:
More-efficient cross-DC replication
patch by Joaquin Casares, Jake Luciani, and jbellis for CASSANDRA-1530

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/SimpleSnitch.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1052123&r1=1052122&r2=1052123&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Thu Dec 23 01:06:49 2010 @@ -3,6 +3,7 @@ dev * count timeouts in storageproxy latencies, and include latency histograms in StorageProxyMBean (CASSANDRA-1893) * check log4j configuration for changes every 10s (CASSANDRA-1525) + * More-efficient cross-DC replication (CASSANDRA-1530) 0.7.0-rc3 Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java?rev=1052123&r1=1052122&r2=1052123&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java Thu Dec 23 01:06:49 2010 @@ -48,6 +48,7 @@ public class RowMutation { private static ICompactSerializer<RowMutation> serializer_; public static final String HINT = "HINT"; + public static final String FORWARD_HEADER = "FORWARD"; static { Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1052123&r1=1052122&r2=1052123&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Thu Dec 23 01:06:49 2010 @@ -18,25 +18,23 @@ package org.apache.cassandra.db; -import java.io.*; - +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import com.google.common.base.Charsets; - -import org.apache.cassandra.net.IVerbHandler; -import org.apache.cassandra.net.Message; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.net.*; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import static com.google.common.base.Charsets.UTF_8; - public class RowMutationVerbHandler implements IVerbHandler { @@ -69,6 +67,11 @@ public class RowMutationVerbHandler impl hintedMutation.apply(); } } + + // Check if there were any forwarding headers in this message + byte[] forwardBytes = message.getHeader(RowMutation.FORWARD_HEADER); + if (forwardBytes != null) + forwardToLocalNodes(message, forwardBytes); Table.open(rm.getTable()).apply(rm, bytes, true); @@ -82,5 +85,34 @@ public class RowMutationVerbHandler impl { logger_.error("Error in row mutation", e); } + } + + private void forwardToLocalNodes(Message message, byte[] forwardBytes) throws UnknownHostException + { + // remove fwds from message to avoid infinite loop + message.setHeader(RowMutation.FORWARD_HEADER, 
null); + + int bytesPerInetAddress = FBUtilities.getLocalAddress().getAddress().length; + assert forwardBytes.length >= bytesPerInetAddress; + assert forwardBytes.length % bytesPerInetAddress == 0; + + int offset = 0; + byte[] addressBytes = new byte[bytesPerInetAddress]; + + // Send a message to each of the addresses on our Forward List + while (offset < forwardBytes.length) + { + System.arraycopy(forwardBytes, offset, addressBytes, 0, bytesPerInetAddress); + InetAddress address = InetAddress.getByAddress(addressBytes); + + if (logger_.isDebugEnabled()) + logger_.debug("Forwarding message to " + address); + + // Send the original message to the address specified by the FORWARD_HINT + // Let the response go back to the coordinator + MessagingService.instance.sendOneWay(message, message.getFrom()); + + offset += bytesPerInetAddress; + } } } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/SimpleSnitch.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/SimpleSnitch.java?rev=1052123&r1=1052122&r2=1052123&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/SimpleSnitch.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/SimpleSnitch.java Thu Dec 23 01:06:49 2010 @@ -32,12 +32,12 @@ public class SimpleSnitch extends Abstra { public String getRack(InetAddress endpoint) { - throw new UnsupportedOperationException(); + return "rack1"; } public String getDatacenter(InetAddress endpoint) { - throw new UnsupportedOperationException(); + return "datacenter1"; } public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses) Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1052123&r1=1052122&r2=1052123&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Thu Dec 23 01:06:49 2010 @@ -27,20 +27,23 @@ import java.util.concurrent.*; import javax.management.MBeanServer; import javax.management.ObjectName; +import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; -import static com.google.common.base.Charsets.UTF_8; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; -import org.apache.cassandra.dht.*; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.TokenMetadata; @@ -53,7 +56,8 @@ import org.apache.cassandra.utils.FBUtil import org.apache.cassandra.utils.LatencyTracker; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; -import org.apache.cassandra.db.filter.QueryFilter; + +import static com.google.common.base.Charsets.UTF_8; public class StorageProxy implements StorageProxyMBean { @@ -93,10 +97,11 @@ public class StorageProxy implements Sto public static void 
mutate(List<RowMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimeoutException { long startTime = System.nanoTime(); - ArrayList<IWriteResponseHandler> responseHandlers = new ArrayList<IWriteResponseHandler>(); + List<IWriteResponseHandler> responseHandlers = new ArrayList<IWriteResponseHandler>(); RowMutation mostRecentRowMutation = null; StorageService ss = StorageService.instance; + String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getLocalAddress()); try { @@ -110,17 +115,24 @@ public class StorageProxy implements Sto Collection<InetAddress> writeEndpoints = ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(rm.key()), table, naturalEndpoints); Multimap<InetAddress, InetAddress> hintedEndpoints = rs.getHintedEndpoints(writeEndpoints); - // send out the writes, as in mutate() above, but this time with a callback that tracks responses final IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level); + + // exit early if we can't fulfill the CL at this time responseHandler.assureSufficientLiveNodes(); - + responseHandlers.add(responseHandler); + + // Multimap that holds onto all the messages and addresses meant for a specific datacenter + Multimap<String, Pair<Message, InetAddress>> dcMessages = HashMultimap.create(hintedEndpoints.size(), 10); Message unhintedMessage = null; + for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet()) { InetAddress destination = entry.getKey(); Collection<InetAddress> targets = entry.getValue(); + String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination); + if (targets.size() == 1 && targets.iterator().next().equals(destination)) { // unhinted writes @@ -130,7 +142,7 @@ public class StorageProxy implements Sto } else { - // belongs on a different server. send it there. 
+ // belongs on a different server if (unhintedMessage == null) { unhintedMessage = rm.makeRowMutationMessage(); @@ -138,7 +150,7 @@ public class StorageProxy implements Sto } if (logger.isDebugEnabled()) logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination); - MessagingService.instance.sendOneWay(unhintedMessage, destination); + dcMessages.put(dc, new Pair<Message, InetAddress>(unhintedMessage, destination)); } } else @@ -155,15 +167,16 @@ public class StorageProxy implements Sto } } responseHandler.addHintCallback(hintedMessage, destination); - MessagingService.instance.sendOneWay(hintedMessage, destination); + dcMessages.put(dc, new Pair<Message, InetAddress>(hintedMessage, destination)); } } + + sendMessages(localDataCenter, dcMessages); } + // wait for writes. throws timeoutexception if necessary for (IWriteResponseHandler responseHandler : responseHandlers) - { responseHandler.get(); - } } catch (IOException e) { @@ -178,6 +191,59 @@ public class StorageProxy implements Sto } } + /** + * for each datacenter, send a message to one node to relay the write to other replicas + */ + private static void sendMessages(String localDataCenter, Multimap<String, Pair<Message, InetAddress>> dcMessages) + throws IOException + { + for (Map.Entry<String, Collection<Pair<Message, InetAddress>>> entry : dcMessages.asMap().entrySet()) + { + String dataCenter = entry.getKey(); + + // Grab a set of all the messages bound for this dataCenter and create an iterator over this set. 
+ Collection<Pair<Message, InetAddress>> messagesForDataCenter = entry.getValue(); + Iterator<Pair<Message, InetAddress>> iter = messagesForDataCenter.iterator(); + assert iter.hasNext(); + + // First endpoint in list is the destination for this group + Pair<Message, InetAddress> messageAndDestination = iter.next(); + + Message primaryMessage = messageAndDestination.left; + InetAddress target = messageAndDestination.right; + + // Add all the other destinations that are bound for the same dataCenter as a header in the primary message. + while (iter.hasNext()) + { + messageAndDestination = iter.next(); + assert messageAndDestination.left == primaryMessage; + + if (dataCenter.equals(localDataCenter)) + { + // direct write to local DC + assert primaryMessage.getHeader(RowMutation.FORWARD_HEADER) == null; + MessagingService.instance.sendOneWay(primaryMessage, target); + } + else + { + // group all nodes in this DC as forward headers on the primary message + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + + // append to older addresses + byte[] previousHints = primaryMessage.getHeader(RowMutation.FORWARD_HEADER); + if (previousHints != null) + dos.write(previousHints); + + dos.write(messageAndDestination.right.getAddress()); + primaryMessage.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray()); + } + } + + MessagingService.instance.sendOneWay(primaryMessage, target); + } + } + private static void addHintHeader(Message message, InetAddress target) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream();