[2/4] git commit: Starting threads in the OutboundTcpConnectionPool constructor causes race conditions

2014-05-16 Thread jake
Starting threads in the OutboundTcpConnectionPool constructor causes race 
conditions

patch by sbtourist; reviewed by jasobrown for CASSANDRA-7177


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05bacaea
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05bacaea
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05bacaea

Branch: refs/heads/cassandra-2.1
Commit: 05bacaeabc96a6d85fbf908dce8474acffcab730
Parents: 2e61cd5
Author: Jason Brown 
Authored: Wed May 7 11:58:56 2014 -0700
Committer: Jason Brown 
Committed: Wed May 7 11:58:56 2014 -0700

--
 CHANGES.txt |  2 +-
 .../apache/cassandra/net/MessagingService.java  |  6 +--
 .../net/OutboundTcpConnectionPool.java  | 41 +---
 3 files changed, 40 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/CHANGES.txt
--
diff --git a/CHANGES.txt b/CHANGES.txt
index fc192ef..65ee6cf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,6 @@
 2.0.9
  * Warn when 'USING TIMESTAMP' is used on a CAS BATCH (CASSANDRA-7067)
-
+ * Starting threads in OutboundTcpConnectionPool constructor causes race conditions (CASSANDRA-7177)
 
 2.0.8
  * Correctly delete scheduled range xfers (CASSANDRA-7143)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/src/java/org/apache/cassandra/net/MessagingService.java
--
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index cccf698..dbd76d6 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -498,11 +498,11 @@ public final class MessagingService implements MessagingServiceMBean
 cp = new OutboundTcpConnectionPool(to);
 OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp);
 if (existingPool != null)
-{
-cp.close();
 cp = existingPool;
-}
+else
+cp.start();
 }
+cp.waitForStarted();
 return cp;
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
--
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index 81168c6..c45fc53 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -22,6 +22,8 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.channels.SocketChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.Config;
@@ -36,6 +38,7 @@ public class OutboundTcpConnectionPool
 {
 // pointer for the real Address.
 private final InetAddress id;
+private final CountDownLatch started;
 public final OutboundTcpConnection cmdCon;
 public final OutboundTcpConnection ackCon;
 // pointer to the reseted Address.
@@ -46,13 +49,10 @@ public class OutboundTcpConnectionPool
 {
 id = remoteEp;
 resetedEndpoint = SystemKeyspace.getPreferredIP(remoteEp);
+started = new CountDownLatch(1);
 
 cmdCon = new OutboundTcpConnection(this);
-cmdCon.start();
 ackCon = new OutboundTcpConnection(this);
-ackCon.start();
-
-metrics = new ConnectionMetrics(id, this);
 }
 
 /**
@@ -167,14 +167,45 @@ public class OutboundTcpConnectionPool
 }
 return true;
 }
+
+public void start()
+{
+cmdCon.start();
+ackCon.start();
+
+metrics = new ConnectionMetrics(id, this);
+
+started.countDown();
+}
+
+public void waitForStarted()
+{
+if (started.getCount() == 0)
+return;
+
+boolean error = false;
+try
+{
+if (!started.await(1, TimeUnit.MINUTES))
+error = true;
+}
+catch (InterruptedException e)
+{
+Thread.currentThread().interrupt();
+error = true;
+}
+if (error)
+throw new IllegalStateException(String.format("Connections to %s are not started!", id.getHostAddress()));
+}
 
-   public void close()
+public void close()
 {

git commit: Starting threads in the OutboundTcpConnectionPool constructor causes race conditions

2014-05-15 Thread jasobrown
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 2e61cd5e0 -> 05bacaeab


Starting threads in the OutboundTcpConnectionPool constructor causes race 
conditions

patch by sbtourist; reviewed by jasobrown for CASSANDRA-7177


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05bacaea
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05bacaea
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05bacaea

Branch: refs/heads/cassandra-2.0
Commit: 05bacaeabc96a6d85fbf908dce8474acffcab730
Parents: 2e61cd5
Author: Jason Brown 
Authored: Wed May 7 11:58:56 2014 -0700
Committer: Jason Brown 
Committed: Wed May 7 11:58:56 2014 -0700

--
 CHANGES.txt |  2 +-
 .../apache/cassandra/net/MessagingService.java  |  6 +--
 .../net/OutboundTcpConnectionPool.java  | 41 +---
 3 files changed, 40 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/CHANGES.txt
--
diff --git a/CHANGES.txt b/CHANGES.txt
index fc192ef..65ee6cf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,6 @@
 2.0.9
  * Warn when 'USING TIMESTAMP' is used on a CAS BATCH (CASSANDRA-7067)
-
+ * Starting threads in OutboundTcpConnectionPool constructor causes race conditions (CASSANDRA-7177)
 
 2.0.8
  * Correctly delete scheduled range xfers (CASSANDRA-7143)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/src/java/org/apache/cassandra/net/MessagingService.java
--
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index cccf698..dbd76d6 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -498,11 +498,11 @@ public final class MessagingService implements MessagingServiceMBean
 cp = new OutboundTcpConnectionPool(to);
 OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp);
 if (existingPool != null)
-{
-cp.close();
 cp = existingPool;
-}
+else
+cp.start();
 }
+cp.waitForStarted();
 return cp;
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
--
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index 81168c6..c45fc53 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -22,6 +22,8 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.channels.SocketChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.Config;
@@ -36,6 +38,7 @@ public class OutboundTcpConnectionPool
 {
 // pointer for the real Address.
 private final InetAddress id;
+private final CountDownLatch started;
 public final OutboundTcpConnection cmdCon;
 public final OutboundTcpConnection ackCon;
 // pointer to the reseted Address.
@@ -46,13 +49,10 @@ public class OutboundTcpConnectionPool
 {
 id = remoteEp;
 resetedEndpoint = SystemKeyspace.getPreferredIP(remoteEp);
+started = new CountDownLatch(1);
 
 cmdCon = new OutboundTcpConnection(this);
-cmdCon.start();
 ackCon = new OutboundTcpConnection(this);
-ackCon.start();
-
-metrics = new ConnectionMetrics(id, this);
 }
 
 /**
@@ -167,14 +167,45 @@ public class OutboundTcpConnectionPool
 }
 return true;
 }
+
+public void start()
+{
+cmdCon.start();
+ackCon.start();
+
+metrics = new ConnectionMetrics(id, this);
+
+started.countDown();
+}
+
+public void waitForStarted()
+{
+if (started.getCount() == 0)
+return;
+
+boolean error = false;
+try
+{
+if (!started.await(1, TimeUnit.MINUTES))
+error = true;
+}
+catch (InterruptedException e)
+{
+Thread.currentThread().interrupt();
+error = true;
+}
+if (error)
+throw new IllegalStateException(String.format("Connections to %s are not started!", id.getHostAddress()));

[1/4] git commit: Starting threads in the OutboundTcpConnectionPool constructor causes race conditions

2014-05-10 Thread jake
Repository: cassandra
Updated Branches:
  refs/heads/trunk adea05f58 -> fe2d7ddaf


Starting threads in the OutboundTcpConnectionPool constructor causes race 
conditions

patch by sbtourist; reviewed by jasobrown for CASSANDRA-7177


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05bacaea
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05bacaea
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05bacaea

Branch: refs/heads/trunk
Commit: 05bacaeabc96a6d85fbf908dce8474acffcab730
Parents: 2e61cd5
Author: Jason Brown 
Authored: Wed May 7 11:58:56 2014 -0700
Committer: Jason Brown 
Committed: Wed May 7 11:58:56 2014 -0700

--
 CHANGES.txt |  2 +-
 .../apache/cassandra/net/MessagingService.java  |  6 +--
 .../net/OutboundTcpConnectionPool.java  | 41 +---
 3 files changed, 40 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/CHANGES.txt
--
diff --git a/CHANGES.txt b/CHANGES.txt
index fc192ef..65ee6cf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,6 @@
 2.0.9
  * Warn when 'USING TIMESTAMP' is used on a CAS BATCH (CASSANDRA-7067)
-
+ * Starting threads in OutboundTcpConnectionPool constructor causes race conditions (CASSANDRA-7177)
 
 2.0.8
  * Correctly delete scheduled range xfers (CASSANDRA-7143)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/src/java/org/apache/cassandra/net/MessagingService.java
--
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index cccf698..dbd76d6 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -498,11 +498,11 @@ public final class MessagingService implements MessagingServiceMBean
 cp = new OutboundTcpConnectionPool(to);
 OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp);
 if (existingPool != null)
-{
-cp.close();
 cp = existingPool;
-}
+else
+cp.start();
 }
+cp.waitForStarted();
 return cp;
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
--
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index 81168c6..c45fc53 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -22,6 +22,8 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.channels.SocketChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.Config;
@@ -36,6 +38,7 @@ public class OutboundTcpConnectionPool
 {
 // pointer for the real Address.
 private final InetAddress id;
+private final CountDownLatch started;
 public final OutboundTcpConnection cmdCon;
 public final OutboundTcpConnection ackCon;
 // pointer to the reseted Address.
@@ -46,13 +49,10 @@ public class OutboundTcpConnectionPool
 {
 id = remoteEp;
 resetedEndpoint = SystemKeyspace.getPreferredIP(remoteEp);
+started = new CountDownLatch(1);
 
 cmdCon = new OutboundTcpConnection(this);
-cmdCon.start();
 ackCon = new OutboundTcpConnection(this);
-ackCon.start();
-
-metrics = new ConnectionMetrics(id, this);
 }
 
 /**
@@ -167,14 +167,45 @@ public class OutboundTcpConnectionPool
 }
 return true;
 }
+
+public void start()
+{
+cmdCon.start();
+ackCon.start();
+
+metrics = new ConnectionMetrics(id, this);
+
+started.countDown();
+}
+
+public void waitForStarted()
+{
+if (started.getCount() == 0)
+return;
+
+boolean error = false;
+try
+{
+if (!started.await(1, TimeUnit.MINUTES))
+error = true;
+}
+catch (InterruptedException e)
+{
+Thread.currentThread().interrupt();
+error = true;
+}
+if (error)
+throw new IllegalStateException(String.format("Connections to %s are not started!", id.getHostAddress()));