Repository: cassandra Updated Branches: refs/heads/trunk adea05f58 -> fe2d7ddaf
Starting threads in the OutboundTcpConnectionPool constructor causes race conditions patch by sbtourist; reviewed by jasobrown for CASSANDRA-7177 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05bacaea Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05bacaea Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05bacaea Branch: refs/heads/trunk Commit: 05bacaeabc96a6d85fbf908dce8474acffcab730 Parents: 2e61cd5 Author: Jason Brown <jasobr...@apple.com> Authored: Wed May 7 11:58:56 2014 -0700 Committer: Jason Brown <jasobr...@apple.com> Committed: Wed May 7 11:58:56 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../apache/cassandra/net/MessagingService.java | 6 +-- .../net/OutboundTcpConnectionPool.java | 41 +++++++++++++++++--- 3 files changed, 40 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fc192ef..65ee6cf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,6 @@ 2.0.9 * Warn when 'USING TIMESTAMP' is used on a CAS BATCH (CASSANDRA-7067) - + * Starting threads in OutboundTcpConnectionPool constructor causes race conditions (CASSANDRA-7177) 2.0.8 * Correctly delete scheduled range xfers (CASSANDRA-7143) http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index cccf698..dbd76d6 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -498,11 +498,11 @@ public final class MessagingService implements MessagingServiceMBean cp = new OutboundTcpConnectionPool(to); OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp); if (existingPool != null) - { - cp.close(); cp = existingPool; - } + else + cp.start(); } + cp.waitForStarted(); return cp; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java index 81168c6..c45fc53 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java @@ -22,6 +22,8 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.SocketChannel; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.Config; @@ -36,6 +38,7 @@ public class OutboundTcpConnectionPool { // pointer for the real Address. private final InetAddress id; + private final CountDownLatch started; public final OutboundTcpConnection cmdCon; public final OutboundTcpConnection ackCon; // pointer to the reseted Address. @@ -46,13 +49,10 @@ public class OutboundTcpConnectionPool { id = remoteEp; resetedEndpoint = SystemKeyspace.getPreferredIP(remoteEp); + started = new CountDownLatch(1); cmdCon = new OutboundTcpConnection(this); - cmdCon.start(); ackCon = new OutboundTcpConnection(this); - ackCon.start(); - - metrics = new ConnectionMetrics(id, this); } /** @@ -167,14 +167,45 @@ public class OutboundTcpConnectionPool } return true; } + + public void start() + { + cmdCon.start(); + ackCon.start(); + + metrics = new ConnectionMetrics(id, this); + + started.countDown(); + } + + public void waitForStarted() + { + if (started.getCount() == 0) + return; + + boolean error = false; + try + { + if (!started.await(1, TimeUnit.MINUTES)) + error = true; + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + error = true; + } + if (error) + throw new IllegalStateException(String.format("Connections to %s are not started!", id.getHostAddress())); + } - public void close() + public void close() { // these null guards are simply for tests if (ackCon != null) ackCon.closeSocket(true); if (cmdCon != null) cmdCon.closeSocket(true); + metrics.release(); } }