Author: brandonwilliams Date: Fri Jul 9 23:07:00 2010 New Revision: 962720
URL: http://svn.apache.org/viewvc?rev=962720&view=rev Log: Dynamic snitch to adaptively avoid reading from slow nodes. Patch by brandonwilliams; reviewed by jbellis for CASSANDRA-981 Added: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencyPublisher.java - copied, changed from r962683, cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencySubscriber.java - copied, changed from r962683, cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java Modified: cassandra/trunk/conf/cassandra.yaml cassandra/trunk/src/java/org/apache/cassandra/config/Config.java cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java Modified: cassandra/trunk/conf/cassandra.yaml URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=962720&r1=962719&r2=962720&view=diff ============================================================================== --- cassandra/trunk/conf/cassandra.yaml (original) +++ cassandra/trunk/conf/cassandra.yaml Fri Jul 9 23:07:00 2010 @@ -170,6 +170,12 @@ request_scheduler: org.apache.cassandra. # the request scheduling. The current supported option is "keyspace" request_scheduler_id: keyspace +# dynamic_snitch -- This boolean controls whether the above snitch is +# wrapped with a dynamic snitch, which will monitor read latencies +# and avoid reading from hosts that have slowed (due to compaction, +# for instance) +dynamic_snitch: false + # A ColumnFamily is the Cassandra concept closest to a relational table. # # Keyspaces are separate groups of ColumnFamilies. Except in very Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=962720&r1=962719&r2=962720&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Fri Jul 9 23:07:00 2010 @@ -71,6 +71,7 @@ public class Config { public Integer commitlog_sync_period_in_ms; public String endpoint_snitch; + public Boolean dynamic_snitch = false; public String request_scheduler; public RequestSchedulerId request_scheduler_id; Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=962720&r1=962719&r2=962720&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Jul 9 23:07:00 2010 @@ -27,6 +27,7 @@ import java.net.URL; import java.net.UnknownHostException; import java.util.*; +import org.apache.cassandra.locator.DynamicEndpointSnitch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -415,10 +416,10 @@ public class DatabaseDescriptor throw (ConfigurationException)e.getCause(); throw new ConfigurationException("Error instantiating " + endpointSnitchClassName + " " + e.getMessage()); } - return snitch; + return conf.dynamic_snitch ? new DynamicEndpointSnitch(snitch) : snitch; } - public static void loadSchemas() throws IOException + public static void loadSchemas() throws IOException { // we can load tables from local storage if a version is set in the system table and that acutally maps to // real data in the definitions table. If we do end up loading from xml, store the defintions so that we Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java?rev=962720&r1=962719&r2=962720&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java Fri Jul 9 23:07:00 2010 @@ -21,6 +21,7 @@ package org.apache.cassandra.locator; import java.net.InetAddress; import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -42,4 +43,9 @@ public abstract class AbstractEndpointSn public abstract List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress); public abstract List<InetAddress> sortByProximity(InetAddress address, List<InetAddress> addresses); + + public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) + { + return a1.getHostAddress().compareTo(a2.getHostAddress()); + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java?rev=962720&r1=962719&r2=962720&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java Fri Jul 9 23:07:00 2010 @@ -72,30 +72,35 @@ public abstract class AbstractRackAwareS { public int compare(InetAddress a1, InetAddress a2) { - if (address.equals(a1) && !address.equals(a2)) - return -1; - if (address.equals(a2) && !address.equals(a1)) - return 1; + return compareEndpoints(address, a1, a2); + }; + }); + return addresses; + } - String addressRack = getRack(address); - String a1Rack = getRack(a1); - String a2Rack = getRack(a2); - if (addressRack.equals(a1Rack) && !addressRack.equals(a2Rack)) - return -1; - if (addressRack.equals(a2Rack) && !addressRack.equals(a1Rack)) - return 1; + public int compareEndpoints(InetAddress address, InetAddress a1, InetAddress a2) + { + if (address.equals(a1) && !address.equals(a2)) + return -1; + if (address.equals(a2) && !address.equals(a1)) + return 1; - String addressDatacenter = getDatacenter(address); - String a1Datacenter = getDatacenter(a1); - String a2Datacenter = getDatacenter(a2); - if (addressDatacenter.equals(a1Datacenter) && !addressDatacenter.equals(a2Datacenter)) - return -1; - if (addressDatacenter.equals(a2Datacenter) && !addressDatacenter.equals(a1Datacenter)) - return 1; + String addressRack = getRack(address); + String a1Rack = getRack(a1); + String a2Rack = getRack(a2); + if (addressRack.equals(a1Rack) && !addressRack.equals(a2Rack)) + return -1; + if (addressRack.equals(a2Rack) && !addressRack.equals(a1Rack)) + return 1; - return 0; - } - }); - return addresses; + String addressDatacenter = getDatacenter(address); + String a1Datacenter = getDatacenter(a1); + String a2Datacenter = getDatacenter(a2); + if (addressDatacenter.equals(a1Datacenter) && !addressDatacenter.equals(a2Datacenter)) + return -1; + if (addressDatacenter.equals(a2Datacenter) && !addressDatacenter.equals(a1Datacenter)) + return 1; + + return 0; } } Added: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=962720&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Fri Jul 9 23:07:00 2010 @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import java.net.InetAddress; +import java.util.*; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import java.lang.management.ManagementFactory; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.ResponseVerbHandler; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.AbstractStatsDeque; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.ILatencyPublisher; +import org.apache.cassandra.locator.ILatencySubscriber; +import org.apache.cassandra.locator.AbstractEndpointSnitch; +import org.apache.cassandra.locator.DynamicEndpointSnitchMBean; +import org.apache.cassandra.utils.FBUtilities; + +/** + * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector + */ +public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean +{ + private static int UPDATES_PER_INTERVAL = 100; + private static int UPDATE_INTERVAL_IN_MS = 1000; + private static int RESET_INTERVAL_IN_MS = 60000; + private static int WINDOW_SIZE = 100; + private boolean registered = false; + + private ConcurrentHashMap<InetAddress, Double> scores = new ConcurrentHashMap(); + private ConcurrentHashMap<InetAddress, AdaptiveLatencyTracker> windows = new ConcurrentHashMap(); + private AtomicInteger intervalupdates = new AtomicInteger(0); + public IEndpointSnitch subsnitch; + + public DynamicEndpointSnitch(IEndpointSnitch snitch) + { + subsnitch = snitch; + TimerTask update = new TimerTask() + { + public void run() + { + updateScores(); + } + }; + TimerTask reset = new TimerTask() + { + public void run() + { + // we do this so that a host considered bad has a chance to recover, otherwise would we never try + // to read from it, which would cause its score to never change + reset(); + } + }; + Timer timer = new Timer("DynamicEndpointSnitch"); + timer.schedule(update, UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS); + timer.schedule(reset, RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS); + + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + try + { + mbs.registerMBean(this, new ObjectName("org.apache.cassandra.locator:type=DynamicEndpointSnitch")); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + public String getRack(InetAddress endpoint) + { + return subsnitch.getRack(endpoint); + } + + public String getDatacenter(InetAddress endpoint) + { + return subsnitch.getDatacenter(endpoint); + } + + public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses) + { + List<InetAddress> list = new ArrayList<InetAddress>(addresses); + sortByProximity(address, list); + return list; + } + + public List<InetAddress> sortByProximity(final InetAddress address, List<InetAddress> addresses) + { + assert address == FBUtilities.getLocalAddress(); // we only know about ourself + Collections.sort(addresses, new Comparator<InetAddress>() + { + public int compare(InetAddress a1, InetAddress a2) + { + return compareEndpoints(address, a1, a2); + } + }); + return addresses; + } + + public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) + { + Double scored1 = scores.get(a1); + Double scored2 = scores.get(a2); + + if (scored1 == null || scored2 == null) + return subsnitch.compareEndpoints(target, a1, a2); + if (scored1.equals(scored2)) + return 0; + if (scored1 < scored2) + return 1; + else + return -1; + } + + public void receiveTiming(InetAddress host, Double latency) // this is cheap + { + if (intervalupdates.intValue() >= UPDATES_PER_INTERVAL) + return; + AdaptiveLatencyTracker tracker = windows.get(host); + if (tracker == null) + { + AdaptiveLatencyTracker alt = new AdaptiveLatencyTracker(WINDOW_SIZE); + tracker = windows.putIfAbsent(host, alt); + if (tracker == null) + tracker = alt; + } + tracker.add(latency); + intervalupdates.getAndIncrement(); + } + + private void updateScores() // this is expensive + { + if (!registered) + { + ILatencyPublisher handler = (ILatencyPublisher)MessagingService.instance.getVerbHandler(StorageService.Verb.READ_RESPONSE); + if (handler != null) + { + handler.register(this); + registered = true; + } + + } + for (Map.Entry<InetAddress, AdaptiveLatencyTracker> entry: windows.entrySet()) + { + scores.put(entry.getKey(), entry.getValue().score()); + } + intervalupdates.set(0); + } + + private void reset() + { + for (AdaptiveLatencyTracker tracker : windows.values()) + { + tracker.clear(); + } + } + + public Map<InetAddress, Double> getScores() + { + return scores; + } +} + +/** a threadsafe version of BoundedStatsDeque+ArrivalWindow with modification for arbitrary times **/ +class AdaptiveLatencyTracker extends AbstractStatsDeque +{ + private LinkedBlockingDeque latencies; + private final int size; + private static double SENTINEL_COMPARE = 0.0001; // arbitrary; as long as it is the same across hosts it doesn't matter + + AdaptiveLatencyTracker(int size) + { + this.size = size; + latencies = new LinkedBlockingDeque(size); + } + + public void add(double i) + { + latencies.offer(i); + } + + public void clear() + { + latencies.clear(); + } + + public Iterator<Double> iterator() + { + return latencies.iterator(); + } + + public int size() + { + return latencies.size(); + } + + double p(double t) + { + double mean = mean(); + double exponent = (-1) * (t) / mean; + return 1 - ( 1 - Math.pow( Math.E, exponent) ); + } + + double score() + { + double log = 0d; + if ( latencies.size() > 0 ) + { + double probability = p(SENTINEL_COMPARE); + log = (-1) * Math.log10( probability ); + } + return log; + } + +} Added: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java?rev=962720&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java Fri Jul 9 23:07:00 2010 @@ -0,0 +1,27 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.cassandra.locator; + +import java.net.InetAddress; +import java.util.Map; + +public interface DynamicEndpointSnitchMBean { + public Map<InetAddress, Double> getScores(); +} Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java?rev=962720&r1=962719&r2=962720&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java Fri Jul 9 23:07:00 2010 @@ -31,6 +31,16 @@ import java.util.List; public interface IEndpointSnitch { /** + * returns a String repesenting the rack this endpoint belongs to + */ + public String getRack(InetAddress endpoint); + + /** + * returns a String representing the datacenter this endpoint belongs to + */ + public String getDatacenter(InetAddress endpoint); + + /** * returns a new <tt>List</tt> sorted by proximity to the given endpoint */ public List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress); @@ -45,4 +55,9 @@ public interface IEndpointSnitch * @param subscriber the subscriber to notify */ public void register(AbstractReplicationStrategy subscriber); + + /** + * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would + */ + public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2); } Copied: cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencyPublisher.java (from r962683, cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java) URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencyPublisher.java?p2=cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencyPublisher.java&p1=cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java&r1=962683&r2=962720&rev=962720&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencyPublisher.java Fri Jul 9 23:07:00 2010 @@ -18,24 +18,9 @@ package org.apache.cassandra.locator; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import org.apache.cassandra.locator.ILatencySubscriber; -/** - * A simple endpoint snitch implementation does not sort addresses by - * proximity. - */ -public class SimpleSnitch extends AbstractEndpointSnitch +public interface ILatencyPublisher { - public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses) - { - return new ArrayList<InetAddress>(addresses); - } - - public List<InetAddress> sortByProximity(final InetAddress address, List<InetAddress> addresses) - { - return addresses; - } + public void register(ILatencySubscriber subcriber); } Copied: cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencySubscriber.java (from r962683, cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java) URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencySubscriber.java?p2=cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencySubscriber.java&p1=cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java&r1=962683&r2=962720&rev=962720&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencySubscriber.java Fri Jul 9 23:07:00 2010 @@ -19,23 +19,8 @@ package org.apache.cassandra.locator; import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -/** - * A simple endpoint snitch implementation does not sort addresses by - * proximity. - */ -public class SimpleSnitch extends AbstractEndpointSnitch +public interface ILatencySubscriber { - public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses) - { - return new ArrayList<InetAddress>(addresses); - } - - public List<InetAddress> sortByProximity(final InetAddress address, List<InetAddress> addresses) - { - return addresses; - } + public void receiveTiming(InetAddress address, Double latency); } Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java?rev=962720&r1=962719&r2=962720&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java Fri Jul 9 23:07:00 2010 @@ -23,12 +23,24 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import org.apache.commons.lang.NotImplementedException; + /** * A simple endpoint snitch implementation does not sort addresses by * proximity. */ public class SimpleSnitch extends AbstractEndpointSnitch { + public String getRack(InetAddress endpoint) + { + throw new NotImplementedException(); + } + + public String getDatacenter(InetAddress endpoint) + { + throw new NotImplementedException(); + } + public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses) { return new ArrayList<InetAddress>(addresses); Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=962720&r1=962719&r2=962720&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Jul 9 23:07:00 2010 @@ -400,6 +400,16 @@ public class MessagingService implements return taskCompletionMap_.remove(key); } + public static long getRegisteredCallbackAge(String key) + { + return callbackMap_.getAge(key); + } + + public static long getAsyncResultAge(String key) + { + return taskCompletionMap_.getAge(key); + } + public static ExecutorService getDeserializationExecutor() { return messageDeserializerExecutor_; Modified: cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=962720&r1=962719&r2=962720&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Fri Jul 9 23:07:00 2010 @@ -18,21 +18,30 @@ package org.apache.cassandra.net; + +import java.util.*; +import java.net.InetAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.locator.ILatencyPublisher; +import org.apache.cassandra.locator.ILatencySubscriber; -public class ResponseVerbHandler implements IVerbHandler +public class ResponseVerbHandler implements IVerbHandler, ILatencyPublisher { private static final Logger logger_ = LoggerFactory.getLogger( ResponseVerbHandler.class ); - + private List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>(); + + public void doVerb(Message message) { String messageId = message.getMessageId(); IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId); + double age = 0; if (cb != null) { if (logger_.isDebugEnabled()) logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom()); + age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId); cb.response(message); } else @@ -42,8 +51,23 @@ public class ResponseVerbHandler impleme { if (logger_.isDebugEnabled()) logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom()); + age = System.currentTimeMillis() - MessagingService.getAsyncResultAge(messageId); ar.result(message); } } + notifySubscribers(message.getFrom(), age); + } + + private void notifySubscribers(InetAddress host, double latency) + { + for (ILatencySubscriber subscriber : subscribers) + { + subscriber.receiveTiming(host, latency); + } + } + + public void register(ILatencySubscriber subscriber) + { + subscribers.add(subscriber); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=962720&r1=962719&r2=962720&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java Fri Jul 9 23:07:00 2010 @@ -129,6 +129,17 @@ public class ExpiringMap<K, V> return result; } + public long getAge(K key) + { + long age = 0; + CacheableObject<V> co = cache.get(key); + if (co != null) + { + age = co.age; + } + return age; + } + public int size() { return cache.size(); Added: cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java?rev=962720&view=auto ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java (added) +++ cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java Fri Jul 9 23:07:00 2010 @@ -0,0 +1,109 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.cassandra.locator; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; + +import org.junit.Test; + +import static org.junit.Assert.*; +import org.apache.cassandra.locator.DynamicEndpointSnitch; +import org.apache.cassandra.locator.SimpleSnitch; +import org.apache.cassandra.utils.FBUtilities; + +public class DynamicEndpointSnitchTest +{ + @Test + public void testSnitch() throws UnknownHostException, InterruptedException + { + DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(new SimpleSnitch()); + InetAddress self = FBUtilities.getLocalAddress(); + ArrayList<InetAddress> order = new ArrayList<InetAddress>(); + InetAddress host1 = InetAddress.getByName("127.0.0.1"); + InetAddress host2 = InetAddress.getByName("127.0.0.2"); + InetAddress host3 = InetAddress.getByName("127.0.0.3"); + + // first, make all hosts equal + for (int i = 0; i < 5; i++) + { + dsnitch.receiveTiming(host1, 1.0); + dsnitch.receiveTiming(host2, 1.0); + dsnitch.receiveTiming(host3, 1.0); + } + + Thread.sleep(1500); + + order.add(host1); + order.add(host2); + order.add(host3); + + assert dsnitch.getSortedListByProximity(self, order).equals(order); + + // make host1 a little worse + dsnitch.receiveTiming(host1, 2.0); + Thread.sleep(1500); + order.clear(); + + order.add(host2); + order.add(host3); + order.add(host1); + + assert dsnitch.getSortedListByProximity(self, order).equals(order); + + // make host2 a little worse + dsnitch.receiveTiming(host2, 2.0); + Thread.sleep(1500); + order.clear(); + + order.add(host3); + order.add(host2); + order.add(host1); + + assert dsnitch.getSortedListByProximity(self, order).equals(order); + + // make host3 the worst + for (int i = 0; i < 2; i++) + { + dsnitch.receiveTiming(host3, 2.0); + } + Thread.sleep(1500); + order.clear(); + + order.add(host2); + order.add(host1); + order.add(host3); + + // make host3 equal to the others + for (int i = 0; i < 2; i++) + { + dsnitch.receiveTiming(host3, 1.0); + } + Thread.sleep(1500); + order.clear(); + + order.add(host1); + order.add(host2); + order.add(host3); + + assert dsnitch.getSortedListByProximity(self, order).equals(order); + } +}