merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7d8a56df Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7d8a56df Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7d8a56df Branch: refs/heads/trunk Commit: 7d8a56df3df26be7537f2a5158469629c34b911c Parents: 71c8912 92b3622 Author: Jonathan Ellis <jbel...@apache.org> Authored: Thu Oct 3 12:12:11 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Thu Oct 3 12:12:11 2013 -0500 ---------------------------------------------------------------------- src/java/org/apache/cassandra/service/StorageProxy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d8a56df/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index 6c8e636,9b559e5..d16bef9 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -963,29 -637,35 +963,29 @@@ public class StorageProxy implements St Iterator<InetAddress> iter = targets.iterator(); InetAddress target = iter.next(); - // direct writes to local DC or old Cassandra versions - if (localDC || MessagingService.instance().getVersion(target) < MessagingService.VERSION_11) + // Add the other destinations of the same message as a FORWARD_HEADER entry + DataOutputBuffer out = new DataOutputBuffer(); + try { - // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid - // creating a second iterator since we already have a perfectly good one - MessagingService.instance().sendRR(message, target, handler, message.getTimeout(), handler.consistencyLevel); + out.writeInt(targets.size() - 1); while (iter.hasNext()) { - target = iter.next(); - MessagingService.instance().sendRR(message, target, handler); + InetAddress destination = iter.next(); + CompactEndpointSerializationHelper.serialize(destination, out); - int id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout()); ++ int id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout(), handler.consistencyLevel); + out.writeInt(id); + logger.trace("Adding FWD message to {}@{}", id, destination); } - return; + message = message.withParameter(RowMutation.FORWARD_TO, out.getData()); + // send the combined message + forward headers + int id = MessagingService.instance().sendRR(message, target, handler); + logger.trace("Sending message to {}@{}", id, target); } - - // Add all the other destinations of the same message as a FORWARD_HEADER entry - FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - dos.writeInt(targets.size() - 1); - while (iter.hasNext()) + catch (IOException e) { - InetAddress destination = iter.next(); - CompactEndpointSerializationHelper.serialize(destination, dos); - String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout()); - dos.writeUTF(id); + // DataOutputBuffer is in-memory, doesn't throw IOException + throw new AssertionError(e); } - message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray()); - // send the combined message + forward headers - Tracing.trace("Enqueuing message to {}", target); - MessagingService.instance().sendRR(message, target, handler); } private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)