Updated Branches: refs/heads/cassandra-1.0 452ddf63c -> 1e7501381 refs/heads/trunk bb7b64312 -> b3a378c02
merge from 1.0 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b3a378c0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b3a378c0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b3a378c0 Branch: refs/heads/trunk Commit: b3a378c02598f2150aede95daaae273ea356f127 Parents: bb7b643 1e75013 Author: Jonathan Ellis <jbel...@apache.org> Authored: Wed Jan 11 20:48:40 2012 -0600 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Wed Jan 11 20:48:40 2012 -0600 ---------------------------------------------------------------------- .../org/apache/cassandra/net/MessagingService.java | 16 ++++++++++++++- .../org/apache/cassandra/utils/ExpiringMap.java | 3 +- 2 files changed, 17 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a378c0/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/MessagingService.java index c54cdec,e12f9ee..d7613cb --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@@ -513,23 -464,15 +514,23 @@@ public final class MessagingService imp subscribers.add(subcriber); } - public void waitForStreaming() throws InterruptedException + public void clearCallbacksUnsafe() { - callbacks.clear(); - streamExecutor_.shutdown(); - streamExecutor_.awaitTermination(24, TimeUnit.HOURS); ++ callbacks.reset(); } - public void clearCallbacksUnsafe() + public void waitForStreaming() throws InterruptedException { - callbacks.reset(); + // this does not prevent new streams from beginning after a drain begins, but since streams are only + // started in response to explicit operator action (bootstrap/move/repair/etc) that feels like a feature. + for (DebuggableThreadPoolExecutor e : streamExecutors.values()) + e.shutdown(); + + for (DebuggableThreadPoolExecutor e : streamExecutors.values()) + { + if (e.awaitTermination(24, TimeUnit.HOURS)) + logger_.error("Stream took more than 24H to complete; skipping"); + } } /**