Add latency logging for dropped messages

Patch by akale; reviewed by pmotta for CASSANDRA-10580


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c9ef25fd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c9ef25fd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c9ef25fd

Branch: refs/heads/trunk
Commit: c9ef25fd81501005b6484baf064081efc557f3f4
Parents: bd5c8bb
Author: anubhavkale <anubh...@microsoft.com>
Authored: Tue Dec 15 21:39:16 2015 -0800
Committer: Joshua McKenzie <jmcken...@apache.org>
Committed: Thu Dec 24 08:01:53 2015 -0500

----------------------------------------------------------------------
 .../cassandra/db/ReadCommandVerbHandler.java    |  2 +-
 .../metrics/DroppedMessageMetrics.java          | 10 ++++++
 .../cassandra/net/MessageDeliveryTask.java      |  5 +--
 .../apache/cassandra/net/MessagingService.java  | 37 ++++++++++++++++----
 .../apache/cassandra/service/StorageProxy.java  | 13 ++++---
 .../cassandra/net/MessagingServiceTest.java     | 10 +++---
 6 files changed, 57 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 9eaa8fa..b2fb876 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -53,7 +53,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
         if (!command.complete())
         {
             Tracing.trace("Discarding partial response to {} (timed out)", message.from);
-            MessagingService.instance().incrementDroppedMessages(message);
+            MessagingService.instance().incrementDroppedMessages(message, System.currentTimeMillis() - message.constructionTime.timestamp);
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
index 58c80fb..2a94c9f 100644
--- a/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
@@ -18,6 +18,8 @@
 package org.apache.cassandra.metrics;
 
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+
 import org.apache.cassandra.net.MessagingService;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -30,9 +32,17 @@ public class DroppedMessageMetrics
     /** Number of dropped messages */
     public final Meter dropped;
 
+    /** The dropped latency within node */
+    public final Timer internalDroppedLatency;
+
+    /** The cross node dropped latency */
+    public final Timer crossNodeDroppedLatency;
+
     public DroppedMessageMetrics(MessagingService.Verb verb)
     {
         MetricNameFactory factory = new DefaultNameFactory("DroppedMessage", verb.toString());
         dropped = Metrics.meter(factory.createMetricName("Dropped"));
+        internalDroppedLatency = Metrics.timer(factory.createMetricName("InternalDroppedLatency"));
+        crossNodeDroppedLatency = Metrics.timer(factory.createMetricName("CrossNodeDroppedLatency"));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index 818cfc6..d9f8b7c 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -44,10 +44,11 @@ public class MessageDeliveryTask implements Runnable
     public void run()
     {
         MessagingService.Verb verb = message.verb;
+        long timeTaken = System.currentTimeMillis() - message.constructionTime.timestamp;
         if (MessagingService.DROPPABLE_VERBS.contains(verb)
-            && System.currentTimeMillis() > message.constructionTime.timestamp + message.getTimeout())
+            && timeTaken > message.getTimeout())
         {
-            MessagingService.instance().incrementDroppedMessages(message);
+            MessagingService.instance().incrementDroppedMessages(message, timeTaken);
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index fab082a..d95c49b 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -957,9 +957,20 @@ public final class MessagingService implements MessagingServiceMBean
         incrementDroppedMessages(verb, false);
     }
 
-    public void incrementDroppedMessages(MessageIn message)
+    public void incrementDroppedMessages(Verb verb, long timeTaken)
     {
-        incrementDroppedMessages(message.verb, message.constructionTime.isCrossNode);
+        incrementDroppedMessages(verb, timeTaken, false);
+    }
+
+    public void incrementDroppedMessages(MessageIn message, long timeTaken)
+    {
+        incrementDroppedMessages(message.verb, timeTaken, message.constructionTime.isCrossNode);
+    }
+
+    public void incrementDroppedMessages(Verb verb, long timeTaken, boolean isCrossNodeTimeout)
+    {
+        assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";
+        incrementDroppedMessages(droppedMessagesMap.get(verb), timeTaken, isCrossNodeTimeout);
     }
 
     public void incrementDroppedMessages(Verb verb, boolean isCrossNodeTimeout)
@@ -968,6 +979,15 @@ public final class MessagingService implements MessagingServiceMBean
         incrementDroppedMessages(droppedMessagesMap.get(verb), isCrossNodeTimeout);
     }
 
+    private void incrementDroppedMessages(DroppedMessages droppedMessages, long timeTaken, boolean isCrossNodeTimeout)
+    {
+        if (isCrossNodeTimeout)
+            droppedMessages.metrics.crossNodeDroppedLatency.update(timeTaken, TimeUnit.MILLISECONDS);
+        else
+            droppedMessages.metrics.internalDroppedLatency.update(timeTaken, TimeUnit.MILLISECONDS);
+        incrementDroppedMessages(droppedMessages, isCrossNodeTimeout);
+    }
+
     private void incrementDroppedMessages(DroppedMessages droppedMessages, boolean isCrossNodeTimeout)
     {
         droppedMessages.metrics.dropped.mark();
@@ -1000,11 +1020,14 @@ public final class MessagingService implements MessagingServiceMBean
             int droppedCrossNodeTimeout = droppedMessages.droppedCrossNodeTimeout.getAndSet(0);
             if (droppedInternalTimeout > 0 || droppedCrossNodeTimeout > 0)
             {
-                ret.add(String.format("%s messages were dropped in last %d ms: %d for internal timeout and %d for cross node timeout",
-                                      verb,
-                                      LOG_DROPPED_INTERVAL_IN_MS,
-                                      droppedInternalTimeout,
-                                      droppedCrossNodeTimeout));
+                ret.add(String.format("%s messages were dropped in last %d ms: %d for internal timeout and %d for cross node timeout."
+                                     + " Mean internal dropped latency: %d ms and Mean cross-node dropped latency: %d ms",
+                                     verb,
+                                     LOG_DROPPED_INTERVAL_IN_MS,
+                                     droppedInternalTimeout,
+                                     droppedCrossNodeTimeout,
+                                     TimeUnit.NANOSECONDS.toMillis((long)droppedMessages.metrics.internalDroppedLatency.getSnapshot().getMean()),
+                                     TimeUnit.NANOSECONDS.toMillis((long)droppedMessages.metrics.crossNodeDroppedLatency.getSnapshot().getMean())));
             }
         }
         return ret;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f161607..2e32f16 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1732,7 +1732,7 @@ public class StorageProxy implements StorageProxyMBean
                 }
                 else
                 {
-                    MessagingService.instance().incrementDroppedMessages(verb);
+                    MessagingService.instance().incrementDroppedMessages(verb, System.currentTimeMillis() - constructionTime);
                     handler.onFailure(FBUtilities.getBroadcastAddress());
                 }
 
@@ -2383,9 +2383,10 @@ public class StorageProxy implements StorageProxyMBean
 
         public final void run()
         {
-            if (System.currentTimeMillis() > constructionTime + timeout)
+            long timeTaken = System.currentTimeMillis() - constructionTime;
+            if (timeTaken > timeout)
             {
-                MessagingService.instance().incrementDroppedMessages(verb);
+                MessagingService.instance().incrementDroppedMessages(verb, timeTaken);
                 return;
             }
             try
@@ -2411,9 +2412,11 @@ public class StorageProxy implements StorageProxyMBean
 
         public final void run()
         {
-            if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION))
+            long mutationTimeout = DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION);
+            long timeTaken = System.currentTimeMillis() - constructionTime;
+            if (timeTaken > mutationTimeout)
             {
-                MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION);
+                MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION, timeTaken);
                 HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress()))
                 {
                     protected void runMayThrow() throws Exception

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index 75c146e..3b9c957 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -15,22 +15,22 @@ public class MessagingServiceTest
     {
         MessagingService.Verb verb = MessagingService.Verb.READ;
 
-        for (int i = 0; i < 5000; i++)
-            messagingService.incrementDroppedMessages(verb, i % 2 == 0);
+        for (int i = 1; i <= 5000; i++)
+            messagingService.incrementDroppedMessages(verb, i, i % 2 == 0);
 
         List<String> logs = messagingService.getDroppedMessagesLogs();
         assertEquals(1, logs.size());
-        assertEquals("READ messages were dropped in last 5000 ms: 2500 for internal timeout and 2500 for cross node timeout", logs.get(0));
+        assertEquals("READ messages were dropped in last 5000 ms: 2500 for internal timeout and 2500 for cross node timeout. Mean internal dropped latency: 2730 ms and Mean cross-node dropped latency: 2731 ms", logs.get(0));
         assertEquals(5000, (int)messagingService.getDroppedMessages().get(verb.toString()));
 
         logs = messagingService.getDroppedMessagesLogs();
         assertEquals(0, logs.size());
 
         for (int i = 0; i < 2500; i++)
-            messagingService.incrementDroppedMessages(verb, i % 2 == 0);
+            messagingService.incrementDroppedMessages(verb, i, i % 2 == 0);
 
         logs = messagingService.getDroppedMessagesLogs();
-        assertEquals("READ messages were dropped in last 5000 ms: 1250 for internal timeout and 1250 for cross node timeout", logs.get(0));
+        assertEquals("READ messages were dropped in last 5000 ms: 1250 for internal timeout and 1250 for cross node timeout. Mean internal dropped latency: 2277 ms and Mean cross-node dropped latency: 2278 ms", logs.get(0));
         assertEquals(7500, (int)messagingService.getDroppedMessages().get(verb.toString()));
     }
 

Reply via email to