Add latency logging for dropped messages. Patch by akale; reviewed by pmotta for CASSANDRA-10580
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c9ef25fd Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c9ef25fd Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c9ef25fd Branch: refs/heads/trunk Commit: c9ef25fd81501005b6484baf064081efc557f3f4 Parents: bd5c8bb Author: anubhavkale <anubh...@microsoft.com> Authored: Tue Dec 15 21:39:16 2015 -0800 Committer: Joshua McKenzie <jmcken...@apache.org> Committed: Thu Dec 24 08:01:53 2015 -0500 ---------------------------------------------------------------------- .../cassandra/db/ReadCommandVerbHandler.java | 2 +- .../metrics/DroppedMessageMetrics.java | 10 ++++++ .../cassandra/net/MessageDeliveryTask.java | 5 +-- .../apache/cassandra/net/MessagingService.java | 37 ++++++++++++++++---- .../apache/cassandra/service/StorageProxy.java | 13 ++++--- .../cassandra/net/MessagingServiceTest.java | 10 +++--- 6 files changed, 57 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java index 9eaa8fa..b2fb876 100644 --- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java @@ -53,7 +53,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand> if (!command.complete()) { Tracing.trace("Discarding partial response to {} (timed out)", message.from); - MessagingService.instance().incrementDroppedMessages(message); + MessagingService.instance().incrementDroppedMessages(message, System.currentTimeMillis() - message.constructionTime.timestamp); return; } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java index 58c80fb..2a94c9f 100644 --- a/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java +++ b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java @@ -18,6 +18,8 @@ package org.apache.cassandra.metrics; import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; + import org.apache.cassandra.net.MessagingService; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; @@ -30,9 +32,17 @@ public class DroppedMessageMetrics /** Number of dropped messages */ public final Meter dropped; + /** The dropped latency within node */ + public final Timer internalDroppedLatency; + + /** The cross node dropped latency */ + public final Timer crossNodeDroppedLatency; + public DroppedMessageMetrics(MessagingService.Verb verb) { MetricNameFactory factory = new DefaultNameFactory("DroppedMessage", verb.toString()); dropped = Metrics.meter(factory.createMetricName("Dropped")); + internalDroppedLatency = Metrics.timer(factory.createMetricName("InternalDroppedLatency")); + crossNodeDroppedLatency = Metrics.timer(factory.createMetricName("CrossNodeDroppedLatency")); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/net/MessageDeliveryTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java index 818cfc6..d9f8b7c 100644 --- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java +++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java @@ -44,10 +44,11 @@ public class 
MessageDeliveryTask implements Runnable public void run() { MessagingService.Verb verb = message.verb; + long timeTaken = System.currentTimeMillis() - message.constructionTime.timestamp; if (MessagingService.DROPPABLE_VERBS.contains(verb) - && System.currentTimeMillis() > message.constructionTime.timestamp + message.getTimeout()) + && timeTaken > message.getTimeout()) { - MessagingService.instance().incrementDroppedMessages(message); + MessagingService.instance().incrementDroppedMessages(message, timeTaken); return; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index fab082a..d95c49b 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -957,9 +957,20 @@ public final class MessagingService implements MessagingServiceMBean incrementDroppedMessages(verb, false); } - public void incrementDroppedMessages(MessageIn message) + public void incrementDroppedMessages(Verb verb, long timeTaken) { - incrementDroppedMessages(message.verb, message.constructionTime.isCrossNode); + incrementDroppedMessages(verb, timeTaken, false); + } + + public void incrementDroppedMessages(MessageIn message, long timeTaken) + { + incrementDroppedMessages(message.verb, timeTaken, message.constructionTime.isCrossNode); + } + + public void incrementDroppedMessages(Verb verb, long timeTaken, boolean isCrossNodeTimeout) + { + assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped"; + incrementDroppedMessages(droppedMessagesMap.get(verb), timeTaken, isCrossNodeTimeout); } public void incrementDroppedMessages(Verb verb, boolean isCrossNodeTimeout) @@ -968,6 +979,15 @@ public final class MessagingService implements 
MessagingServiceMBean incrementDroppedMessages(droppedMessagesMap.get(verb), isCrossNodeTimeout); } + private void incrementDroppedMessages(DroppedMessages droppedMessages, long timeTaken, boolean isCrossNodeTimeout) + { + if (isCrossNodeTimeout) + droppedMessages.metrics.crossNodeDroppedLatency.update(timeTaken, TimeUnit.MILLISECONDS); + else + droppedMessages.metrics.internalDroppedLatency.update(timeTaken, TimeUnit.MILLISECONDS); + incrementDroppedMessages(droppedMessages, isCrossNodeTimeout); + } + private void incrementDroppedMessages(DroppedMessages droppedMessages, boolean isCrossNodeTimeout) { droppedMessages.metrics.dropped.mark(); @@ -1000,11 +1020,14 @@ public final class MessagingService implements MessagingServiceMBean int droppedCrossNodeTimeout = droppedMessages.droppedCrossNodeTimeout.getAndSet(0); if (droppedInternalTimeout > 0 || droppedCrossNodeTimeout > 0) { - ret.add(String.format("%s messages were dropped in last %d ms: %d for internal timeout and %d for cross node timeout", - verb, - LOG_DROPPED_INTERVAL_IN_MS, - droppedInternalTimeout, - droppedCrossNodeTimeout)); + ret.add(String.format("%s messages were dropped in last %d ms: %d for internal timeout and %d for cross node timeout." 
+ + " Mean internal dropped latency: %d ms and Mean cross-node dropped latency: %d ms", + verb, + LOG_DROPPED_INTERVAL_IN_MS, + droppedInternalTimeout, + droppedCrossNodeTimeout, + TimeUnit.NANOSECONDS.toMillis((long)droppedMessages.metrics.internalDroppedLatency.getSnapshot().getMean()), + TimeUnit.NANOSECONDS.toMillis((long)droppedMessages.metrics.crossNodeDroppedLatency.getSnapshot().getMean()))); } } return ret; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index f161607..2e32f16 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1732,7 +1732,7 @@ public class StorageProxy implements StorageProxyMBean } else { - MessagingService.instance().incrementDroppedMessages(verb); + MessagingService.instance().incrementDroppedMessages(verb, System.currentTimeMillis() - constructionTime); handler.onFailure(FBUtilities.getBroadcastAddress()); } @@ -2383,9 +2383,10 @@ public class StorageProxy implements StorageProxyMBean public final void run() { - if (System.currentTimeMillis() > constructionTime + timeout) + long timeTaken = System.currentTimeMillis() - constructionTime; + if (timeTaken > timeout) { - MessagingService.instance().incrementDroppedMessages(verb); + MessagingService.instance().incrementDroppedMessages(verb, timeTaken); return; } try @@ -2411,9 +2412,11 @@ public class StorageProxy implements StorageProxyMBean public final void run() { - if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION)) + long mutationTimeout = DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION); + long timeTaken = System.currentTimeMillis() - constructionTime; + if 
(timeTaken > mutationTimeout) { - MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION); + MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION, timeTaken); HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress())) { protected void runMayThrow() throws Exception http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/test/unit/org/apache/cassandra/net/MessagingServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java index 75c146e..3b9c957 100644 --- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java +++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java @@ -15,22 +15,22 @@ public class MessagingServiceTest { MessagingService.Verb verb = MessagingService.Verb.READ; - for (int i = 0; i < 5000; i++) - messagingService.incrementDroppedMessages(verb, i % 2 == 0); + for (int i = 1; i <= 5000; i++) + messagingService.incrementDroppedMessages(verb, i, i % 2 == 0); List<String> logs = messagingService.getDroppedMessagesLogs(); assertEquals(1, logs.size()); - assertEquals("READ messages were dropped in last 5000 ms: 2500 for internal timeout and 2500 for cross node timeout", logs.get(0)); + assertEquals("READ messages were dropped in last 5000 ms: 2500 for internal timeout and 2500 for cross node timeout. 
Mean internal dropped latency: 2730 ms and Mean cross-node dropped latency: 2731 ms", logs.get(0)); assertEquals(5000, (int)messagingService.getDroppedMessages().get(verb.toString())); logs = messagingService.getDroppedMessagesLogs(); assertEquals(0, logs.size()); for (int i = 0; i < 2500; i++) - messagingService.incrementDroppedMessages(verb, i % 2 == 0); + messagingService.incrementDroppedMessages(verb, i, i % 2 == 0); logs = messagingService.getDroppedMessagesLogs(); - assertEquals("READ messages were dropped in last 5000 ms: 1250 for internal timeout and 1250 for cross node timeout", logs.get(0)); + assertEquals("READ messages were dropped in last 5000 ms: 1250 for internal timeout and 1250 for cross node timeout. Mean internal dropped latency: 2277 ms and Mean cross-node dropped latency: 2278 ms", logs.get(0)); assertEquals(7500, (int)messagingService.getDroppedMessages().get(verb.toString())); }