Author: slebresne Date: Fri Nov 25 09:41:46 2011 New Revision: 1206097 URL: http://svn.apache.org/viewvc?rev=1206097&view=rev Log: merge from 1.0
Modified: cassandra/trunk/ (props changed) cassandra/trunk/CHANGES.txt cassandra/trunk/contrib/ (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed) cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/net/Header.java cassandra/trunk/src/java/org/apache/cassandra/net/Message.java cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Propchange: cassandra/trunk/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Nov 25 09:41:46 2011 @@ -4,7 +4,7 @@ /cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1198724,1198726-1205453 /cassandra/branches/cassandra-0.8.0:1125021-1130369 /cassandra/branches/cassandra-0.8.1:1101014-1125018 -/cassandra/branches/cassandra-1.0:1167085-1205978,1206088 +/cassandra/branches/cassandra-1.0:1167085-1205978,1206088,1206095 /cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020 Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1206097&r1=1206096&r2=1206097&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Fri Nov 25 09:41:46 2011 @@ -24,6 +24,7 @@ * skip --debug requirement to see common exceptions in CLI (CASSANDRA-3508) * fix incorrect query results due to invalid max timestamp (CASSANDRA-3510) * make sstableloader recognize compressed sstables (CASSANDRA-3521) + * avoids race in OutboundTcpConnection in multi-DC setups (CASSANDRA-3530) Merged from 0.8: * fix concurrence issue in the FailureDetector (CASSANDRA-3519) * fix array out of bounds error in counter shard removal (CASSANDRA-3514) Propchange: cassandra/trunk/contrib/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Nov 25 09:41:46 2011 @@ -4,7 +4,7 @@ /cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1198724,1198726-1205453 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018 -/cassandra/branches/cassandra-1.0/contrib:1167085-1205978,1206088 +/cassandra/branches/cassandra-1.0/contrib:1167085-1205978,1206088,1206095 /cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Nov 25 09:41:46 2011 @@ -4,7 +4,7 @@ /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1198724,1198726-1205453 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018 -/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1205978,1206088 +/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1205978,1206088,1206095 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Nov 25 09:41:46 2011 @@ -4,7 +4,7 @@ /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1198724,1198726-1205453 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018 -/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1205978,1206088 +/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1205978,1206088,1206095 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Nov 25 09:41:46 2011 @@ -4,7 +4,7 @@ /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1198724,1198726-1205453 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018 -/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1205978,1206088 +/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1205978,1206088,1206095 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Nov 25 09:41:46 2011 @@ -4,7 +4,7 @@ /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1198724,1198726-1205453 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018 -/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1205978,1206088 +/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1205978,1206088,1206095 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Nov 25 09:41:46 2011 @@ -4,7 +4,7 @@ /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1198724,1198726-1205453 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018 -/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1205978,1206088 +/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1205978,1206088,1206095 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020 Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1206097&r1=1206096&r2=1206097&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Fri Nov 25 09:41:46 2011 @@ -18,20 +18,16 @@ package org.apache.cassandra.db; -import java.io.DataInputStream; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; -import java.nio.ByteBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -69,7 +65,7 @@ public class RowMutationVerbHandler impl private void forwardToLocalNodes(Message message, byte[] forwardBytes) throws UnknownHostException { // remove fwds from message to avoid infinite loop - message.removeHeader(RowMutation.FORWARD_HEADER); + Message messageCopy = message.withHeaderRemoved(RowMutation.FORWARD_HEADER); int bytesPerInetAddress = FBUtilities.getBroadcastAddress().getAddress().length; assert forwardBytes.length >= bytesPerInetAddress; @@ -89,7 +85,7 @@ public class RowMutationVerbHandler impl // Send the original message to the address specified by the FORWARD_HINT // Let the response go back to the coordinator - MessagingService.instance().sendOneWay(message, address); + MessagingService.instance().sendOneWay(messageCopy, address); offset += bytesPerInetAddress; } Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=1206097&r1=1206096&r2=1206097&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Fri Nov 25 09:41:46 2011 @@ -20,6 +20,7 @@ package org.apache.cassandra.net; import java.io.*; import java.net.InetAddress; +import java.util.Collections; import java.util.Hashtable; import java.util.Map; @@ -27,6 +28,9 @@ import org.apache.cassandra.io.IVersione import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; + public class Header { private static IVersionedSerializer<Header> serializer_; @@ -46,21 +50,21 @@ public class Header // and RowMutationVerbHandler.forwardToLocalNodes) private final InetAddress from_; private final StorageService.Verb verb_; - protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>(); + protected final Map<String, byte[]> details_; Header(InetAddress from, StorageService.Verb verb) { + this(from, verb, Collections.<String, byte[]>emptyMap()); + } + + Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details) + { assert from != null; assert verb != null; from_ = from; verb_ = verb; - } - - Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details) - { - this(from, verb); - details_ = details; + details_ = ImmutableMap.copyOf(details); } InetAddress getFrom() @@ -78,14 +82,20 @@ public class Header return details_.get(key); } - void setDetail(String key, byte[] value) + Header withDetailsAdded(String key, byte[] value) { - details_.put(key, value); + Map<String, byte[]> detailsCopy = Maps.newHashMap(details_); + detailsCopy.put(key, value); + return new Header(from_, verb_, detailsCopy); } - void removeDetail(String key) + Header withDetailsRemoved(String key) { - details_.remove(key); + if (!details_.containsKey(key)) + return this; + Map<String, byte[]> detailsCopy = Maps.newHashMap(details_); + detailsCopy.remove(key); + return new Header(from_, verb_, detailsCopy); } public int serializedSize() Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1206097&r1=1206096&r2=1206097&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Fri Nov 25 09:41:46 2011 @@ -50,14 +50,14 @@ public class Message return header_.getDetail(key); } - public void setHeader(String key, byte[] value) + public Message withHeaderAdded(String key, byte[] value) { - header_.setDetail(key, value); + return new Message(header_.withDetailsAdded(key, value), body_, version); } - public void removeHeader(String key) + public Message withHeaderRemoved(String key) { - header_.removeDetail(key); + return new Message(header_.withDetailsRemoved(key), body_, version); } public byte[] getMessageBody() Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1206097&r1=1206096&r2=1206097&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Fri Nov 25 09:41:46 2011 @@ -151,9 +151,12 @@ public class OutboundTcpConnection exten out.flush(); } } - catch (IOException e) + catch (Exception e) { - if (logger.isDebugEnabled()) + // Non IO exceptions is likely a programming error so let's not silence it + if (!(e instanceof IOException)) + logger.error("error writing to " + poolReference.endPoint(), e); + else if (logger.isDebugEnabled()) logger.debug("error writing to " + poolReference.endPoint(), e); disconnect(); } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1206097&r1=1206096&r2=1206097&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Nov 25 09:41:46 2011 @@ -397,7 +397,7 @@ public class StorageProxy implements Sto Message message = messages.getKey(); // a single message object is used for unhinted writes, so clean out any forwards // from previous loop iterations - message.removeHeader(RowMutation.FORWARD_HEADER); + message = message.withHeaderRemoved(RowMutation.FORWARD_HEADER); if (dataCenter.equals(localDataCenter)) { @@ -411,21 +411,14 @@ public class StorageProxy implements Sto Iterator<InetAddress> iter = messages.getValue().iterator(); InetAddress target = iter.next(); // Add all the other destinations of the same message as a header in the primary message. + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); while (iter.hasNext()) { InetAddress destination = iter.next(); - // group all nodes in this DC as forward headers on the primary message - FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - - // append to older addresses - byte[] previousHints = message.getHeader(RowMutation.FORWARD_HEADER); - if (previousHints != null) - dos.write(previousHints); - dos.write(destination.getAddress()); - message.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray()); } + message = message.withHeaderAdded(RowMutation.FORWARD_HEADER, bos.toByteArray()); // send the combined message + forward headers MessagingService.instance().sendRR(message, target, handler); }