Updated Branches:
  refs/heads/trunk 0d5dfbcf3 -> 652ae9a64

improve DynamicEndpointSnitch by using reservoir sampling
patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-4038


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/652ae9a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/652ae9a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/652ae9a6

Branch: refs/heads/trunk
Commit: 652ae9a6456dd8cdd38e3eebdfbbfad51b2ad197
Parents: 0d5dfbc
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Fri Jul 27 13:56:45 2012 +0300
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Mon Jul 30 20:13:32 2012 +0300

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/locator/DynamicEndpointSnitch.java   |   56 ++++++--------
 .../cassandra/locator/ILatencySubscriber.java      |    2 +-
 .../org/apache/cassandra/net/MessagingService.java |    6 +-
 .../apache/cassandra/net/ResponseVerbHandler.java  |    4 +-
 .../locator/DynamicEndpointSnitchTest.java         |   30 ++++----
 6 files changed, 46 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/652ae9a6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 48360ad..09f4508 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -33,6 +33,7 @@
  * Validate date type correctly (CASSANDRA-4441)
  * (cql3) Allow definitions with only a PK (CASSANDRA-4361)
  * (cql3) Add support for row key composites (CASSANDRA-4179)
+ * improve DynamicEndpointSnitch by using reservoir sampling (CASSANDRA-4038)
 
 
 1.1.3

http://git-wip-us.apache.org/repos/asf/cassandra/blob/652ae9a6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index 9d4f08d..6d03391 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -24,22 +24,22 @@ import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.BoundedStatsDeque;
 import org.apache.cassandra.utils.FBUtilities;
 
+import com.yammer.metrics.stats.ExponentiallyDecayingSample;
+
 /**
  * A dynamic snitch that sorts endpoints by latency with an adapted phi 
failure detector
  */
 public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements 
ILatencySubscriber, DynamicEndpointSnitchMBean
 {
-    private static final int UPDATES_PER_INTERVAL = 10000;
+    private static final double ALPHA = 0.75; // set to 0.75 to make EDS more 
biased towards the newer values
     private static final int WINDOW_SIZE = 100;
 
     private int UPDATE_INTERVAL_IN_MS = 
DatabaseDescriptor.getDynamicUpdateInterval();
@@ -50,8 +50,7 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
 
     private final ConcurrentHashMap<InetAddress, Double> scores = new 
ConcurrentHashMap<InetAddress, Double>();
     private final ConcurrentHashMap<InetAddress, Long> lastReceived = new 
ConcurrentHashMap<InetAddress, Long>();
-    private final ConcurrentHashMap<InetAddress, BoundedStatsDeque> windows = 
new ConcurrentHashMap<InetAddress, BoundedStatsDeque>();
-    private final AtomicInteger intervalupdates = new AtomicInteger(0);
+    private final ConcurrentHashMap<InetAddress, ExponentiallyDecayingSample> 
samples = new ConcurrentHashMap<InetAddress, ExponentiallyDecayingSample>();
 
     public final IEndpointSnitch subsnitch;
 
@@ -183,13 +182,13 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
         if (scored1 == null)
         {
             scored1 = 0.0;
-            receiveTiming(a1, 0.0);
+            receiveTiming(a1, 0);
         }
 
         if (scored2 == null)
         {
             scored2 = 0.0;
-            receiveTiming(a2, 0.0);
+            receiveTiming(a2, 0);
         }
 
         if (scored1.equals(scored2))
@@ -200,21 +199,19 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
             return 1;
     }
 
-    public void receiveTiming(InetAddress host, Double latency) // this is 
cheap
+    public void receiveTiming(InetAddress host, long latency) // this is cheap
     {
         lastReceived.put(host, System.currentTimeMillis());
-        if (intervalupdates.intValue() >= UPDATES_PER_INTERVAL)
-            return;
-        BoundedStatsDeque deque = windows.get(host);
-        if (deque == null)
+
+        ExponentiallyDecayingSample sample = samples.get(host);
+        if (sample == null)
         {
-            BoundedStatsDeque maybeNewDeque = new 
BoundedStatsDeque(WINDOW_SIZE);
-            deque = windows.putIfAbsent(host, maybeNewDeque);
-            if (deque == null)
-                deque = maybeNewDeque;
+            ExponentiallyDecayingSample maybeNewSample = new 
ExponentiallyDecayingSample(WINDOW_SIZE, ALPHA);
+            sample = samples.putIfAbsent(host, maybeNewSample);
+            if (sample == null)
+                sample = maybeNewSample;
         }
-        deque.add(latency);
-        intervalupdates.getAndIncrement();
+        sample.update(latency);
     }
 
     private void updateScores() // this is expensive
@@ -233,9 +230,9 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
         double maxLatency = 1;
         long maxPenalty = 1;
         HashMap<InetAddress, Long> penalties = new HashMap<InetAddress, 
Long>();
-        for (Map.Entry<InetAddress, BoundedStatsDeque> entry : 
windows.entrySet())
+        for (Map.Entry<InetAddress, ExponentiallyDecayingSample> entry : 
samples.entrySet())
         {
-            double mean = entry.getValue().mean();
+            double mean = entry.getValue().getSnapshot().getMedian();
             if (mean > maxLatency)
                 maxLatency = mean;
             long timePenalty = lastReceived.containsKey(entry.getKey()) ? 
lastReceived.get(entry.getKey()) : System.currentTimeMillis();
@@ -245,9 +242,9 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
             if (timePenalty > maxPenalty)
                 maxPenalty = timePenalty;
         }
-        for (Map.Entry<InetAddress, BoundedStatsDeque> entry: 
windows.entrySet())
+        for (Map.Entry<InetAddress, ExponentiallyDecayingSample> entry: 
samples.entrySet())
         {
-            double score = entry.getValue().mean() / maxLatency;
+            double score = entry.getValue().getSnapshot().getMedian() / 
maxLatency;
             if (penalties.containsKey(entry.getKey()))
                 score += penalties.get(entry.getKey()) / ((double) maxPenalty);
             else
@@ -255,16 +252,13 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
             score += StorageService.instance.getSeverity(entry.getKey());
             scores.put(entry.getKey(), score);            
         }
-        intervalupdates.set(0);
     }
 
 
     private void reset()
     {
-        for (BoundedStatsDeque deque : windows.values())
-        {
-            deque.clear();
-        }
+        for (ExponentiallyDecayingSample sample : samples.values())
+            sample.clear();
     }
 
     public Map<InetAddress, Double> getScores()
@@ -293,13 +287,11 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
     {
         InetAddress host = InetAddress.getByName(hostname);
         ArrayList<Double> timings = new ArrayList<Double>();
-        BoundedStatsDeque window = windows.get(host);
-        if (window != null)
+        ExponentiallyDecayingSample sample = samples.get(host);
+        if (sample != null)
         {
-            for (double time: window)
-            {
+            for (double time: sample.getSnapshot().getValues())
                 timings.add(time);
-            }
         }
         return timings;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/652ae9a6/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ILatencySubscriber.java 
b/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
index 56d7af7..d2ae6db 100644
--- a/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
+++ b/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
@@ -21,5 +21,5 @@ import java.net.InetAddress;
 
 public interface ILatencySubscriber
 {
-    public void receiveTiming(InetAddress address, Double latency);
+    public void receiveTiming(InetAddress address, long latency);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/652ae9a6/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 2966801..2fd72cf 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -323,7 +323,7 @@ public final class MessagingService implements 
MessagingServiceMBean
             public Object apply(Pair<String, 
ExpiringMap.CacheableObject<CallbackInfo>> pair)
             {
                 CallbackInfo expiredCallbackInfo = pair.right.value;
-                maybeAddLatency(expiredCallbackInfo.callback, 
expiredCallbackInfo.target, (double) pair.right.timeout);
+                maybeAddLatency(expiredCallbackInfo.callback, 
expiredCallbackInfo.target, pair.right.timeout);
                 totalTimeouts++;
                 String ip = expiredCallbackInfo.target.getHostAddress();
                 AtomicLong c = timeoutsPerHost.get(ip);
@@ -368,13 +368,13 @@ public final class MessagingService implements 
MessagingServiceMBean
      * @param address the host that replied to the message
      * @param latency
      */
-    public void maybeAddLatency(IMessageCallback cb, InetAddress address, 
double latency)
+    public void maybeAddLatency(IMessageCallback cb, InetAddress address, long 
latency)
     {
         if (cb.isLatencyForSnitch())
             addLatency(address, latency);
     }
 
-    public void addLatency(InetAddress address, double latency)
+    public void addLatency(InetAddress address, long latency)
     {
         for (ILatencySubscriber subscriber : subscribers)
             subscriber.receiveTiming(address, latency);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/652ae9a6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java 
b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 106f76d..b62a452 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -26,7 +26,7 @@ public class ResponseVerbHandler implements IVerbHandler
 
     public void doVerb(MessageIn message, String id)
     {
-        double age = System.currentTimeMillis() - 
MessagingService.instance().getRegisteredCallbackAge(id);
+        long latency = System.currentTimeMillis() - 
MessagingService.instance().getRegisteredCallbackAge(id);
         CallbackInfo callbackInfo = 
MessagingService.instance().removeRegisteredCallback(id);
         if (callbackInfo == null)
         {
@@ -35,7 +35,7 @@ public class ResponseVerbHandler implements IVerbHandler
         }
 
         IMessageCallback cb = callbackInfo.callback;
-        MessagingService.instance().maybeAddLatency(cb, message.from, age);
+        MessagingService.instance().maybeAddLatency(cb, message.from, latency);
 
         if (cb instanceof IAsyncCallback)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/652ae9a6/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java 
b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
index decd59a..21a7993 100644
--- a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
@@ -48,9 +48,9 @@ public class DynamicEndpointSnitchTest
         // first, make all hosts equal
         for (int i = 0; i < 5; i++)
         {
-            dsnitch.receiveTiming(host1, 1.0);
-            dsnitch.receiveTiming(host2, 1.0);
-            dsnitch.receiveTiming(host3, 1.0);
+            dsnitch.receiveTiming(host1, 1L);
+            dsnitch.receiveTiming(host2, 1L);
+            dsnitch.receiveTiming(host3, 1L);
         }
 
         Thread.sleep(sleeptime);
@@ -61,9 +61,9 @@ public class DynamicEndpointSnitchTest
         assert dsnitch.getSortedListByProximity(self, order).equals(order);
 
         // make host1 a little worse
-        dsnitch.receiveTiming(host1, 2.0);
-        dsnitch.receiveTiming(host2, 1.0);
-        dsnitch.receiveTiming(host3, 1.0);
+        dsnitch.receiveTiming(host1, 2L);
+        dsnitch.receiveTiming(host2, 1L);
+        dsnitch.receiveTiming(host3, 1L);
         Thread.sleep(sleeptime);
 
         order.clear();
@@ -73,9 +73,9 @@ public class DynamicEndpointSnitchTest
         assert dsnitch.getSortedListByProximity(self, order).equals(order);
 
         // make host2 as bad as host1
-        dsnitch.receiveTiming(host2, 2.0);
-        dsnitch.receiveTiming(host1, 1.0);
-        dsnitch.receiveTiming(host3, 1.0);
+        dsnitch.receiveTiming(host2, 2L);
+        dsnitch.receiveTiming(host1, 1L);
+        dsnitch.receiveTiming(host3, 1L);
         Thread.sleep(sleeptime);
 
         order.clear();
@@ -87,9 +87,9 @@ public class DynamicEndpointSnitchTest
         // make host3 the worst
         for (int i = 0; i < 2; i++)
         {
-            dsnitch.receiveTiming(host1, 1.0);
-            dsnitch.receiveTiming(host2, 1.0);
-            dsnitch.receiveTiming(host3, 2.0);
+            dsnitch.receiveTiming(host1, 1L);
+            dsnitch.receiveTiming(host2, 1L);
+            dsnitch.receiveTiming(host3, 2L);
         }
         Thread.sleep(sleeptime);
 
@@ -102,9 +102,9 @@ public class DynamicEndpointSnitchTest
         // make host3 equal to the others
         for (int i = 0; i < 2; i++)
         {
-            dsnitch.receiveTiming(host1, 1.0);
-            dsnitch.receiveTiming(host2, 1.0);
-            dsnitch.receiveTiming(host3, 1.0);
+            dsnitch.receiveTiming(host1, 1L);
+            dsnitch.receiveTiming(host2, 1L);
+            dsnitch.receiveTiming(host3, 1L);
         }
         Thread.sleep(sleeptime);
 

Reply via email to