Repository: cassandra
Updated Branches:
  refs/heads/trunk a8be43e45 -> 4af23348e


Remove StreamCoordinator.streamExecutor thread pool

patch by jasobrown; reviewed by Dinesh Joshi for CASSANDRA-14402


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4af23348
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4af23348
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4af23348

Branch: refs/heads/trunk
Commit: 4af23348ecd6fc8dbef44ac5ebdb6ae60d599283
Parents: a8be43e
Author: Jason Brown <jasedbr...@gmail.com>
Authored: Thu Apr 19 05:33:34 2018 -0700
Committer: Jason Brown <jasedbr...@gmail.com>
Committed: Fri Apr 20 05:05:51 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamCoordinator.java  | 30 ++++----------------
 2 files changed, 7 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4af23348/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7d9769c..5902305 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Remove StreamCoordinator.streamExecutor thread pool (CASSANDRA-14402)
  * Rename nodetool --with-port to --print-port to disambiguate from --port (CASSANDRA-14392)
  * Client TOPOLOGY_CHANGE messages have wrong port. (CASSANDRA-14398)
  * Add ability to load new SSTables from a separate directory (CASSANDRA-6719)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4af23348/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 139488d..6b92dfe 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -22,9 +22,7 @@ import java.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * {@link StreamCoordinator} is a helper class that abstracts away maintaining 
multiple
@@ -37,15 +35,9 @@ public class StreamCoordinator
 {
     private static final Logger logger = 
LoggerFactory.getLogger(StreamCoordinator.class);
 
-    /**
-     * Executor strictly for establishing the initial connections. Once we're 
connected to the other end the rest of the
-     * streaming is handled directly by the {@link StreamingMessageSender}'s 
incoming and outgoing threads.
-     */
-    private static final DebuggableThreadPoolExecutor streamExecutor = 
DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
-                                                                               
                                             
FBUtilities.getAvailableProcessors());
     private final boolean connectSequentially;
 
-    private Map<InetAddressAndPort, HostStreamingData> peerSessions = new 
HashMap<>();
+    private final Map<InetAddressAndPort, HostStreamingData> peerSessions = 
new HashMap<>();
     private final StreamOperation streamOperation;
     private final int connectionsPerHost;
     private StreamConnectionFactory factory;
@@ -144,7 +136,7 @@ public class StreamCoordinator
         {
             StreamSession next = sessionsToConnect.next();
             logger.debug("Connecting next session {} with {}.", next.planId(), 
next.peer.toString());
-            streamExecutor.execute(new StreamSessionConnector(next));
+            startSession(next);
         }
         else
             logger.debug("Finished connecting all sessions");
@@ -259,20 +251,10 @@ public class StreamCoordinator
         return pendingRepair;
     }
 
-    private static class StreamSessionConnector implements Runnable
+    private void startSession(StreamSession session)
     {
-        private final StreamSession session;
-        public StreamSessionConnector(StreamSession session)
-        {
-            this.session = session;
-        }
-
-        @Override
-        public void run()
-        {
-            session.start();
-            logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", session.planId(), session.sessionIndex(), session.peer);
-        }
+        session.start();
+        logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", session.planId(), session.sessionIndex(), session.peer);
     }
 
     private class HostStreamingData
@@ -316,7 +298,7 @@ public class StreamCoordinator
         {
             for (StreamSession session : streamSessions.values())
             {
-                streamExecutor.execute(new StreamSessionConnector(session));
+                startSession(session);
             }
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to