Merge branch 'cassandra-2.0' into trunk Conflicts: src/java/org/apache/cassandra/streaming/StreamManager.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6cf15b2b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6cf15b2b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6cf15b2b Branch: refs/heads/trunk Commit: 6cf15b2b438b25cabacbad3c1a557061d3aa80e9 Parents: 97859b0 ea28d36 Author: Vijay <vijay2...@gmail.com> Authored: Sun Feb 16 13:43:29 2014 -0800 Committer: Vijay <vijay2...@gmail.com> Committed: Sun Feb 16 13:43:29 2014 -0800 ---------------------------------------------------------------------- src/java/org/apache/cassandra/streaming/StreamManager.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cf15b2b/src/java/org/apache/cassandra/streaming/StreamManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamManager.java index c82e45c,3fe6179..872e524 --- a/src/java/org/apache/cassandra/streaming/StreamManager.java +++ b/src/java/org/apache/cassandra/streaming/StreamManager.java @@@ -54,47 -55,17 +54,47 @@@ public class StreamManager implements S * with the rate of Double.MAX_VALUE bytes per second. * Rate unit is bytes per sec. * - * @return RateLimiter with rate limit set + * @return StreamRateLimiter with rate limit set based on peer location. */ - public static RateLimiter getRateLimiter() + public static StreamRateLimiter getRateLimiter(InetAddress peer) + { + return new StreamRateLimiter(peer); + } + + public static class StreamRateLimiter { - private static final double ONE_MEGA_BITS = 1024 * 1024 * 8; - double currentThroughput = (((double) DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec()) * 1024 * 1024 ) / 8; - // if throughput is set to 0, throttling is disabled - if (currentThroughput == 0) - currentThroughput = Double.MAX_VALUE; - if (limiter.getRate() != currentThroughput) - limiter.setRate(currentThroughput); - return limiter; ++ private static final double ONE_MEGA_BYTE = (1024 * 1024) / 8; // from bits + private static final RateLimiter limiter = RateLimiter.create(Double.MAX_VALUE); + private static final RateLimiter interDCLimiter = RateLimiter.create(Double.MAX_VALUE); + private final boolean isLocalDC; + + public StreamRateLimiter(InetAddress peer) + { - double throughput = ((double) DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec()) * ONE_MEGA_BITS; ++ double throughput = ((double) DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec()) * ONE_MEGA_BYTE; + mayUpdateThroughput(throughput, limiter); + - double interDCThroughput = ((double) DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec()) * ONE_MEGA_BITS; ++ double interDCThroughput = ((double) DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec()) * ONE_MEGA_BYTE; + mayUpdateThroughput(interDCThroughput, interDCLimiter); + + isLocalDC = DatabaseDescriptor.getLocalDataCenter().equals( + DatabaseDescriptor.getEndpointSnitch().getDatacenter(peer)); + } + + private void mayUpdateThroughput(double limit, RateLimiter rateLimiter) + { + // if throughput is set to 0, throttling is disabled + if (limit == 0) + limit = Double.MAX_VALUE; + if (rateLimiter.getRate() != limit) + rateLimiter.setRate(limit); + } + + public void acquire(int toTransfer) + { + limiter.acquire(toTransfer); + if (!isLocalDC) + interDCLimiter.acquire(toTransfer); + } } private final StreamEventJMXNotifier notifier = new StreamEventJMXNotifier();