Avoid race on receiver by starting streaming sender thread after sending init message
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12886

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06feaefb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06feaefb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06feaefb

Branch: refs/heads/trunk
Commit: 06feaefba50301734c490521d720c8a482f638e4
Parents: 9bbb449
Author: Paulo Motta <pauloricard...@gmail.com>
Authored: Wed Mar 1 20:25:32 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:30:30 2017 -0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  | 26 +++++++++++---------
 2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 404440a..ca1aa27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.10
+ * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
  * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
  * Coalescing strategy sleeps too much (CASSANDRA-13090)
  * Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d3d8ed2..fe551a8 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -82,13 +82,11 @@ public class ConnectionHandler
     {
         logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
         Socket incomingSocket = session.createConnection();
-        incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
-        incoming.sendInitMessage(incomingSocket, true);
+        incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION, true);
 
         logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
         Socket outgoingSocket = session.createConnection();
-        outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
-        outgoing.sendInitMessage(outgoingSocket, false);
+        outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION, true);
     }
 
     /**
@@ -159,13 +157,15 @@ public class ConnectionHandler
 
         protected int protocolVersion;
         protected Socket socket;
+        private final boolean isOutgoingHandler;
 
         private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
         private IncomingStreamingConnection incomingConnection;
 
-        protected MessageHandler(StreamSession session)
+        protected MessageHandler(StreamSession session, boolean isOutgoingHandler)
         {
             this.session = session;
+            this.isOutgoingHandler = isOutgoingHandler;
         }
 
         protected abstract String name();
@@ -187,14 +187,14 @@ public class ConnectionHandler
         }
 
         @SuppressWarnings("resource")
-        public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
+        public void sendInitMessage() throws IOException
         {
             StreamInitMessage message = new StreamInitMessage(
                     FBUtilities.getBroadcastAddress(),
                     session.sessionIndex(),
                     session.planId(),
                     session.description(),
-                    isForOutgoing,
+                    !isOutgoingHandler,
                     session.keepSSTableLevel(),
                     session.isIncremental());
             ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
@@ -203,16 +203,18 @@ public class ConnectionHandler
             out.flush();
         }
 
-        public void start(IncomingStreamingConnection connection, int protocolVersion)
+        public void start(IncomingStreamingConnection connection, int protocolVersion) throws IOException
         {
             this.incomingConnection = connection;
-            start(connection.socket, protocolVersion);
+            start(connection.socket, protocolVersion, false);
         }
 
-        public void start(Socket socket, int protocolVersion)
+        public void start(Socket socket, int protocolVersion, boolean initiator) throws IOException
         {
             this.socket = socket;
             this.protocolVersion = protocolVersion;
+            if (initiator)
+                sendInitMessage();
 
             new Thread(this, name() + "-" + session.peer).start();
         }
@@ -270,7 +272,7 @@ public class ConnectionHandler
     {
         IncomingMessageHandler(StreamSession session)
         {
-            super(session);
+            super(session, false);
         }
 
         protected String name()
@@ -330,7 +332,7 @@ public class ConnectionHandler
 
         OutgoingMessageHandler(StreamSession session)
         {
-            super(session);
+            super(session, true);
         }
 
         protected String name()