Merge branch 'cassandra-2.0' into trunk

Conflicts:
        src/java/org/apache/cassandra/streaming/StreamManager.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6cf15b2b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6cf15b2b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6cf15b2b

Branch: refs/heads/trunk
Commit: 6cf15b2b438b25cabacbad3c1a557061d3aa80e9
Parents: 97859b0 ea28d36
Author: Vijay <vijay2...@gmail.com>
Authored: Sun Feb 16 13:43:29 2014 -0800
Committer: Vijay <vijay2...@gmail.com>
Committed: Sun Feb 16 13:43:29 2014 -0800

----------------------------------------------------------------------
 src/java/org/apache/cassandra/streaming/StreamManager.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cf15b2b/src/java/org/apache/cassandra/streaming/StreamManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamManager.java
index c82e45c,3fe6179..872e524
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@@ -54,47 -55,17 +54,47 @@@ public class StreamManager implements S
       * with the rate of Double.MAX_VALUE bytes per second.
       * Rate unit is bytes per sec.
       *
 -     * @return RateLimiter with rate limit set
 +     * @return StreamRateLimiter with rate limit set based on peer location.
       */
 -    public static RateLimiter getRateLimiter()
 +    public static StreamRateLimiter getRateLimiter(InetAddress peer)
 +    {
 +        return new StreamRateLimiter(peer);
 +    }
 +
 +    public static class StreamRateLimiter
      {
-         private static final double ONE_MEGA_BITS = 1024 * 1024 * 8;
 -        double currentThroughput = (((double) DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec()) * 1024 * 1024 ) / 8;
 -        // if throughput is set to 0, throttling is disabled
 -        if (currentThroughput == 0)
 -            currentThroughput = Double.MAX_VALUE;
 -        if (limiter.getRate() != currentThroughput)
 -            limiter.setRate(currentThroughput);
 -        return limiter;
++        private static final double ONE_MEGA_BYTE = (1024 * 1024) / 8; // from bits
 +        private static final RateLimiter limiter = RateLimiter.create(Double.MAX_VALUE);
 +        private static final RateLimiter interDCLimiter = RateLimiter.create(Double.MAX_VALUE);
 +        private final boolean isLocalDC;
 +
 +        public StreamRateLimiter(InetAddress peer)
 +        {
-             double throughput = ((double) DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec()) * ONE_MEGA_BITS;
++            double throughput = ((double) DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec()) * ONE_MEGA_BYTE;
 +            mayUpdateThroughput(throughput, limiter);
 +
-             double interDCThroughput = ((double) DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec()) * ONE_MEGA_BITS;
++            double interDCThroughput = ((double) DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec()) * ONE_MEGA_BYTE;
 +            mayUpdateThroughput(interDCThroughput, interDCLimiter);
 +
 +            isLocalDC = DatabaseDescriptor.getLocalDataCenter().equals(
 +                        DatabaseDescriptor.getEndpointSnitch().getDatacenter(peer));
 +        }
 +
 +        private void mayUpdateThroughput(double limit, RateLimiter rateLimiter)
 +        {
 +            // if throughput is set to 0, throttling is disabled
 +            if (limit == 0)
 +                limit = Double.MAX_VALUE;
 +            if (rateLimiter.getRate() != limit)
 +                rateLimiter.setRate(limit);
 +        }
 +
 +        public void acquire(int toTransfer)
 +        {
 +            limiter.acquire(toTransfer);
 +            if (!isLocalDC)
 +                interDCLimiter.acquire(toTransfer);
 +        }
      }
  
      private final StreamEventJMXNotifier notifier = new StreamEventJMXNotifier();

Reply via email to