Author: jbellis
Date: Mon Aug 22 21:36:04 2011
New Revision: 1160449

URL: http://svn.apache.org/viewvc?rev=1160449&view=rev
Log:
expose rpc timeouts per host in MessagingServiceMBean
patch by Melvin Wang; reviewed by jbellis for CASSANDRA-2941
Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingServiceMBean.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1160449&r1=1160448&r2=1160449&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon Aug 22 21:36:04 2011
@@ -26,6 +26,7 @@
    (CASSANDRA-2662)
  * avoid retaining references to dropped CFS objects in
    CompactionManager.estimatedCompactions (CASSANDRA-2708)
+ * expose rpc timeouts per host in MessagingServiceMBean (CASSANDRA-2941)
 0.8.4

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java?rev=1160449&r1=1160448&r2=1160449&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java Mon Aug 22 21:36:04 2011
@@ -29,6 +29,7 @@ import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -102,6 +103,10 @@ public final class MessagingService impl
     private final Map<StorageService.Verb, Integer> lastDropped = Collections.synchronizedMap(new EnumMap<StorageService.Verb, Integer>(StorageService.Verb.class));
     private final Map<StorageService.Verb, Integer> lastDroppedInternal = new EnumMap<StorageService.Verb, Integer>(StorageService.Verb.class);
+    private long totalTimeouts = 0;
+    private long recentTotalTimeouts = 0;
+    private final Map<String, AtomicLong> timeoutsPerHost = new HashMap<String, AtomicLong>();
+    private final Map<String, AtomicLong> recentTimeoutsPerHost = new HashMap<String, AtomicLong>();
     private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
     private static final long DEFAULT_CALLBACK_TIMEOUT = (long) (1.1 * DatabaseDescriptor.getRpcTimeout());
@@ -141,6 +146,17 @@ public final class MessagingService impl
             {
                 Pair<InetAddress, IMessageCallback> expiredValue = pair.right;
                 maybeAddLatency(expiredValue.right, expiredValue.left, (double) DatabaseDescriptor.getRpcTimeout());
+                totalTimeouts++;
+                String ip = expiredValue.left.getHostAddress();
+                AtomicLong c = timeoutsPerHost.get(ip);
+                if (c == null)
+                    c = timeoutsPerHost.put(ip, new AtomicLong());
+                c.incrementAndGet();
+                // we only create AtomicLong instances here, so that the write
+                // access to the hashmap happens single-threadedly.
+                if (recentTimeoutsPerHost.get(ip) == null)
+                    recentTimeoutsPerHost.put(ip, new AtomicLong());
+
                 return null;
             }
         };
@@ -695,4 +711,39 @@ public final class MessagingService impl
         }
         return map;
     }
+
+    public long getTotalTimeouts()
+    {
+        return totalTimeouts;
+    }
+
+    public long getRecentTotalTimouts()
+    {
+        long recent = totalTimeouts - recentTotalTimeouts;
+        recentTotalTimeouts = totalTimeouts;
+        return recent;
+    }
+
+    public Map<String, Long> getTimeoutsPerHost()
+    {
+        Map<String, Long> result = new HashMap<String, Long>();
+        for (Map.Entry<String, AtomicLong> entry: timeoutsPerHost.entrySet())
+        {
+            result.put(entry.getKey(), entry.getValue().get());
+        }
+        return result;
+    }
+
+    public Map<String, Long> getRecentTimeoutsPerHost()
+    {
+        Map<String, Long> result = new HashMap<String, Long>();
+        for (Map.Entry<String, AtomicLong> entry: recentTimeoutsPerHost.entrySet())
+        {
+            String ip = entry.getKey();
+            AtomicLong recent = entry.getValue();
+            Long timeout = timeoutsPerHost.get(ip).get();
+            result.put(ip, timeout - recent.getAndSet(timeout));
+        }
+        return result;
+    }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingServiceMBean.java?rev=1160449&r1=1160448&r2=1160449&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingServiceMBean.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingServiceMBean.java Mon Aug 22 21:36:04 2011
@@ -59,4 +59,24 @@ public interface MessagingServiceMBean
      * dropped message counts since last called
      */
     public Map<String, Integer> getRecentlyDroppedMessages();
+
+    /**
+     * Total number of timeouts happened on this node
+     */
+    public long getTotalTimeouts();
+
+    /**
+     * Number of timeouts per host
+     */
+    public Map<String, Long> getTimeoutsPerHost();
+
+    /**
+     * Number of timeouts since last check.
+     */
+    public long getRecentTotalTimouts();
+
+    /**
+     * Number of timeouts since last check per host.
+     */
+    public Map<String, Long> getRecentTimeoutsPerHost();
 }