Repository: cassandra
Updated Branches:
  refs/heads/trunk adea05f58 -> fe2d7ddaf


Starting threads in the OutboundTcpConnectionPool constructor causes race 
conditions

patch by sbtourist; reviewed by jasobrown for CASSANDRA-7177


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05bacaea
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05bacaea
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05bacaea

Branch: refs/heads/trunk
Commit: 05bacaeabc96a6d85fbf908dce8474acffcab730
Parents: 2e61cd5
Author: Jason Brown <jasobr...@apple.com>
Authored: Wed May 7 11:58:56 2014 -0700
Committer: Jason Brown <jasobr...@apple.com>
Committed: Wed May 7 11:58:56 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +-
 .../apache/cassandra/net/MessagingService.java  |  6 +--
 .../net/OutboundTcpConnectionPool.java          | 41 +++++++++++++++++---
 3 files changed, 40 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fc192ef..65ee6cf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,6 @@
 2.0.9
  * Warn when 'USING TIMESTAMP' is used on a CAS BATCH (CASSANDRA-7067)
-
+ * Starting threads in OutboundTcpConnectionPool constructor causes race 
conditions (CASSANDRA-7177)
 
 2.0.8
  * Correctly delete scheduled range xfers (CASSANDRA-7143)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index cccf698..dbd76d6 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -498,11 +498,11 @@ public final class MessagingService implements 
MessagingServiceMBean
             cp = new OutboundTcpConnectionPool(to);
             OutboundTcpConnectionPool existingPool = 
connectionManagers.putIfAbsent(to, cp);
             if (existingPool != null)
-            {
-                cp.close();
                 cp = existingPool;
-            }
+            else
+                cp.start();
         }
+        cp.waitForStarted();
         return cp;
     }
     

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacaea/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index 81168c6..c45fc53 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -22,6 +22,8 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.channels.SocketChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.Config;
@@ -36,6 +38,7 @@ public class OutboundTcpConnectionPool
 {
     // pointer for the real Address.
     private final InetAddress id;
+    private final CountDownLatch started;
     public final OutboundTcpConnection cmdCon;
     public final OutboundTcpConnection ackCon;
     // pointer to the reseted Address.
@@ -46,13 +49,10 @@ public class OutboundTcpConnectionPool
     {
         id = remoteEp;
         resetedEndpoint = SystemKeyspace.getPreferredIP(remoteEp);
+        started = new CountDownLatch(1);
 
         cmdCon = new OutboundTcpConnection(this);
-        cmdCon.start();
         ackCon = new OutboundTcpConnection(this);
-        ackCon.start();
-
-        metrics = new ConnectionMetrics(id, this);
     }
 
     /**
@@ -167,14 +167,45 @@ public class OutboundTcpConnectionPool
         }
         return true;
     }
+    
+    public void start()
+    {
+        cmdCon.start();
+        ackCon.start();
+
+        metrics = new ConnectionMetrics(id, this);
+        
+        started.countDown();
+    }
+    
+    public void waitForStarted()
+    {
+        if (started.getCount() == 0)
+            return;
+
+        boolean error = false;
+        try
+        {
+            if (!started.await(1, TimeUnit.MINUTES))
+                error = true;
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            error = true;
+        }
+        if (error)
+            throw new IllegalStateException(String.format("Connections to %s 
are not started!", id.getHostAddress()));
+    }
 
-   public void close()
+    public void close()
     {
         // these null guards are simply for tests
         if (ackCon != null)
             ackCon.closeSocket(true);
         if (cmdCon != null)
             cmdCon.closeSocket(true);
+        
         metrics.release();
     }
 }

Reply via email to