Author: jbellis Date: Fri Nov 25 15:54:12 2011 New Revision: 1206235 URL: http://svn.apache.org/viewvc?rev=1206235&view=rev Log: Revert the patch for CASSANDRA-3530 (#3530), since the bug it fixes only affects 1.0.3.
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Header.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Message.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1206235&r1=1206234&r2=1206235&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Fri Nov 25 15:54:12 2011 @@ -42,7 +42,6 @@ * Fix bug preventing the use of efficient cross-DC writes (CASSANDRA-3472) * (Hadoop) skip empty rows when entire row is requested, redux (CASSANDRA-2855) * fix concurrence issue in the FailureDetector (CASSANDRA-3519) - * avoids rade in OutboundTcpConnection for multi-DC setups (CASSANDRA-3530) 0.8.7 Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1206235&r1=1206234&r2=1206235&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Fri Nov 25 15:54:12 2011 @@ -86,7 +86,7 @@ public class RowMutationVerbHandler impl private void forwardToLocalNodes(Message message, byte[] forwardBytes) throws UnknownHostException { // remove fwds from message to avoid infinite loop - Message messageCopy = message.withHeaderRemoved(RowMutation.FORWARD_HEADER); + 
message.removeHeader(RowMutation.FORWARD_HEADER); int bytesPerInetAddress = FBUtilities.getLocalAddress().getAddress().length; assert forwardBytes.length >= bytesPerInetAddress; @@ -106,7 +106,7 @@ public class RowMutationVerbHandler impl // Send the original message to the address specified by the FORWARD_HINT // Let the response go back to the coordinator - MessagingService.instance().sendOneWay(messageCopy, address); + MessagingService.instance().sendOneWay(message, address); offset += bytesPerInetAddress; } Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Header.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Header.java?rev=1206235&r1=1206234&r2=1206235&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Header.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Header.java Fri Nov 25 15:54:12 2011 @@ -22,7 +22,6 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; -import java.util.Collections; import java.util.Hashtable; import java.util.Map; import java.util.Set; @@ -31,9 +30,6 @@ import java.util.concurrent.atomic.Atomi import org.apache.cassandra.io.ICompactSerializer; import org.apache.cassandra.service.StorageService; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; - public class Header { private static ICompactSerializer<Header> serializer_; @@ -51,21 +47,21 @@ public class Header private final InetAddress from_; // TODO STAGE can be determined from verb private final StorageService.Verb verb_; - protected final Map<String, byte[]> details_; + protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>(); Header(InetAddress from, StorageService.Verb verb) { - this(from, verb, Collections.<String, 
byte[]>emptyMap()); - } - - Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details) - { assert from != null; assert verb != null; from_ = from; verb_ = verb; - details_ = ImmutableMap.copyOf(details); + } + + Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details) + { + this(from, verb); + details_ = details; } InetAddress getFrom() @@ -83,20 +79,14 @@ public class Header return details_.get(key); } - Header withDetailsAdded(String key, byte[] value) + void setDetail(String key, byte[] value) { - Map<String, byte[]> detailsCopy = Maps.newHashMap(details_); - detailsCopy.put(key, value); - return new Header(from_, verb_, detailsCopy); + details_.put(key, value); } - Header withDetailsRemoved(String key) + void removeDetail(String key) { - if (!details_.containsKey(key)) - return this; - Map<String, byte[]> detailsCopy = Maps.newHashMap(details_); - detailsCopy.remove(key); - return new Header(from_, verb_, detailsCopy); + details_.remove(key); } } Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Message.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Message.java?rev=1206235&r1=1206234&r2=1206235&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Message.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Message.java Fri Nov 25 15:54:12 2011 @@ -66,14 +66,14 @@ public class Message return header_.getDetail(key); } - public Message withHeaderAdded(String key, byte[] value) + public void setHeader(String key, byte[] value) { - return new Message(header_.withDetailsAdded(key, value), body_, version); + header_.setDetail(key, value); } - public Message withHeaderRemoved(String key) + public void removeHeader(String key) { - return new Message(header_.withDetailsRemoved(key), body_, version); + 
header_.removeDetail(key); } public byte[] getMessageBody() Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1206235&r1=1206234&r2=1206235&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Fri Nov 25 15:54:12 2011 @@ -273,7 +273,7 @@ public class StorageProxy implements Sto { if (!target.equals(destination)) { - hintedMessage = addHintHeader(hintedMessage, target); + addHintHeader(hintedMessage, target); if (logger.isDebugEnabled()) logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination + " for " + target); } @@ -304,7 +304,7 @@ public class StorageProxy implements Sto Message message = messages.getKey(); // a single message object is used for unhinted writes, so clean out any forwards // from previous loop iterations - message = message.withHeaderRemoved(RowMutation.FORWARD_HEADER); + message.removeHeader(RowMutation.FORWARD_HEADER); if (dataCenter.equals(localDataCenter)) { @@ -318,14 +318,21 @@ public class StorageProxy implements Sto Iterator<InetAddress> iter = messages.getValue().iterator(); InetAddress target = iter.next(); // Add all the other destinations of the same message as a header in the primary message. 
- ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); while (iter.hasNext()) { InetAddress destination = iter.next(); + // group all nodes in this DC as forward headers on the primary message + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + + // append to older addresses + byte[] previousHints = message.getHeader(RowMutation.FORWARD_HEADER); + if (previousHints != null) + dos.write(previousHints); + dos.write(destination.getAddress()); + message.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray()); } - message = message.withHeaderAdded(RowMutation.FORWARD_HEADER, bos.toByteArray()); // send the combined message + forward headers MessagingService.instance().sendRR(message, target, handler); } @@ -333,7 +340,7 @@ public class StorageProxy implements Sto } } - private static Message addHintHeader(Message message, InetAddress target) throws IOException + private static void addHintHeader(Message message, InetAddress target) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); @@ -343,7 +350,7 @@ public class StorageProxy implements Sto dos.write(previousHints); } ByteBufferUtil.writeWithShortLength(ByteBufferUtil.bytes(target.getHostAddress()), dos); - return message.withHeaderAdded(RowMutation.HINT, bos.toByteArray()); + message.setHeader(RowMutation.HINT, bos.toByteArray()); } private static void insertLocal(final RowMutation rm, final IWriteResponseHandler responseHandler)