CASSANDRA-4099-reopened patch by Vijay; reviewed by Brandon Williams for CASSANDRA-4098
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cec4253b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cec4253b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cec4253b Branch: refs/heads/trunk Commit: cec4253ba83be62dff17894442dbd1bb0690aaed Parents: 4a65311 Author: Vijay Parthasarathy <vijay2...@gmail.com> Authored: Fri Mar 30 11:42:10 2012 -0700 Committer: Vijay Parthasarathy <vijay2...@gmail.com> Committed: Fri Mar 30 11:42:10 2012 -0700 ---------------------------------------------------------------------- .../cassandra/net/IncomingTcpConnection.java | 21 +++++++-------- 1 files changed, 10 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cec4253b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index 47ab39a..a9a5809 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -92,19 +92,16 @@ public class IncomingTcpConnection extends Thread // we should buffer input = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096)); // Receive the first message to set the version. - Message msg = receiveMessage(input, version); - from = msg.getFrom(); // why? see => CASSANDRA-4099 + from = receiveMessage(input, version); // why? see => CASSANDRA-4099 if (version > MessagingService.version_) { // save the endpoint so gossip will reconnect to it Gossiper.instance.addSavedEndpoint(from); logger.info("Received " + (isStream ? "streaming " : "") + "connection from newer protocol version. Ignoring"); + return; } - else if (msg != null) - { - Gossiper.instance.setVersion(from, version); - logger.debug("set version for {} to {}", from, version); - } + Gossiper.instance.setVersion(from, version); + logger.debug("set version for {} to {}", from, version); // loop to get the next message. while (true) @@ -133,7 +130,7 @@ public class IncomingTcpConnection extends Thread } } - private Message receiveMessage(DataInputStream input, int version) throws IOException + private InetAddress receiveMessage(DataInputStream input, int version) throws IOException { int totalSize = input.readInt(); String id = input.readUTF(); @@ -158,10 +155,12 @@ public class IncomingTcpConnection extends Thread { Message message = new Message(header, body, version); MessagingService.instance().receive(message, id); - return message; } - logger.debug("Received connection from newer protocol version {}. Ignoring message", version); - return null; + else + { + logger.debug("Received connection from newer protocol version {}. Ignoring message", version); + } + return header.getFrom(); } private void close()