Repository: cassandra Updated Branches: refs/heads/trunk bb25f5bdd -> c9ef25fd8
Revert "Add latency logging for dropped messages" This reverts commit 3c8d87f4324e5ff8bf6b1c3652e9c5eacf03bc20. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bd5c8bbc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bd5c8bbc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bd5c8bbc Branch: refs/heads/trunk Commit: bd5c8bbc04e017089743b27cce55635dac00b98e Parents: bb25f5b Author: Joshua McKenzie <jmcken...@apache.org> Authored: Thu Dec 24 07:59:31 2015 -0500 Committer: Joshua McKenzie <jmcken...@apache.org> Committed: Thu Dec 24 07:59:31 2015 -0500 ---------------------------------------------------------------------- .../cassandra/net/MessageDeliveryTask.java | 42 ++----------------- .../apache/cassandra/service/StorageProxy.java | 44 ++------------------ 2 files changed, 7 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd5c8bbc/src/java/org/apache/cassandra/net/MessageDeliveryTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java index bede3d8..818cfc6 100644 --- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java +++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java @@ -18,13 +18,11 @@ package org.apache.cassandra.net; import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; import java.util.EnumSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.db.IMutation; + import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.index.IndexNotAvailableException; @@ -45,11 +43,10 @@ public class MessageDeliveryTask implements Runnable public void run() { - long timeTaken = System.currentTimeMillis() - message.constructionTime.timestamp; MessagingService.Verb verb = message.verb; - if (MessagingService.DROPPABLE_VERBS.contains(verb)&& message.getTimeout() > timeTaken) + if (MessagingService.DROPPABLE_VERBS.contains(verb) + && System.currentTimeMillis() > message.constructionTime.timestamp + message.getTimeout()) { - LogDroppedMessageDetails(timeTaken); MessagingService.instance().incrementDroppedMessages(message); return; } @@ -85,37 +82,6 @@ public class MessageDeliveryTask implements Runnable Gossiper.instance.setLastProcessedMessageAt(message.constructionTime.timestamp); } - private void LogDroppedMessageDetails(long timeTaken) - { - logger.debug("MessageDeliveryTask ran after {} ms, allowed time was {} ms. Dropping message {}", - timeTaken, message.getTimeout(), message.toString()); - // Print KS and CF if Payload is mutation or a list of mutations (sent due to schema announcements) - IMutation mutation; - if (message.payload instanceof IMutation) - { - mutation = (IMutation)message.payload; - if (mutation != null) - { - logger.debug("MessageDeliveryTask dropped mutation of KS {}, CF {}", mutation.getKeyspaceName(), Arrays.toString(mutation.getColumnFamilyIds().toArray())); - } - } - else if (message.payload instanceof Collection<?>) - { - Collection<?> payloadItems = (Collection<?>)message.payload; - for (Object payloadItem : payloadItems) - { - if (payloadItem instanceof IMutation) - { - mutation = (IMutation)payloadItem; - if (mutation != null) - { - logger.debug("MessageDeliveryTask dropped mutation of KS {}, CF {}", mutation.getKeyspaceName(), Arrays.toString(mutation.getColumnFamilyIds().toArray())); - } - } - } - } - } - private void handleFailure(Throwable t) { if (message.doCallbackOnFailure()) @@ -129,4 +95,4 @@ public class MessageDeliveryTask implements Runnable private static final EnumSet<MessagingService.Verb> GOSSIP_VERBS = EnumSet.of(MessagingService.Verb.GOSSIP_DIGEST_ACK, MessagingService.Verb.GOSSIP_DIGEST_ACK2, MessagingService.Verb.GOSSIP_DIGEST_SYN); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd5c8bbc/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 1c30cd7..f161607 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1198,7 +1198,7 @@ public class StorageProxy implements StorageProxyMBean submitHint(mutation, endpointsToHint, responseHandler); if (insertLocal) - performLocally(stage, mutation, mutation::apply, responseHandler); + performLocally(stage, mutation::apply, responseHandler); if (dcGroups != null) { @@ -1286,27 +1286,6 @@ public class StorageProxy implements StorageProxyMBean }); } - private static void performLocally(Stage stage, IMutation mutation, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler) - { - StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable(mutation) - { - public void runMayThrow() - { - try - { - runnable.run(); - handler.response(null); - } - catch (Exception ex) - { - if (!(ex instanceof WriteTimeoutException)) - logger.error("Failed to apply mutation locally : {}", ex); - handler.onFailure(FBUtilities.getBroadcastAddress()); - } - } - }); - } - /** * Handle counter mutation on the coordinator host. * @@ -2429,28 +2408,11 @@ public class StorageProxy implements StorageProxyMBean private static abstract class LocalMutationRunnable implements Runnable { private final long constructionTime = System.currentTimeMillis(); - private IMutation mutation; - - public LocalMutationRunnable(IMutation mutation) - { - this.mutation = mutation; - } - - public LocalMutationRunnable() - { - } public final void run() { - long mutationTimeout = DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION); - if (System.currentTimeMillis() > constructionTime + mutationTimeout) + if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION)) { - long timeTaken = System.currentTimeMillis() - constructionTime; - logger.debug("LocalMutationRunnable thread ran after {} ms, allowed time was {} ms. ", timeTaken, mutationTimeout); - if (this.mutation != null) - { - logger.debug("MessageDeliveryTask dropped mutation of KS {}, CF {}", this.mutation.getKeyspaceName(), Arrays.toString(this.mutation.getColumnFamilyIds().toArray())); - } MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION); HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress())) { @@ -2634,4 +2596,4 @@ public class StorageProxy implements StorageProxyMBean public long getReadRepairRepairedBackground() { return ReadRepairMetrics.repairedBackground.getCount(); } -} \ No newline at end of file +}