Group stream-out ranges. Patch by Sam Overton; reviewed by Brandon Williams for CASSANDRA-4122
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8c09e87e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8c09e87e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8c09e87e Branch: refs/heads/cassandra-1.1 Commit: 8c09e87e40020139611088cf836f8f079bffb0ce Parents: 1a3661f Author: Eric Evans <eev...@apache.org> Authored: Wed Jul 18 13:35:53 2012 -0500 Committer: Eric Evans <eev...@apache.org> Committed: Wed Jul 18 13:35:53 2012 -0500 ---------------------------------------------------------------------- .../apache/cassandra/service/StorageService.java | 51 ++++++++++---- 1 files changed, 36 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c09e87e/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 207bf69..3875054 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2984,42 +2984,62 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe */ private CountDownLatch streamRanges(final Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByTable) { - final CountDownLatch latch = new CountDownLatch(rangesToStreamByTable.keySet().size()); + // First, we build a list of ranges to stream to each host, per table + final Map<String, Map<InetAddress, List<Range<Token>>>> sessionsToStreamByTable = new HashMap<String, Map<InetAddress, List<Range<Token>>>>(); + // The number of stream out sessions we need to start, to be built up as we build sessionsToStreamByTable + int sessionCount = 0; + for (Map.Entry<String, Multimap<Range<Token>, 
InetAddress>> entry : rangesToStreamByTable.entrySet()) { Multimap<Range<Token>, InetAddress> rangesWithEndpoints = entry.getValue(); if (rangesWithEndpoints.isEmpty()) - { - latch.countDown(); continue; - } final String table = entry.getKey(); - final Set<Map.Entry<Range<Token>, InetAddress>> pending = new HashSet<Map.Entry<Range<Token>, InetAddress>>(rangesWithEndpoints.entries()); + Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = new HashMap<InetAddress, List<Range<Token>>>(); for (final Map.Entry<Range<Token>, InetAddress> endPointEntry : rangesWithEndpoints.entries()) { final Range<Token> range = endPointEntry.getKey(); - final InetAddress newEndpoint = endPointEntry.getValue(); + final InetAddress endpoint = endPointEntry.getValue(); + + List<Range<Token>> curRanges = rangesPerEndpoint.get(endpoint); + if (curRanges == null) + { + curRanges = new LinkedList<Range<Token>>(); + rangesPerEndpoint.put(endpoint, curRanges); + } + curRanges.add(range); + } + + sessionCount += rangesPerEndpoint.size(); + sessionsToStreamByTable.put(table, rangesPerEndpoint); + } + + final CountDownLatch latch = new CountDownLatch(sessionCount); + + for (Map.Entry<String, Map<InetAddress, List<Range<Token>>>> entry : sessionsToStreamByTable.entrySet()) + { + final String table = entry.getKey(); + final Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = entry.getValue(); + + for (final Map.Entry<InetAddress, List<Range<Token>>> rangesEntry : rangesPerEndpoint.entrySet()) + { + final List<Range<Token>> ranges = rangesEntry.getValue(); + final InetAddress newEndpoint = rangesEntry.getKey(); final IStreamCallback callback = new IStreamCallback() { public void onSuccess() { - synchronized (pending) - { - pending.remove(endPointEntry); - - if (pending.isEmpty()) - latch.countDown(); - } + latch.countDown(); } public void onFailure() { - logger.warn("Streaming to " + endPointEntry + " failed"); + logger.warn("Streaming to " + newEndpoint + " failed"); onSuccess(); // calling 
onSuccess for latch countdown } }; @@ -3029,7 +3049,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe public void run() { // TODO each call to transferRanges re-flushes, this is potentially a lot of waste - StreamOut.transferRanges(newEndpoint, Table.open(table), Arrays.asList(range), callback, OperationType.UNBOOTSTRAP); + StreamOut.transferRanges(newEndpoint, Table.open(table), ranges, callback, + OperationType.UNBOOTSTRAP); } }); }