Repository: cassandra
Updated Branches:
  refs/heads/trunk 6148f5214 -> 66d3428e3
Add metric for number of dropped mutations patch by Anubhav Kale; reviewed by Paulo Motta for CASSANDRA-10866 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/66d3428e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/66d3428e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/66d3428e Branch: refs/heads/trunk Commit: 66d3428e3fe64851fa7587ee69b53e20bb7c09b5 Parents: 6148f52 Author: anubhavkale <anubh...@microsoft.com> Authored: Fri Jan 15 10:20:42 2016 -0500 Committer: Carl Yeksigian <c...@apache.org> Committed: Fri Jan 15 10:20:42 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/metrics/TableMetrics.java | 4 +++ .../apache/cassandra/net/MessagingService.java | 27 ++++++++++++++++++++ .../apache/cassandra/service/StorageProxy.java | 22 ++++++++++++---- .../org/apache/cassandra/tools/NodeProbe.java | 1 + .../cassandra/tools/nodetool/TableStats.java | 1 + 6 files changed, 51 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 61284f2..33b9f9b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.4 + * Add metric for number of dropped mutations (CASSANDRA-10866) * Simplify row cache invalidation code (CASSANDRA-10396) * Support user-defined compaction through nodetool (CASSANDRA-10660) * Stripe view locks by key and table ID to reduce contention (CASSANDRA-10981) http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/src/java/org/apache/cassandra/metrics/TableMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java 
b/src/java/org/apache/cassandra/metrics/TableMetrics.java index c8c214e..6492833 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -142,6 +142,9 @@ public class TableMetrics /** Time spent waiting for free memtable space, either on- or off-heap */ public final Histogram waitingOnFreeMemtableSpace; + /** Dropped Mutations Count */ + public final Counter droppedMutations; + private final MetricNameFactory factory; private final MetricNameFactory aliasFactory; private static final MetricNameFactory globalFactory = new AllTableMetricNameFactory("Table"); @@ -621,6 +624,7 @@ public class TableMetrics rowCacheHitOutOfRange = createTableCounter("RowCacheHitOutOfRange"); rowCacheHit = createTableCounter("RowCacheHit"); rowCacheMiss = createTableCounter("RowCacheMiss"); + droppedMutations = createTableCounter("DroppedMutations"); casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare); casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose); http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 76f4967..2bfa46c 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -953,6 +953,15 @@ public final class MessagingService implements MessagingServiceMBean return versions.containsKey(endpoint); } + public void incrementDroppedMutations(Optional<IMutation> mutationOpt, long timeTaken) + { + if (mutationOpt.isPresent()) + { + updateDroppedMutationCount(mutationOpt.get()); + } + incrementDroppedMessages(Verb.MUTATION, timeTaken); + } + public void incrementDroppedMessages(Verb verb) { 
incrementDroppedMessages(verb, false); @@ -965,6 +974,10 @@ public final class MessagingService implements MessagingServiceMBean public void incrementDroppedMessages(MessageIn message, long timeTaken) { + if (message.payload instanceof IMutation) + { + updateDroppedMutationCount((IMutation) message.payload); + } incrementDroppedMessages(message.verb, timeTaken, message.constructionTime.isCrossNode); } @@ -980,6 +993,20 @@ public final class MessagingService implements MessagingServiceMBean incrementDroppedMessages(droppedMessagesMap.get(verb), isCrossNodeTimeout); } + private void updateDroppedMutationCount(IMutation mutation) + { + assert mutation != null : "Mutation should not be null when updating dropped mutations count"; + + for (UUID columnFamilyId : mutation.getColumnFamilyIds()) + { + ColumnFamilyStore cfs = Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(columnFamilyId); + if (cfs != null) + { + cfs.metric.droppedMutations.inc(); + } + } + } + private void incrementDroppedMessages(DroppedMessages droppedMessages, long timeTaken, boolean isCrossNodeTimeout) { if (isCrossNodeTimeout) http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 27da486..77e65ec 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -944,7 +944,7 @@ public class StorageProxy implements StorageProxyMBean logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size()); if (canDoLocalRequest(target)) - performLocally(Stage.MUTATION, () -> BatchlogManager.store(batch), handler); + performLocally(Stage.MUTATION, Optional.empty(), () -> BatchlogManager.store(batch), handler); else 
MessagingService.instance().sendRR(message, target, handler); } @@ -1233,7 +1233,7 @@ public class StorageProxy implements StorageProxyMBean submitHint(mutation, endpointsToHint, responseHandler); if (insertLocal) - performLocally(stage, mutation::apply, responseHandler); + performLocally(stage, Optional.of(mutation), mutation::apply, responseHandler); if (dcGroups != null) { @@ -1322,9 +1322,9 @@ public class StorageProxy implements StorageProxyMBean }); } - private static void performLocally(Stage stage, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler) + private static void performLocally(Stage stage, Optional<IMutation> mutation, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler) { - StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable() + StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable(mutation) { public void runMayThrow() { @@ -2473,6 +2473,18 @@ public class StorageProxy implements StorageProxyMBean { private final long constructionTime = System.currentTimeMillis(); + private final Optional<IMutation> mutationOpt; + + public LocalMutationRunnable(Optional<IMutation> mutationOpt) + { + this.mutationOpt = mutationOpt; + } + + public LocalMutationRunnable() + { + this.mutationOpt = Optional.empty(); + } + public final void run() { final MessagingService.Verb verb = verb(); @@ -2481,7 +2493,7 @@ public class StorageProxy implements StorageProxyMBean if (timeTaken > mutationTimeout) { if (MessagingService.DROPPABLE_VERBS.contains(verb)) - MessagingService.instance().incrementDroppedMessages(verb, timeTaken); + MessagingService.instance().incrementDroppedMutations(mutationOpt, timeTaken); HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress())) { protected void runMayThrow() throws Exception http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/src/java/org/apache/cassandra/tools/NodeProbe.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index a8d23ca..2bc516a 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1227,6 +1227,7 @@ public class NodeProbe implements AutoCloseable case "WriteTotalLatency": case "ReadTotalLatency": case "PendingFlushes": + case "DroppedMutations": return JMX.newMBeanProxy(mbeanServerConn, oName, CassandraMetricsRegistry.JmxCounterMBean.class).getCount(); case "CoordinatorReadLatency": case "CoordinatorScanLatency": http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/src/java/org/apache/cassandra/tools/nodetool/TableStats.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/TableStats.java b/src/java/org/apache/cassandra/tools/nodetool/TableStats.java index fe664ff..681af5b 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/TableStats.java +++ b/src/java/org/apache/cassandra/tools/nodetool/TableStats.java @@ -219,6 +219,7 @@ public class TableStats extends NodeToolCmd histogram = (CassandraMetricsRegistry.JmxHistogramMBean) probe.getColumnFamilyMetric(keyspaceName, tableName, "TombstoneScannedHistogram"); System.out.println("\t\tAverage tombstones per slice (last five minutes): " + histogram.getMean()); System.out.println("\t\tMaximum tombstones per slice (last five minutes): " + histogram.getMax()); + System.out.println("\t\tDropped Mutations: " + format((Long) probe.getColumnFamilyMetric(keyspaceName, tableName, "DroppedMutations"), humanReadable)); System.out.println(""); }