Author: jbellis Date: Wed Jan 19 14:43:37 2011 New Revision: 1060828 URL: http://svn.apache.org/viewvc?rev=1060828&view=rev Log: add latency information from local reads to DynamicSnitch patch by brandonwilliams and jbellis for CASSANDRA-2004
Removed: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencyPublisher.java Modified: cassandra/branches/cassandra-0.6/CHANGES.txt cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Modified: cassandra/branches/cassandra-0.6/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1060828&r1=1060827&r2=1060828&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.6/CHANGES.txt Wed Jan 19 14:43:37 2011 @@ -2,6 +2,7 @@ * buffer network stack to avoid inefficient small TCP messages while avoiding the nagle/delayed ack problem (CASSANDRA-1896) * fix race condition in MessagingService.targets (CASSANDRA-1959) + * add latency information from local reads to DynamicSnitch (CASSANDRA-2004) 0.6.9 Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1060828&r1=1060827&r2=1060828&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Wed Jan 19 14:43:37 2011 @@ -38,10 +38,11 @@ import org.apache.cassandra.utils.FBUtil */ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean 
{ - private static int UPDATES_PER_INTERVAL = 10000; - private static int UPDATE_INTERVAL_IN_MS = 100; - private static int RESET_INTERVAL_IN_MS = 60000 * 10; - private static int WINDOW_SIZE = 100; + private static final int UPDATES_PER_INTERVAL = 10000; + private static final int UPDATE_INTERVAL_IN_MS = 100; + private static final int RESET_INTERVAL_IN_MS = 60000 * 10; + private static final int WINDOW_SIZE = 100; + private boolean registered = false; private ConcurrentHashMap<InetAddress, Double> scores = new ConcurrentHashMap(); @@ -151,13 +152,8 @@ public class DynamicEndpointSnitch exten { if (!registered) { - ILatencyPublisher handler = (ILatencyPublisher)MessagingService.instance.getVerbHandler(StorageService.Verb.READ_RESPONSE); - if (handler != null) - { - handler.register(this); - registered = true; - } - + MessagingService.instance.register(this); + registered = true; } for (Map.Entry<InetAddress, AdaptiveLatencyTracker> entry: windows.entrySet()) { Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=1060828&r1=1060827&r2=1060828&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 19 14:43:37 2011 @@ -35,16 +35,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Function; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; import org.apache.log4j.Logger; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.StageManager; import 
org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.locator.ILatencyPublisher; import org.apache.cassandra.locator.ILatencySubscriber; import org.apache.cassandra.net.io.SerializerType; import org.apache.cassandra.net.sink.SinkManager; @@ -56,7 +52,7 @@ import org.apache.cassandra.utils.Simple import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.cliffc.high_scale_lib.NonBlockingHashSet; -public class MessagingService implements ILatencyPublisher +public class MessagingService { private static int version_ = 1; //TODO: make this parameter dynamic somehow. Not sure if config is appropriate. @@ -116,10 +112,7 @@ public class MessagingService implements return null; for (InetAddress address : addresses) - { - for (ILatencySubscriber subscriber : subscribers) - subscriber.receiveTiming(address, (double) DatabaseDescriptor.getRpcTimeout()); - } + addLatency(address, (double) DatabaseDescriptor.getRpcTimeout()); return null; } @@ -140,6 +133,12 @@ public class MessagingService implements timer.schedule(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS); } + public void addLatency(InetAddress address, double latency) + { + for (ILatencySubscriber subscriber : subscribers) + subscriber.receiveTiming(address, latency); + } + public byte[] hash(String type, byte data[]) { byte result[]; Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1060828&r1=1060827&r2=1060828&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Wed Jan 19 14:43:37 2011 @@ -18,19 +18,12 
@@ package org.apache.cassandra.net; -import java.util.ArrayList; -import java.util.List; - import org.apache.log4j.Logger; -import org.apache.cassandra.locator.ILatencyPublisher; -import org.apache.cassandra.locator.ILatencySubscriber; - -public class ResponseVerbHandler implements IVerbHandler, ILatencyPublisher +public class ResponseVerbHandler implements IVerbHandler { private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class ); - private List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>(); - + public void doVerb(Message message) { String messageId = message.getMessageId(); @@ -41,8 +34,7 @@ public class ResponseVerbHandler impleme return; // if cb is not null, then age will be valid - for (ILatencySubscriber subscriber : subscribers) - subscriber.receiveTiming(message.getFrom(), age); + MessagingService.instance.addLatency(message.getFrom(), age); if (cb instanceof IAsyncCallback) { @@ -58,8 +50,4 @@ public class ResponseVerbHandler impleme } } - public void register(ILatencySubscriber subscriber) - { - subscribers.add(subscriber); - } } Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1060828&r1=1060827&r2=1060828&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Wed Jan 19 14:43:37 2011 @@ -702,7 +702,8 @@ public class StorageProxy implements Sto static class weakReadLocalCallable implements Callable<Object> { - private ReadCommand command; + private final ReadCommand command; + private final long start = System.currentTimeMillis(); weakReadLocalCallable(ReadCommand command) { @@ -718,6 +719,7 @@ public 
class StorageProxy implements Sto Row row = command.getRow(table); StorageService.instance.doConsistencyCheck(row, command, FBUtilities.getLocalAddress()); + MessagingService.instance.addLatency(FBUtilities.getLocalAddress(), System.currentTimeMillis() - start); return row; } }