[2/4] git commit: Starting threads in the OutboundTcpConnectionPool constructor causes race conditions
Starting threads in the OutboundTcpConnectionPool constructor causes race conditions patch by sbtourist; reviewed by jasobrown for CASSANDRA-7177 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05bacaea Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05bacaea Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05bacaea Branch: refs/heads/cassandra-2.1 Commit: 05bacaeabc96a6d85fbf908dce8474acffcab730 Parents: 2e61cd5 Author: Jason Brown Authored: Wed May 7 11:58:56 2014 -0700 Committer: Jason Brown Committed: Wed May 7 11:58:56 2014 -0700 -- CHANGES.txt | 2 +- .../apache/cassandra/net/MessagingService.java | 6 +-- .../net/OutboundTcpConnectionPool.java | 41 +--- 3 files changed, 40 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/CHANGES.txt -- diff --git a/CHANGES.txt b/CHANGES.txt index fc192ef..65ee6cf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,6 @@ 2.0.9 * Warn when 'USING TIMESTAMP' is used on a CAS BATCH (CASSANDRA-7067) - + * Starting threads in OutboundTcpConnectionPool constructor causes race conditions (CASSANDRA-7177) 2.0.8 * Correctly delete scheduled range xfers (CASSANDRA-7143) http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/src/java/org/apache/cassandra/net/MessagingService.java -- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index cccf698..dbd76d6 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -498,11 +498,11 @@ public final class MessagingService implements MessagingServiceMBean cp = new OutboundTcpConnectionPool(to); OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp); if (existingPool != null) -{ -cp.close(); cp = existingPool; -} +else +cp.start(); } +cp.waitForStarted(); return cp; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java -- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java index 81168c6..c45fc53 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java @@ -22,6 +22,8 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.SocketChannel; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.Config; @@ -36,6 +38,7 @@ public class OutboundTcpConnectionPool { // pointer for the real Address. private final InetAddress id; +private final CountDownLatch started; public final OutboundTcpConnection cmdCon; public final OutboundTcpConnection ackCon; // pointer to the reseted Address. @@ -46,13 +49,10 @@ public class OutboundTcpConnectionPool { id = remoteEp; resetedEndpoint = SystemKeyspace.getPreferredIP(remoteEp); +started = new CountDownLatch(1); cmdCon = new OutboundTcpConnection(this); -cmdCon.start(); ackCon = new OutboundTcpConnection(this); -ackCon.start(); - -metrics = new ConnectionMetrics(id, this); } /** @@ -167,14 +167,45 @@ public class OutboundTcpConnectionPool } return true; } + +public void start() +{ +cmdCon.start(); +ackCon.start(); + +metrics = new ConnectionMetrics(id, this); + +started.countDown(); +} + +public void waitForStarted() +{ +if (started.getCount() == 0) +return; + +boolean error = false; +try +{ +if (!started.await(1, TimeUnit.MINUTES)) +error = true; +} +catch (InterruptedException e) +{ +Thread.currentThread().interrupt(); +error = true; +} +if (error) +throw new IllegalStateException(String.format("Connections to %s are not started!", id.getHostAddress())); +} - public void close() +public void close() {
git commit: Starting threads in the OutboundTcpConnectionPool constructor causes race conditions
Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 2e61cd5e0 -> 05bacaeab Starting threads in the OutboundTcpConnectionPool constructor causes race conditions patch by sbtourist; reviewed by jasobrown for CASSANDRA-7177 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05bacaea Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05bacaea Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05bacaea Branch: refs/heads/cassandra-2.0 Commit: 05bacaeabc96a6d85fbf908dce8474acffcab730 Parents: 2e61cd5 Author: Jason Brown Authored: Wed May 7 11:58:56 2014 -0700 Committer: Jason Brown Committed: Wed May 7 11:58:56 2014 -0700 -- CHANGES.txt | 2 +- .../apache/cassandra/net/MessagingService.java | 6 +-- .../net/OutboundTcpConnectionPool.java | 41 +--- 3 files changed, 40 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/CHANGES.txt -- diff --git a/CHANGES.txt b/CHANGES.txt index fc192ef..65ee6cf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,6 @@ 2.0.9 * Warn when 'USING TIMESTAMP' is used on a CAS BATCH (CASSANDRA-7067) - + * Starting threads in OutboundTcpConnectionPool constructor causes race conditions (CASSANDRA-7177) 2.0.8 * Correctly delete scheduled range xfers (CASSANDRA-7143) http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/src/java/org/apache/cassandra/net/MessagingService.java -- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index cccf698..dbd76d6 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -498,11 +498,11 @@ public final class MessagingService implements MessagingServiceMBean cp = new OutboundTcpConnectionPool(to); OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp); if (existingPool != null) -{ -cp.close(); cp = existingPool; -} +else +cp.start(); } +cp.waitForStarted(); return cp; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java -- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java index 81168c6..c45fc53 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java @@ -22,6 +22,8 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.SocketChannel; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.Config; @@ -36,6 +38,7 @@ public class OutboundTcpConnectionPool { // pointer for the real Address. private final InetAddress id; +private final CountDownLatch started; public final OutboundTcpConnection cmdCon; public final OutboundTcpConnection ackCon; // pointer to the reseted Address. @@ -46,13 +49,10 @@ public class OutboundTcpConnectionPool { id = remoteEp; resetedEndpoint = SystemKeyspace.getPreferredIP(remoteEp); +started = new CountDownLatch(1); cmdCon = new OutboundTcpConnection(this); -cmdCon.start(); ackCon = new OutboundTcpConnection(this); -ackCon.start(); - -metrics = new ConnectionMetrics(id, this); } /** @@ -167,14 +167,45 @@ public class OutboundTcpConnectionPool } return true; } + +public void start() +{ +cmdCon.start(); +ackCon.start(); + +metrics = new ConnectionMetrics(id, this); + +started.countDown(); +} + +public void waitForStarted() +{ +if (started.getCount() == 0) +return; + +boolean error = false; +try +{ +if (!started.await(1, TimeUnit.MINUTES)) +error = true; +} +catch (InterruptedException e) +{ +Thread.currentThread().interrupt(); +error = true; +} +if (error) +throw new IllegalStateException(String.format("Connections to %s are not starte
[1/4] git commit: Starting threads in the OutboundTcpConnectionPool constructor causes race conditions
Repository: cassandra Updated Branches: refs/heads/trunk adea05f58 -> fe2d7ddaf Starting threads in the OutboundTcpConnectionPool constructor causes race conditions patch by sbtourist; reviewed by jasobrown for CASSANDRA-7177 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05bacaea Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05bacaea Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05bacaea Branch: refs/heads/trunk Commit: 05bacaeabc96a6d85fbf908dce8474acffcab730 Parents: 2e61cd5 Author: Jason Brown Authored: Wed May 7 11:58:56 2014 -0700 Committer: Jason Brown Committed: Wed May 7 11:58:56 2014 -0700 -- CHANGES.txt | 2 +- .../apache/cassandra/net/MessagingService.java | 6 +-- .../net/OutboundTcpConnectionPool.java | 41 +--- 3 files changed, 40 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/CHANGES.txt -- diff --git a/CHANGES.txt b/CHANGES.txt index fc192ef..65ee6cf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,6 @@ 2.0.9 * Warn when 'USING TIMESTAMP' is used on a CAS BATCH (CASSANDRA-7067) - + * Starting threads in OutboundTcpConnectionPool constructor causes race conditions (CASSANDRA-7177) 2.0.8 * Correctly delete scheduled range xfers (CASSANDRA-7143) http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/src/java/org/apache/cassandra/net/MessagingService.java -- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index cccf698..dbd76d6 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -498,11 +498,11 @@ public final class MessagingService implements MessagingServiceMBean cp = new OutboundTcpConnectionPool(to); OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp); if (existingPool != null) -{ -cp.close(); cp = existingPool; -} +else +cp.start(); } +cp.waitForStarted(); return cp; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java -- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java index 81168c6..c45fc53 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java @@ -22,6 +22,8 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.SocketChannel; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.Config; @@ -36,6 +38,7 @@ public class OutboundTcpConnectionPool { // pointer for the real Address. private final InetAddress id; +private final CountDownLatch started; public final OutboundTcpConnection cmdCon; public final OutboundTcpConnection ackCon; // pointer to the reseted Address. @@ -46,13 +49,10 @@ public class OutboundTcpConnectionPool { id = remoteEp; resetedEndpoint = SystemKeyspace.getPreferredIP(remoteEp); +started = new CountDownLatch(1); cmdCon = new OutboundTcpConnection(this); -cmdCon.start(); ackCon = new OutboundTcpConnection(this); -ackCon.start(); - -metrics = new ConnectionMetrics(id, this); } /** @@ -167,14 +167,45 @@ public class OutboundTcpConnectionPool } return true; } + +public void start() +{ +cmdCon.start(); +ackCon.start(); + +metrics = new ConnectionMetrics(id, this); + +started.countDown(); +} + +public void waitForStarted() +{ +if (started.getCount() == 0) +return; + +boolean error = false; +try +{ +if (!started.await(1, TimeUnit.MINUTES)) +error = true; +} +catch (InterruptedException e) +{ +Thread.currentThread().interrupt(); +error = true; +} +if (error) +throw new IllegalStateException(String.format("Connections to %s are not started!", id.getHostA