Author: jbellis Date: Tue Dec 28 03:19:34 2010 New Revision: 1053246 URL: http://svn.apache.org/viewvc?rev=1053246&view=rev Log: merge from 0.7
Added: cassandra/trunk/contrib/stress/ - copied from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/ cassandra/trunk/contrib/stress/bin/ - copied from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/bin/ cassandra/trunk/contrib/stress/bin/stress - copied unchanged from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/bin/stress cassandra/trunk/contrib/stress/build.xml - copied unchanged from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/build.xml cassandra/trunk/contrib/stress/src/ - copied from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/ cassandra/trunk/contrib/stress/src/org/ - copied from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/org/ cassandra/trunk/contrib/stress/src/org/apache/ - copied from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/ cassandra/trunk/contrib/stress/src/org/apache/cassandra/ - copied from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/ - copied from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/ - copied from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java - copied unchanged from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java - copied unchanged from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/tests/ - copied from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/tests/ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/tests/IndexedRangeSlicer.java - copied unchanged from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/tests/IndexedRangeSlicer.java cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/tests/Inserter.java - copied unchanged from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/tests/Inserter.java cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/tests/MultiGetter.java - copied unchanged from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/tests/MultiGetter.java cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/tests/RangeSlicer.java - copied unchanged from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/tests/RangeSlicer.java cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/tests/Reader.java - copied unchanged from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/tests/Reader.java cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/ - copied from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/util/ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java - copied unchanged from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/Range.java - copied unchanged from r1053221, cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/util/Range.java cassandra/trunk/src/java/org/apache/cassandra/net/IMessageCallback.java - copied unchanged from r1053245, cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java Modified: cassandra/trunk/ (props changed) cassandra/trunk/CHANGES.txt cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed) cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncCallback.java cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java Propchange: cassandra/trunk/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Dec 28 03:19:34 2010 @@ -1,5 +1,5 @@ -/cassandra/branches/cassandra-0.6:922689-1051640,1051662 -/cassandra/branches/cassandra-0.7:1026517-1053205 +/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053244 +/cassandra/branches/cassandra-0.7:1026517-1053245 /incubator/cassandra/branches/cassandra-0.3:774578-796573 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350 /incubator/cassandra/branches/cassandra-0.5:888872-915439 Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1053246&r1=1053245&r2=1053246&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Tue Dec 28 03:19:34 2010 @@ -15,6 +15,8 @@ and seq scan operations (CASSANDRA-1470) * add OpenBitSet to support larger bloom filters (CASSANDRA-1555) * handle URL-specified log4j regression (CASSANDRA-1907) + * enable keepalive on intra-cluster sockets (CASSANDRA-1766) + * count timeouts towards dynamicsnitch latencies (CASSANDRA-1905) 0.7.0-rc3 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Dec 28 03:19:34 2010 @@ -1,5 +1,5 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1051640,1051662 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026517-1053205 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053244 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026517-1053245 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Dec 28 03:19:34 2010 @@ -1,5 +1,5 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1051640,1051662 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026517-1053205 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053244 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026517-1053245 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Dec 28 03:19:34 2010 @@ -1,5 +1,5 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1051640,1051662 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026517-1053205 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053244 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026517-1053245 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Dec 28 03:19:34 2010 @@ -1,5 +1,5 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1051640,1051662 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026517-1053205 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053244 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026517-1053245 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Dec 28 03:19:34 2010 @@ -1,5 +1,5 @@ -/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1051640,1051662 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026517-1053205 +/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053244 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026517-1053245 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502 Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1053246&r1=1053245&r2=1053246&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Tue Dec 28 03:19:34 2010 @@ -39,17 +39,17 @@ import org.apache.cassandra.utils.FBUtil */ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { - private static int UPDATES_PER_INTERVAL = 10000; - private static int UPDATE_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicUpdateInterval(); - private static int RESET_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicResetInterval(); - private static double BADNESS_THRESHOLD = DatabaseDescriptor.getDynamicBadnessThreshold(); - private static int WINDOW_SIZE = 100; + private static final int UPDATES_PER_INTERVAL = 10000; + private static final int UPDATE_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicUpdateInterval(); + private static final int RESET_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicResetInterval(); + private static final double BADNESS_THRESHOLD = DatabaseDescriptor.getDynamicBadnessThreshold(); + private static final int WINDOW_SIZE = 100; private boolean registered = false; - private ConcurrentHashMap<InetAddress, Double> scores = new ConcurrentHashMap(); - private ConcurrentHashMap<InetAddress, AdaptiveLatencyTracker> windows = new ConcurrentHashMap(); - private AtomicInteger intervalupdates = new AtomicInteger(0); - public IEndpointSnitch subsnitch; + private final ConcurrentHashMap<InetAddress, Double> scores = new ConcurrentHashMap<InetAddress, Double>(); + private final ConcurrentHashMap<InetAddress, AdaptiveLatencyTracker> windows = new ConcurrentHashMap<InetAddress, AdaptiveLatencyTracker>(); + private final AtomicInteger intervalupdates = new AtomicInteger(0); + public final IEndpointSnitch subsnitch; public DynamicEndpointSnitch(IEndpointSnitch snitch) { @@ -213,8 +213,8 @@ public class DynamicEndpointSnitch exten /** a threadsafe version of BoundedStatsDeque+ArrivalWindow with modification for arbitrary times **/ class AdaptiveLatencyTracker extends AbstractStatsDeque { - private LinkedBlockingDeque<Double> latencies; - private static double SENTINEL_COMPARE = 0.0001; // arbitrary; as long as it is the same across hosts it doesn't matter + private final LinkedBlockingDeque<Double> latencies; + private static final double SENTINEL_COMPARE = 0.0001; // arbitrary; as long as it is the same across hosts it doesn't matter AdaptiveLatencyTracker(int size) { Modified: cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1053246&r1=1053245&r2=1053246&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java Tue Dec 28 03:19:34 2010 @@ -94,6 +94,8 @@ class AsyncResult implements IAsyncResul { lock.unlock(); } + + MessagingService.removeRegisteredCallback(response.getMessageId()); } public InetAddress getFrom() Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncCallback.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncCallback.java?rev=1053246&r1=1053245&r2=1053246&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncCallback.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncCallback.java Tue Dec 28 03:19:34 2010 @@ -24,7 +24,7 @@ package org.apache.cassandra.net; * service. In particular, if any shared state is referenced, making * response alone synchronized will not suffice. */ -public interface IAsyncCallback +public interface IAsyncCallback extends IMessageCallback { /** * @param msg response received. Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java?rev=1053246&r1=1053245&r2=1053246&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java Tue Dec 28 03:19:34 2010 @@ -22,7 +22,7 @@ import java.net.InetAddress; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -public interface IAsyncResult +public interface IAsyncResult extends IMessageCallback { /** * Same operation as the above get() but allows the calling Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1053246&r1=1053245&r2=1053246&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Tue Dec 28 03:19:34 2010 @@ -36,6 +36,9 @@ import java.util.concurrent.atomic.Atomi import javax.management.MBeanServer; import javax.management.ObjectName; +import com.google.common.base.Function; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +46,8 @@ import org.apache.cassandra.concurrent.D import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.locator.ILatencyPublisher; +import org.apache.cassandra.locator.ILatencySubscriber; import org.apache.cassandra.net.io.SerializerType; import org.apache.cassandra.net.sink.SinkManager; import org.apache.cassandra.service.GCInspector; @@ -54,7 +59,7 @@ import org.apache.cassandra.utils.GuidGe import org.apache.cassandra.utils.SimpleCondition; import org.cliffc.high_scale_lib.NonBlockingHashMap; -public class MessagingService implements MessagingServiceMBean +public class MessagingService implements MessagingServiceMBean, ILatencyPublisher { private static int version_ = 1; //TODO: make this parameter dynamic somehow. Not sure if config is appropriate. @@ -64,9 +69,9 @@ public class MessagingService implements public static final int PROTOCOL_MAGIC = 0xCA552DFA; /* This records all the results mapped by message Id */ - private static ExpiringMap<String, IAsyncCallback> callbackMap_; - private static ExpiringMap<String, IAsyncResult> taskCompletionMap_; - + private static ExpiringMap<String, IMessageCallback> callbacks; + private static Multimap<String, InetAddress> targets; + /* Lookup table for registering message handlers based on the verb. */ private static Map<StorageService.Verb, IVerbHandler> verbHandlers_; @@ -83,6 +88,8 @@ public class MessagingService implements private SocketThread socketThread; private SimpleCondition listenGate; private static final Map<StorageService.Verb, AtomicInteger> droppedMessages = new EnumMap<StorageService.Verb, AtomicInteger>(StorageService.Verb.class); + private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>(); + static { for (StorageService.Verb verb : StorageService.Verb.values()) @@ -99,15 +106,6 @@ public class MessagingService implements { listenGate = new SimpleCondition(); verbHandlers_ = new EnumMap<StorageService.Verb, IVerbHandler>(StorageService.Verb.class); - /* - * Leave callbacks in the cachetable long enough that any related messages will arrive - * before the callback is evicted from the table. The concurrency level is set at 128 - * which is the sum of the threads in the pool that adds shit into the table and the - * pool that retrives the callback from here. - */ - callbackMap_ = new ExpiringMap<String, IAsyncCallback>((long) (1.1 * DatabaseDescriptor.getRpcTimeout())); - taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>((long) (1.1 * DatabaseDescriptor.getRpcTimeout())); - streamExecutor_ = new DebuggableThreadPoolExecutor("Streaming", DatabaseDescriptor.getCompactionThreadPriority()); Runnable logDropped = new Runnable() { @@ -118,6 +116,26 @@ public class MessagingService implements }; StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS); + Function<String, ?> timeoutReporter = new Function<String, Object>() + { + public Object apply(String messageId) + { + Collection<InetAddress> addresses = targets.removeAll(messageId); + if (addresses == null) + return null; + + for (InetAddress address : addresses) + { + for (ILatencySubscriber subscriber : subscribers) + subscriber.receiveTiming(address, (double) DatabaseDescriptor.getRpcTimeout()); + } + + return null; + } + }; + targets = ArrayListMultimap.create(); + callbacks = new ExpiringMap<String, IMessageCallback>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), timeoutReporter); + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { @@ -230,6 +248,7 @@ public class MessagingService implements addCallback(cb, messageId); for (InetAddress endpoint : to) { + targets.put(messageId, endpoint); sendOneWay(message, endpoint); } return messageId; @@ -237,7 +256,7 @@ public class MessagingService implements public void addCallback(IAsyncCallback cb, String messageId) { - callbackMap_.put(messageId, cb); + callbacks.put(messageId, cb); } /** @@ -254,6 +273,7 @@ public class MessagingService implements { String messageId = message.getMessageId(); addCallback(cb, messageId); + targets.put(messageId, to); sendOneWay(message, to); return messageId; } @@ -280,6 +300,7 @@ public class MessagingService implements for ( int i = 0; i < messages.length; ++i ) { messages[i].setMessageId(groupId); + targets.put(groupId, to.get(i)); sendOneWay(messages[i], to.get(i)); } return groupId; @@ -332,7 +353,8 @@ public class MessagingService implements public IAsyncResult sendRR(Message message, InetAddress to) { IAsyncResult iar = new AsyncResult(); - taskCompletionMap_.put(message.getMessageId(), iar); + callbacks.put(message.getMessageId(), iar); + targets.put(message.getMessageId(), to); sendOneWay(message, to); return iar; } @@ -350,6 +372,11 @@ public class MessagingService implements streamExecutor_.execute(new FileStreamTask(header, to)); } + public void register(ILatencySubscriber subcriber) + { + subscribers.add(subcriber); + } + /** blocks until the processing pools are empty and done. */ public static void waitFor() throws InterruptedException { @@ -371,10 +398,7 @@ public class MessagingService implements } streamExecutor_.shutdownNow(); - - /* shut down the cachetables */ - taskCompletionMap_.shutdown(); - callbackMap_.shutdown(); + callbacks.shutdown(); logger_.info("Shutdown complete (no further commands will be processed)"); } @@ -391,29 +415,25 @@ public class MessagingService implements stage.execute(runnable); } - public static IAsyncCallback getRegisteredCallback(String key) - { - return callbackMap_.get(key); - } - - public static void removeRegisteredCallback(String key) + public static IMessageCallback getRegisteredCallback(String messageId) { - callbackMap_.remove(key); + return callbacks.get(messageId); } - public static IAsyncResult getAsyncResult(String key) + public static IMessageCallback removeRegisteredCallback(String messageId) { - return taskCompletionMap_.remove(key); + targets.removeAll(messageId); // TODO fix this when we clean up quorum reads to do proper RR + return callbacks.remove(messageId); } - public static long getRegisteredCallbackAge(String key) + public static long getRegisteredCallbackAge(String messageId) { - return callbackMap_.getAge(key); + return callbacks.getAge(messageId); } - public static long getAsyncResultAge(String key) + public static void responseReceivedFrom(String messageId, InetAddress from) { - return taskCompletionMap_.getAge(key); + targets.remove(messageId, from); } public static void validateMagic(int magic) throws IOException Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1053246&r1=1053245&r2=1053246&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Tue Dec 28 03:19:34 2010 @@ -162,6 +162,7 @@ public class OutboundTcpConnection exten { // zero means 'bind on any available port.' socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0); + socket.setKeepAlive(true); socket.setTcpNoDelay(true); output = new DataOutputStream(socket.getOutputStream()); return true; Modified: cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1053246&r1=1053245&r2=1053246&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Tue Dec 28 03:19:34 2010 @@ -18,7 +18,6 @@ package org.apache.cassandra.net; - import java.net.InetAddress; import java.util.ArrayList; import java.util.List; @@ -37,35 +36,28 @@ public class ResponseVerbHandler impleme public void doVerb(Message message) { - String messageId = message.getMessageId(); - IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId); - double age = 0; - if (cb != null) + String messageId = message.getMessageId(); + MessagingService.responseReceivedFrom(messageId, message.getFrom()); + double age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId); + IMessageCallback cb = MessagingService.getRegisteredCallback(messageId); + if (cb == null) + return; + + // if cb is not null, then age will be valid + for (ILatencySubscriber subscriber : subscribers) + subscriber.receiveTiming(message.getFrom(), age); + + if (cb instanceof IAsyncCallback) { if (logger_.isDebugEnabled()) logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom()); - age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId); - cb.response(message); + ((IAsyncCallback) cb).response(message); } else { - IAsyncResult ar = MessagingService.getAsyncResult(messageId); - if (ar != null) - { - if (logger_.isDebugEnabled()) - logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom()); - age = System.currentTimeMillis() - MessagingService.getAsyncResultAge(messageId); - ar.result(message); - } - } - notifySubscribers(message.getFrom(), age); - } - - private void notifySubscribers(InetAddress host, double latency) - { - for (ILatencySubscriber subscriber : subscribers) - { - subscriber.receiveTiming(host, latency); + if (logger_.isDebugEnabled()) + logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom()); + ((IAsyncResult) cb).result(message); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1053246&r1=1053245&r2=1053246&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java Tue Dec 28 03:19:34 2010 @@ -22,7 +22,9 @@ import java.util.Enumeration; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.Callable; +import com.google.common.base.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +33,7 @@ import org.cliffc.high_scale_lib.NonBloc public class ExpiringMap<K, V> { private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class); + private final Function<K, ?> postExpireHook; private static class CacheableObject<T> { @@ -76,6 +79,7 @@ public class ExpiringMap<K, V> if (co != null && co.isReadyToDie(expiration)) { cache.remove(key); + postExpireHook.apply(key); } } } @@ -86,12 +90,18 @@ public class ExpiringMap<K, V> private final Timer timer; private static int counter = 0; - /* - * Specify the TTL for objects in the cache - * in milliseconds. - */ public ExpiringMap(long expiration) { + this(expiration, null); + } + + /** + * + * @param expiration the TTL for objects in the cache in milliseconds + */ + public ExpiringMap(long expiration, Function<K, ?> postExpireHook) + { + this.postExpireHook = postExpireHook; if (expiration <= 0) { throw new IllegalArgumentException("Argument specified must be a positive number");