Repository: cassandra
Updated Branches:
  refs/heads/trunk bb25f5bdd -> c9ef25fd8


Revert "Add latency logging for dropped messages"

This reverts commit 3c8d87f4324e5ff8bf6b1c3652e9c5eacf03bc20.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bd5c8bbc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bd5c8bbc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bd5c8bbc

Branch: refs/heads/trunk
Commit: bd5c8bbc04e017089743b27cce55635dac00b98e
Parents: bb25f5b
Author: Joshua McKenzie <jmcken...@apache.org>
Authored: Thu Dec 24 07:59:31 2015 -0500
Committer: Joshua McKenzie <jmcken...@apache.org>
Committed: Thu Dec 24 07:59:31 2015 -0500

----------------------------------------------------------------------
 .../cassandra/net/MessageDeliveryTask.java      | 42 ++-----------------
 .../apache/cassandra/service/StorageProxy.java  | 44 ++------------------
 2 files changed, 7 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd5c8bbc/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java 
b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index bede3d8..818cfc6 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -18,13 +18,11 @@
 package org.apache.cassandra.net;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.EnumSet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.IMutation;
+
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.index.IndexNotAvailableException;
@@ -45,11 +43,10 @@ public class MessageDeliveryTask implements Runnable
 
     public void run()
     {
-        long timeTaken = System.currentTimeMillis() - 
message.constructionTime.timestamp;
         MessagingService.Verb verb = message.verb;
-        if (MessagingService.DROPPABLE_VERBS.contains(verb)&& 
message.getTimeout() > timeTaken)
+        if (MessagingService.DROPPABLE_VERBS.contains(verb)
+            && System.currentTimeMillis() > message.constructionTime.timestamp 
+ message.getTimeout())
         {
-            LogDroppedMessageDetails(timeTaken);
             MessagingService.instance().incrementDroppedMessages(message);
             return;
         }
@@ -85,37 +82,6 @@ public class MessageDeliveryTask implements Runnable
             
Gossiper.instance.setLastProcessedMessageAt(message.constructionTime.timestamp);
     }
 
-    private void LogDroppedMessageDetails(long timeTaken)
-    {
-        logger.debug("MessageDeliveryTask ran after {} ms, allowed time was {} 
ms. Dropping message {}",
-                timeTaken, message.getTimeout(), message.toString());
-        // Print KS and CF if Payload is mutation or a list of mutations (sent 
due to schema announcements)
-        IMutation mutation;
-        if (message.payload instanceof IMutation)
-        {
-            mutation = (IMutation)message.payload;
-            if (mutation != null)
-            {
-                logger.debug("MessageDeliveryTask dropped mutation of KS {}, 
CF {}", mutation.getKeyspaceName(), 
Arrays.toString(mutation.getColumnFamilyIds().toArray()));
-            }
-        }
-        else if (message.payload instanceof Collection<?>)
-        {
-            Collection<?> payloadItems = (Collection<?>)message.payload;
-            for (Object payloadItem : payloadItems)
-            {
-                if (payloadItem instanceof IMutation)
-                {
-                    mutation = (IMutation)payloadItem;
-                    if (mutation != null)
-                    {
-                        logger.debug("MessageDeliveryTask dropped mutation of 
KS {}, CF {}", mutation.getKeyspaceName(), 
Arrays.toString(mutation.getColumnFamilyIds().toArray()));
-                    }
-                }
-            }
-        }
-    }
-
     private void handleFailure(Throwable t)
     {
         if (message.doCallbackOnFailure())
@@ -129,4 +95,4 @@ public class MessageDeliveryTask implements Runnable
     private static final EnumSet<MessagingService.Verb> GOSSIP_VERBS = 
EnumSet.of(MessagingService.Verb.GOSSIP_DIGEST_ACK,
                                                                                
   MessagingService.Verb.GOSSIP_DIGEST_ACK2,
                                                                                
   MessagingService.Verb.GOSSIP_DIGEST_SYN);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd5c8bbc/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 1c30cd7..f161607 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1198,7 +1198,7 @@ public class StorageProxy implements StorageProxyMBean
             submitHint(mutation, endpointsToHint, responseHandler);
 
         if (insertLocal)
-            performLocally(stage, mutation, mutation::apply, responseHandler);
+            performLocally(stage, mutation::apply, responseHandler);
 
         if (dcGroups != null)
         {
@@ -1286,27 +1286,6 @@ public class StorageProxy implements StorageProxyMBean
         });
     }
 
-    private static void performLocally(Stage stage, IMutation mutation, final 
Runnable runnable, final IAsyncCallbackWithFailure<?> handler)
-    {
-        StageManager.getStage(stage).maybeExecuteImmediately(new 
LocalMutationRunnable(mutation)
-        {
-            public void runMayThrow()
-            {
-                try
-                {
-                    runnable.run();
-                    handler.response(null);
-                }
-                catch (Exception ex)
-                {
-                    if (!(ex instanceof WriteTimeoutException))
-                        logger.error("Failed to apply mutation locally : {}", 
ex);
-                    handler.onFailure(FBUtilities.getBroadcastAddress());
-                }
-            }
-        });
-    }
-
     /**
      * Handle counter mutation on the coordinator host.
      *
@@ -2429,28 +2408,11 @@ public class StorageProxy implements StorageProxyMBean
     private static abstract class LocalMutationRunnable implements Runnable
     {
         private final long constructionTime = System.currentTimeMillis();
-        private IMutation mutation;
-
-        public LocalMutationRunnable(IMutation mutation)
-        {
-            this.mutation = mutation;
-        }
-
-        public LocalMutationRunnable()
-        {
-        }
 
         public final void run()
         {
-            long mutationTimeout = 
DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION);
-            if (System.currentTimeMillis() > constructionTime + 
mutationTimeout)
+            if (System.currentTimeMillis() > constructionTime + 
DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION))
             {
-                long timeTaken = System.currentTimeMillis() - constructionTime;
-                logger.debug("LocalMutationRunnable thread ran after {} ms, 
allowed time was {} ms. ", timeTaken, mutationTimeout);
-                if (this.mutation != null)
-                {
-                    logger.debug("MessageDeliveryTask dropped mutation of KS 
{}, CF {}", this.mutation.getKeyspaceName(), 
Arrays.toString(this.mutation.getColumnFamilyIds().toArray()));
-                }
                 
MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION);
                 HintRunnable runnable = new 
HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress()))
                 {
@@ -2634,4 +2596,4 @@ public class StorageProxy implements StorageProxyMBean
     public long getReadRepairRepairedBackground() {
         return ReadRepairMetrics.repairedBackground.getCount();
     }
-}
\ No newline at end of file
+}

Reply via email to