Updated Branches: refs/heads/trunk 0d5dfbcf3 -> 652ae9a64
improve DynamicEndpointSnitch by using reservoir sampling patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-4038 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/652ae9a6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/652ae9a6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/652ae9a6 Branch: refs/heads/trunk Commit: 652ae9a6456dd8cdd38e3eebdfbbfad51b2ad197 Parents: 0d5dfbc Author: Pavel Yaskevich <xe...@apache.org> Authored: Fri Jul 27 13:56:45 2012 +0300 Committer: Pavel Yaskevich <xe...@apache.org> Committed: Mon Jul 30 20:13:32 2012 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/locator/DynamicEndpointSnitch.java | 56 ++++++-------- .../cassandra/locator/ILatencySubscriber.java | 2 +- .../org/apache/cassandra/net/MessagingService.java | 6 +- .../apache/cassandra/net/ResponseVerbHandler.java | 4 +- .../locator/DynamicEndpointSnitchTest.java | 30 ++++---- 6 files changed, 46 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/652ae9a6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 48360ad..09f4508 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -33,6 +33,7 @@ * Validate date type correctly (CASSANDRA-4441) * (cql3) Allow definitions with only a PK (CASSANDRA-4361) * (cql3) Add support for row key composites (CASSANDRA-4179) + * improve DynamicEndpointSnitch by using reservoir sampling (CASSANDRA-4038) 1.1.3 http://git-wip-us.apache.org/repos/asf/cassandra/blob/652ae9a6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java index 9d4f08d..6d03391 100644 --- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java @@ -24,22 +24,22 @@ import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.BoundedStatsDeque; import org.apache.cassandra.utils.FBUtilities; +import com.yammer.metrics.stats.ExponentiallyDecayingSample; + /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector */ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { - private static final int UPDATES_PER_INTERVAL = 10000; + private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased towards the newer values private static final int WINDOW_SIZE = 100; private int UPDATE_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicUpdateInterval(); @@ -50,8 +50,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa private final ConcurrentHashMap<InetAddress, Double> scores = new ConcurrentHashMap<InetAddress, Double>(); private final ConcurrentHashMap<InetAddress, Long> lastReceived = new ConcurrentHashMap<InetAddress, Long>(); - private final ConcurrentHashMap<InetAddress, BoundedStatsDeque> windows = new ConcurrentHashMap<InetAddress, BoundedStatsDeque>(); - private final AtomicInteger intervalupdates = new AtomicInteger(0); + private final ConcurrentHashMap<InetAddress, ExponentiallyDecayingSample> samples = new 
ConcurrentHashMap<InetAddress, ExponentiallyDecayingSample>(); public final IEndpointSnitch subsnitch; @@ -183,13 +182,13 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa if (scored1 == null) { scored1 = 0.0; - receiveTiming(a1, 0.0); + receiveTiming(a1, 0); } if (scored2 == null) { scored2 = 0.0; - receiveTiming(a2, 0.0); + receiveTiming(a2, 0); } if (scored1.equals(scored2)) @@ -200,21 +199,19 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa return 1; } - public void receiveTiming(InetAddress host, Double latency) // this is cheap + public void receiveTiming(InetAddress host, long latency) // this is cheap { lastReceived.put(host, System.currentTimeMillis()); - if (intervalupdates.intValue() >= UPDATES_PER_INTERVAL) - return; - BoundedStatsDeque deque = windows.get(host); - if (deque == null) + + ExponentiallyDecayingSample sample = samples.get(host); + if (sample == null) { - BoundedStatsDeque maybeNewDeque = new BoundedStatsDeque(WINDOW_SIZE); - deque = windows.putIfAbsent(host, maybeNewDeque); - if (deque == null) - deque = maybeNewDeque; + ExponentiallyDecayingSample maybeNewSample = new ExponentiallyDecayingSample(WINDOW_SIZE, ALPHA); + sample = samples.putIfAbsent(host, maybeNewSample); + if (sample == null) + sample = maybeNewSample; } - deque.add(latency); - intervalupdates.getAndIncrement(); + sample.update(latency); } private void updateScores() // this is expensive @@ -233,9 +230,9 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa double maxLatency = 1; long maxPenalty = 1; HashMap<InetAddress, Long> penalties = new HashMap<InetAddress, Long>(); - for (Map.Entry<InetAddress, BoundedStatsDeque> entry : windows.entrySet()) + for (Map.Entry<InetAddress, ExponentiallyDecayingSample> entry : samples.entrySet()) { - double mean = entry.getValue().mean(); + double mean = entry.getValue().getSnapshot().getMedian(); if (mean > maxLatency) maxLatency = mean; 
long timePenalty = lastReceived.containsKey(entry.getKey()) ? lastReceived.get(entry.getKey()) : System.currentTimeMillis(); @@ -245,9 +242,9 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa if (timePenalty > maxPenalty) maxPenalty = timePenalty; } - for (Map.Entry<InetAddress, BoundedStatsDeque> entry: windows.entrySet()) + for (Map.Entry<InetAddress, ExponentiallyDecayingSample> entry: samples.entrySet()) { - double score = entry.getValue().mean() / maxLatency; + double score = entry.getValue().getSnapshot().getMedian() / maxLatency; if (penalties.containsKey(entry.getKey())) score += penalties.get(entry.getKey()) / ((double) maxPenalty); else @@ -255,16 +252,13 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa score += StorageService.instance.getSeverity(entry.getKey()); scores.put(entry.getKey(), score); } - intervalupdates.set(0); } private void reset() { - for (BoundedStatsDeque deque : windows.values()) - { - deque.clear(); - } + for (ExponentiallyDecayingSample sample : samples.values()) + sample.clear(); } public Map<InetAddress, Double> getScores() @@ -293,13 +287,11 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa { InetAddress host = InetAddress.getByName(hostname); ArrayList<Double> timings = new ArrayList<Double>(); - BoundedStatsDeque window = windows.get(host); - if (window != null) + ExponentiallyDecayingSample sample = samples.get(host); + if (sample != null) { - for (double time: window) - { + for (double time: sample.getSnapshot().getValues()) timings.add(time); - } } return timings; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/652ae9a6/src/java/org/apache/cassandra/locator/ILatencySubscriber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ILatencySubscriber.java b/src/java/org/apache/cassandra/locator/ILatencySubscriber.java index 56d7af7..d2ae6db 100644 
--- a/src/java/org/apache/cassandra/locator/ILatencySubscriber.java +++ b/src/java/org/apache/cassandra/locator/ILatencySubscriber.java @@ -21,5 +21,5 @@ import java.net.InetAddress; public interface ILatencySubscriber { - public void receiveTiming(InetAddress address, Double latency); + public void receiveTiming(InetAddress address, long latency); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/652ae9a6/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 2966801..2fd72cf 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -323,7 +323,7 @@ public final class MessagingService implements MessagingServiceMBean public Object apply(Pair<String, ExpiringMap.CacheableObject<CallbackInfo>> pair) { CallbackInfo expiredCallbackInfo = pair.right.value; - maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, (double) pair.right.timeout); + maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout); totalTimeouts++; String ip = expiredCallbackInfo.target.getHostAddress(); AtomicLong c = timeoutsPerHost.get(ip); @@ -368,13 +368,13 @@ public final class MessagingService implements MessagingServiceMBean * @param address the host that replied to the message * @param latency */ - public void maybeAddLatency(IMessageCallback cb, InetAddress address, double latency) + public void maybeAddLatency(IMessageCallback cb, InetAddress address, long latency) { if (cb.isLatencyForSnitch()) addLatency(address, latency); } - public void addLatency(InetAddress address, double latency) + public void addLatency(InetAddress address, long latency) { for (ILatencySubscriber subscriber : subscribers) subscriber.receiveTiming(address, latency); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/652ae9a6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index 106f76d..b62a452 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -26,7 +26,7 @@ public class ResponseVerbHandler implements IVerbHandler public void doVerb(MessageIn message, String id) { - double age = System.currentTimeMillis() - MessagingService.instance().getRegisteredCallbackAge(id); + long latency = System.currentTimeMillis() - MessagingService.instance().getRegisteredCallbackAge(id); CallbackInfo callbackInfo = MessagingService.instance().removeRegisteredCallback(id); if (callbackInfo == null) { @@ -35,7 +35,7 @@ public class ResponseVerbHandler implements IVerbHandler } IMessageCallback cb = callbackInfo.callback; - MessagingService.instance().maybeAddLatency(cb, message.from, age); + MessagingService.instance().maybeAddLatency(cb, message.from, latency); if (cb instanceof IAsyncCallback) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/652ae9a6/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java index decd59a..21a7993 100644 --- a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java +++ b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java @@ -48,9 +48,9 @@ public class DynamicEndpointSnitchTest // first, make all hosts equal for (int i = 0; i < 5; i++) { - dsnitch.receiveTiming(host1, 1.0); - dsnitch.receiveTiming(host2, 1.0); - dsnitch.receiveTiming(host3, 
1.0); + dsnitch.receiveTiming(host1, 1L); + dsnitch.receiveTiming(host2, 1L); + dsnitch.receiveTiming(host3, 1L); } Thread.sleep(sleeptime); @@ -61,9 +61,9 @@ public class DynamicEndpointSnitchTest assert dsnitch.getSortedListByProximity(self, order).equals(order); // make host1 a little worse - dsnitch.receiveTiming(host1, 2.0); - dsnitch.receiveTiming(host2, 1.0); - dsnitch.receiveTiming(host3, 1.0); + dsnitch.receiveTiming(host1, 2L); + dsnitch.receiveTiming(host2, 1L); + dsnitch.receiveTiming(host3, 1L); Thread.sleep(sleeptime); order.clear(); @@ -73,9 +73,9 @@ public class DynamicEndpointSnitchTest assert dsnitch.getSortedListByProximity(self, order).equals(order); // make host2 as bad as host1 - dsnitch.receiveTiming(host2, 2.0); - dsnitch.receiveTiming(host1, 1.0); - dsnitch.receiveTiming(host3, 1.0); + dsnitch.receiveTiming(host2, 2L); + dsnitch.receiveTiming(host1, 1L); + dsnitch.receiveTiming(host3, 1L); Thread.sleep(sleeptime); order.clear(); @@ -87,9 +87,9 @@ public class DynamicEndpointSnitchTest // make host3 the worst for (int i = 0; i < 2; i++) { - dsnitch.receiveTiming(host1, 1.0); - dsnitch.receiveTiming(host2, 1.0); - dsnitch.receiveTiming(host3, 2.0); + dsnitch.receiveTiming(host1, 1L); + dsnitch.receiveTiming(host2, 1L); + dsnitch.receiveTiming(host3, 2L); } Thread.sleep(sleeptime); @@ -102,9 +102,9 @@ public class DynamicEndpointSnitchTest // make host3 equal to the others for (int i = 0; i < 2; i++) { - dsnitch.receiveTiming(host1, 1.0); - dsnitch.receiveTiming(host2, 1.0); - dsnitch.receiveTiming(host3, 1.0); + dsnitch.receiveTiming(host1, 1L); + dsnitch.receiveTiming(host2, 1L); + dsnitch.receiveTiming(host3, 1L); } Thread.sleep(sleeptime);