group stream out ranges

Patch by Sam Overton; reviewed by Brandon Williams for CASSANDRA-4122


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8c09e87e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8c09e87e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8c09e87e

Branch: refs/heads/cassandra-1.1
Commit: 8c09e87e40020139611088cf836f8f079bffb0ce
Parents: 1a3661f
Author: Eric Evans <eev...@apache.org>
Authored: Wed Jul 18 13:35:53 2012 -0500
Committer: Eric Evans <eev...@apache.org>
Committed: Wed Jul 18 13:35:53 2012 -0500

----------------------------------------------------------------------
 .../apache/cassandra/service/StorageService.java   |   51 ++++++++++----
 1 files changed, 36 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c09e87e/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 207bf69..3875054 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2984,42 +2984,62 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
      */
     private CountDownLatch streamRanges(final Map<String, 
Multimap<Range<Token>, InetAddress>> rangesToStreamByTable)
     {
-        final CountDownLatch latch = new 
CountDownLatch(rangesToStreamByTable.keySet().size());
+        // First, we build a list of ranges to stream to each host, per table
+        final Map<String, Map<InetAddress, List<Range<Token>>>> 
sessionsToStreamByTable = new HashMap<String, Map<InetAddress, 
List<Range<Token>>>>();
+        // The number of stream out sessions we need to start, to be built up 
as we build sessionsToStreamByTable
+        int sessionCount = 0;
+
         for (Map.Entry<String, Multimap<Range<Token>, InetAddress>> entry : 
rangesToStreamByTable.entrySet())
         {
             Multimap<Range<Token>, InetAddress> rangesWithEndpoints = 
entry.getValue();
 
             if (rangesWithEndpoints.isEmpty())
-            {
-                latch.countDown();
                 continue;
-            }
 
             final String table = entry.getKey();
 
-            final Set<Map.Entry<Range<Token>, InetAddress>> pending = new 
HashSet<Map.Entry<Range<Token>, InetAddress>>(rangesWithEndpoints.entries());
+            Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = new 
HashMap<InetAddress, List<Range<Token>>>();
 
             for (final Map.Entry<Range<Token>, InetAddress> endPointEntry : 
rangesWithEndpoints.entries())
             {
                 final Range<Token> range = endPointEntry.getKey();
-                final InetAddress newEndpoint = endPointEntry.getValue();
+                final InetAddress endpoint = endPointEntry.getValue();
+
+                List<Range<Token>> curRanges = rangesPerEndpoint.get(endpoint);
+                if (curRanges == null)
+                {
+                    curRanges = new LinkedList<Range<Token>>();
+                    rangesPerEndpoint.put(endpoint, curRanges);
+                }
+                curRanges.add(range);
+            }
+
+            sessionCount += rangesPerEndpoint.size();
+            sessionsToStreamByTable.put(table, rangesPerEndpoint);
+        }
+
+        final CountDownLatch latch = new CountDownLatch(sessionCount);
+
+        for (Map.Entry<String, Map<InetAddress, List<Range<Token>>>> entry : 
sessionsToStreamByTable.entrySet())
+        {
+            final String table = entry.getKey();
+            final Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = 
entry.getValue();
+
+            for (final Map.Entry<InetAddress, List<Range<Token>>> rangesEntry 
: rangesPerEndpoint.entrySet())
+            {
+                final List<Range<Token>> ranges = rangesEntry.getValue();
+                final InetAddress newEndpoint = rangesEntry.getKey();
 
                 final IStreamCallback callback = new IStreamCallback()
                 {
                     public void onSuccess()
                     {
-                        synchronized (pending)
-                        {
-                            pending.remove(endPointEntry);
-
-                            if (pending.isEmpty())
-                                latch.countDown();
-                        }
+                        latch.countDown();
                     }
 
                     public void onFailure()
                     {
-                        logger.warn("Streaming to " + endPointEntry + " 
failed");
+                        logger.warn("Streaming to " + newEndpoint + " failed");
                         onSuccess(); // calling onSuccess for latch countdown
                     }
                 };
@@ -3029,7 +3049,8 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
                     public void run()
                     {
                         // TODO each call to transferRanges re-flushes, this 
is potentially a lot of waste
-                        StreamOut.transferRanges(newEndpoint, 
Table.open(table), Arrays.asList(range), callback, OperationType.UNBOOTSTRAP);
+                        StreamOut.transferRanges(newEndpoint, 
Table.open(table), ranges, callback,
+                                OperationType.UNBOOTSTRAP);
                     }
                 });
             }

Reply via email to