Author: slebresne Date: Fri Nov 25 09:48:15 2011 New Revision: 1206098 URL: http://svn.apache.org/viewvc?rev=1206098&view=rev Log: avoid race in OutboundTcpConnection in multi-DC setups patch by jbellis; reviewed by slebresne for CASSANDRA-3530
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Header.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Message.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1206098&r1=1206097&r2=1206098&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Fri Nov 25 09:48:15 2011 @@ -42,6 +42,7 @@ * Fix bug preventing the use of efficient cross-DC writes (CASSANDRA-3472) * (Hadoop) skip empty rows when entire row is requested, redux (CASSANDRA-2855) * fix concurrence issue in the FailureDetector (CASSANDRA-3519) + * avoid race in OutboundTcpConnection for multi-DC setups (CASSANDRA-3530) 0.8.7 Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1206098&r1=1206097&r2=1206098&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Fri Nov 25 09:48:15 2011 @@ -86,7 +86,7 @@ public class RowMutationVerbHandler impl private void forwardToLocalNodes(Message message, byte[] forwardBytes) throws UnknownHostException { // remove fwds from message to avoid infinite loop - message.removeHeader(RowMutation.FORWARD_HEADER); + Message messageCopy = 
message.withHeaderRemoved(RowMutation.FORWARD_HEADER); int bytesPerInetAddress = FBUtilities.getLocalAddress().getAddress().length; assert forwardBytes.length >= bytesPerInetAddress; @@ -106,7 +106,7 @@ public class RowMutationVerbHandler impl // Send the original message to the address specified by the FORWARD_HINT // Let the response go back to the coordinator - MessagingService.instance().sendOneWay(message, address); + MessagingService.instance().sendOneWay(messageCopy, address); offset += bytesPerInetAddress; } Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Header.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Header.java?rev=1206098&r1=1206097&r2=1206098&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Header.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Header.java Fri Nov 25 09:48:15 2011 @@ -22,6 +22,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; +import java.util.Collections; import java.util.Hashtable; import java.util.Map; import java.util.Set; @@ -30,6 +31,9 @@ import java.util.concurrent.atomic.Atomi import org.apache.cassandra.io.ICompactSerializer; import org.apache.cassandra.service.StorageService; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; + public class Header { private static ICompactSerializer<Header> serializer_; @@ -47,21 +51,21 @@ public class Header private final InetAddress from_; // TODO STAGE can be determined from verb private final StorageService.Verb verb_; - protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>(); + protected final Map<String, byte[]> details_; Header(InetAddress from, StorageService.Verb verb) { + this(from, verb, Collections.<String, 
byte[]>emptyMap()); + } + + Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details) + { assert from != null; assert verb != null; from_ = from; verb_ = verb; - } - - Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details) - { - this(from, verb); - details_ = details; + details_ = ImmutableMap.copyOf(details); } InetAddress getFrom() @@ -79,14 +83,20 @@ public class Header return details_.get(key); } - void setDetail(String key, byte[] value) + Header withDetailsAdded(String key, byte[] value) { - details_.put(key, value); + Map<String, byte[]> detailsCopy = Maps.newHashMap(details_); + detailsCopy.put(key, value); + return new Header(from_, verb_, detailsCopy); } - void removeDetail(String key) + Header withDetailsRemoved(String key) { - details_.remove(key); + if (!details_.containsKey(key)) + return this; + Map<String, byte[]> detailsCopy = Maps.newHashMap(details_); + detailsCopy.remove(key); + return new Header(from_, verb_, detailsCopy); } } Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Message.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Message.java?rev=1206098&r1=1206097&r2=1206098&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Message.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Message.java Fri Nov 25 09:48:15 2011 @@ -66,14 +66,14 @@ public class Message return header_.getDetail(key); } - public void setHeader(String key, byte[] value) + public Message withHeaderAdded(String key, byte[] value) { - header_.setDetail(key, value); + return new Message(header_.withDetailsAdded(key, value), body_, version); } - public void removeHeader(String key) + public Message withHeaderRemoved(String key) { - header_.removeDetail(key); + return new 
Message(header_.withDetailsRemoved(key), body_, version); } public byte[] getMessageBody() Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1206098&r1=1206097&r2=1206098&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Fri Nov 25 09:48:15 2011 @@ -273,7 +273,7 @@ public class StorageProxy implements Sto { if (!target.equals(destination)) { - addHintHeader(hintedMessage, target); + hintedMessage = addHintHeader(hintedMessage, target); if (logger.isDebugEnabled()) logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination + " for " + target); } @@ -304,7 +304,7 @@ public class StorageProxy implements Sto Message message = messages.getKey(); // a single message object is used for unhinted writes, so clean out any forwards // from previous loop iterations - message.removeHeader(RowMutation.FORWARD_HEADER); + message = message.withHeaderRemoved(RowMutation.FORWARD_HEADER); if (dataCenter.equals(localDataCenter)) { @@ -318,21 +318,14 @@ public class StorageProxy implements Sto Iterator<InetAddress> iter = messages.getValue().iterator(); InetAddress target = iter.next(); // Add all the other destinations of the same message as a header in the primary message. 
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); while (iter.hasNext()) { InetAddress destination = iter.next(); - // group all nodes in this DC as forward headers on the primary message - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - - // append to older addresses - byte[] previousHints = message.getHeader(RowMutation.FORWARD_HEADER); - if (previousHints != null) - dos.write(previousHints); - dos.write(destination.getAddress()); - message.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray()); } + message = message.withHeaderAdded(RowMutation.FORWARD_HEADER, bos.toByteArray()); // send the combined message + forward headers MessagingService.instance().sendRR(message, target, handler); } @@ -340,7 +333,7 @@ public class StorageProxy implements Sto } } - private static void addHintHeader(Message message, InetAddress target) throws IOException + private static Message addHintHeader(Message message, InetAddress target) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); @@ -350,7 +343,7 @@ public class StorageProxy implements Sto dos.write(previousHints); } ByteBufferUtil.writeWithShortLength(ByteBufferUtil.bytes(target.getHostAddress()), dos); - message.setHeader(RowMutation.HINT, bos.toByteArray()); + return message.withHeaderAdded(RowMutation.HINT, bos.toByteArray()); } private static void insertLocal(final RowMutation rm, final IWriteResponseHandler responseHandler)