Repository: cassandra Updated Branches: refs/heads/trunk a8be43e45 -> 4af23348e
Remove StreamCoordinator.streamExecutor thread pool patch by jasobrown; reviewed by Dinesh Joshi for CASANDRA-14402 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4af23348 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4af23348 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4af23348 Branch: refs/heads/trunk Commit: 4af23348ecd6fc8dbef44ac5ebdb6ae60d599283 Parents: a8be43e Author: Jason Brown <jasedbr...@gmail.com> Authored: Thu Apr 19 05:33:34 2018 -0700 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Fri Apr 20 05:05:51 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/streaming/StreamCoordinator.java | 30 ++++---------------- 2 files changed, 7 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4af23348/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7d9769c..5902305 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Remove StreamCoordinator.streamExecutor thread pool (CASSANDRA-14402) * Rename nodetool --with-port to --print-port to disambiguate from --port (CASSANDRA-14392) * Client TOPOLOGY_CHANGE messages have wrong port. (CASSANDRA-14398) * Add ability to load new SSTables from a separate directory (CASSANDRA-6719) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4af23348/src/java/org/apache/cassandra/streaming/StreamCoordinator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java index 139488d..6b92dfe 100644 --- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java +++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java @@ -22,9 +22,7 @@ import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.utils.FBUtilities; /** * {@link StreamCoordinator} is a helper class that abstracts away maintaining multiple @@ -37,15 +35,9 @@ public class StreamCoordinator { private static final Logger logger = LoggerFactory.getLogger(StreamCoordinator.class); - /** - * Executor strictly for establishing the initial connections. Once we're connected to the other end the rest of the - * streaming is handled directly by the {@link StreamingMessageSender}'s incoming and outgoing threads. - */ - private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher", - FBUtilities.getAvailableProcessors()); private final boolean connectSequentially; - private Map<InetAddressAndPort, HostStreamingData> peerSessions = new HashMap<>(); + private final Map<InetAddressAndPort, HostStreamingData> peerSessions = new HashMap<>(); private final StreamOperation streamOperation; private final int connectionsPerHost; private StreamConnectionFactory factory; @@ -144,7 +136,7 @@ public class StreamCoordinator { StreamSession next = sessionsToConnect.next(); logger.debug("Connecting next session {} with {}.", next.planId(), next.peer.toString()); - streamExecutor.execute(new StreamSessionConnector(next)); + startSession(next); } else logger.debug("Finished connecting all sessions"); @@ -259,20 +251,10 @@ public class StreamCoordinator return pendingRepair; } - private static class StreamSessionConnector implements Runnable + private void startSession(StreamSession session) { - private final StreamSession session; - public StreamSessionConnector(StreamSession session) - { - this.session = session; - } - - @Override - public void run() - { - session.start(); - logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", session.planId(), session.sessionIndex(), session.peer); - } + session.start(); + logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", session.planId(), session.sessionIndex(), session.peer); } private class HostStreamingData @@ -316,7 +298,7 @@ public class StreamCoordinator { for (StreamSession session : streamSessions.values()) { - streamExecutor.execute(new StreamSessionConnector(session)); + startSession(session); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org