Updated Branches:
  refs/heads/trunk 3b8bff78d -> 796df645c

Revert "remove vestiges of STREAM stage"

This reverts commit 69ad77d8c6a1e2c78255d54477798d5619a3a84d.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/796df645
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/796df645
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/796df645

Branch: refs/heads/trunk
Commit: 796df645c603556d5d09867612cf65a7f2529e51
Parents: 3b8bff7
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Tue Oct 23 09:12:39 2012 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Tue Oct 23 09:12:50 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/concurrent/Stage.java     |    2 ++
 .../apache/cassandra/concurrent/StageManager.java  |    1 +
 .../org/apache/cassandra/net/MessagingService.java |    1 +
 .../apache/cassandra/service/StorageService.java   |   11 +++++++++--
 4 files changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/796df645/src/java/org/apache/cassandra/concurrent/Stage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/Stage.java 
b/src/java/org/apache/cassandra/concurrent/Stage.java
index f2907e2..062bbe6 100644
--- a/src/java/org/apache/cassandra/concurrent/Stage.java
+++ b/src/java/org/apache/cassandra/concurrent/Stage.java
@@ -21,6 +21,7 @@ public enum Stage
 {
     READ,
     MUTATION,
+    STREAM,
     GOSSIP,
     REQUEST_RESPONSE,
     ANTI_ENTROPY,
@@ -40,6 +41,7 @@ public enum Stage
             case MIGRATION:
             case MISC:
             case TRACING:
+            case STREAM:
             case INTERNAL_RESPONSE:
                 return "internal";
             case MUTATION:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/796df645/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java 
b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 7ca45f4..8359346 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -51,6 +51,7 @@ public class StageManager
         stages.put(Stage.INTERNAL_RESPONSE, 
multiThreadedStage(Stage.INTERNAL_RESPONSE, 
Runtime.getRuntime().availableProcessors()));
         stages.put(Stage.REPLICATE_ON_WRITE, 
multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, 
getConcurrentReplicators(), MAX_REPLICATE_ON_WRITE_TASKS));
         // the rest are all single-threaded
+        stages.put(Stage.STREAM, new 
JMXEnabledThreadPoolExecutor(Stage.STREAM));
         stages.put(Stage.GOSSIP, new 
JMXEnabledThreadPoolExecutor(Stage.GOSSIP));
         stages.put(Stage.ANTI_ENTROPY, new 
JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
         stages.put(Stage.MIGRATION, new 
JMXEnabledThreadPoolExecutor(Stage.MIGRATION));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/796df645/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 263d5c0..677e17d 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -133,6 +133,7 @@ public final class MessagingService implements 
MessagingServiceMBean
         put(Verb.READ, Stage.READ);
         put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE);
         put(Verb.STREAM_REPLY, Stage.MISC); // TODO does this really belong on 
misc? I've just copied old behavior here
+        put(Verb.STREAM_REQUEST, Stage.STREAM);
         put(Verb.RANGE_SLICE, Stage.READ);
         put(Verb.BOOTSTRAP_TOKEN, Stage.MISC);
         put(Verb.TREE_REQUEST, Stage.ANTI_ENTROPY);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/796df645/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 58ce112..4cc8581 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3352,8 +3352,15 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
                     }
                 };
 
-                // TODO each call to transferRanges re-flushes, this is 
potentially a lot of waste
-                StreamOut.transferRanges(newEndpoint, Table.open(table), 
ranges, callback, OperationType.UNBOOTSTRAP);
+                StageManager.getStage(Stage.STREAM).execute(new Runnable()
+                {
+                    public void run()
+                    {
+                        // TODO each call to transferRanges re-flushes, this 
is potentially a lot of waste
+                        StreamOut.transferRanges(newEndpoint, 
Table.open(table), ranges, callback,
+                                OperationType.UNBOOTSTRAP);
+                    }
+                });
             }
         }
         return latch;

Reply via email to