Repository: cassandra
Updated Branches:
  refs/heads/trunk 9e2893853 -> 6739434c6


Improve trace messages for RR

patch by Robert Stupp; reviewed by Jason Brown for CASSANDRA-9479


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/353d4a05
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/353d4a05
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/353d4a05

Branch: refs/heads/trunk
Commit: 353d4a052c866cb230e06e69e99d9c5c8c8d955c
Parents: f2db756
Author: Robert Stupp <sn...@snazy.de>
Authored: Sun Jun 28 10:24:34 2015 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Sun Jun 28 10:24:34 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                      |  1 +
 .../apache/cassandra/net/MessagingService.java   |  2 +-
 .../cassandra/net/OutboundTcpConnection.java     |  2 +-
 .../cassandra/service/AbstractReadExecutor.java  | 17 +++++++++++++++++
 .../apache/cassandra/service/ReadCallback.java   | 19 ++++++++++++++++++-
 .../cassandra/service/RowDataResolver.java       |  2 ++
 6 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 32f0873..6a137a3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.17
+ * Improve trace messages for RR (CASSANDRA-9479)
  * Fix suboptimal secondary index selection when restricted
    clustering column is also indexed (CASSANDRA-9631)
  * (cqlsh) Add min_threshold to DTCS option autocomplete (CASSANDRA-9385)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index d570faf..ee6b87b 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -722,7 +722,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         TraceState state = Tracing.instance.initializeFromMessage(message);
         if (state != null)
-            state.trace("Message received from {}", message.from);
+            state.trace("{} message received from {}", message.verb, message.from);
 
         Verb verb = message.verb;
         message = SinkManager.processInboundMessage(message, id);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 5559df2..af61dd4 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -186,7 +186,7 @@ public class OutboundTcpConnection extends Thread
             {
                 UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
                 TraceState state = Tracing.instance.get(sessionId);
-                String message = String.format("Sending message to %s", poolReference.endPoint());
+                String message = String.format("Sending %s message to %s", qm.message.verb, poolReference.endPoint());
                 // session may have already finished; see CASSANDRA-5668
                 if (state == null)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 3f57e73..2f2370d 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -43,6 +43,8 @@ import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -61,12 +63,14 @@ public abstract class AbstractReadExecutor
     protected final List<InetAddress> targetReplicas;
     protected final RowDigestResolver resolver;
     protected final ReadCallback<ReadResponse, Row> handler;
+    protected final TraceState traceState;
 
     AbstractReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
     {
         this.command = command;
         this.targetReplicas = targetReplicas;
         resolver = new RowDigestResolver(command.ksName, command.key);
+        traceState = Tracing.instance.get();
         handler = new ReadCallback<>(resolver, consistencyLevel, command, targetReplicas);
     }
 
@@ -81,11 +85,15 @@ public abstract class AbstractReadExecutor
         {
             if (isLocalRequest(endpoint))
             {
+                if (traceState != null)
+                    traceState.trace("reading data locally");
                 logger.trace("reading data locally");
                 StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
             }
             else
             {
+                if (traceState != null)
+                    traceState.trace("reading data from {}", endpoint);
                 logger.trace("reading data from {}", endpoint);
                 MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
             }
@@ -101,11 +109,15 @@ public abstract class AbstractReadExecutor
         {
             if (isLocalRequest(endpoint))
             {
+                if (traceState != null)
+                    traceState.trace("reading digest locally");
                 logger.trace("reading digest locally");
                 StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
             }
             else
             {
+                if (traceState != null)
+                    traceState.trace("reading digest from {}", endpoint);
                 logger.trace("reading digest from {}", endpoint);
                 MessagingService.instance().sendRR(message, endpoint, handler);
             }
@@ -158,7 +170,10 @@ public abstract class AbstractReadExecutor
             return new NeverSpeculatingReadExecutor(command, consistencyLevel, targetReplicas);
 
         if (repairDecision != ReadRepairDecision.NONE)
+        {
+            Tracing.trace("Read-repair {}", repairDecision);
             ReadRepairMetrics.attempted.mark();
+        }
 
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.cfName);
         RetryType retryType = cfs.metadata.getSpeculativeRetry().type;
@@ -279,6 +294,8 @@ public abstract class AbstractReadExecutor
                 }
 
                 InetAddress extraReplica = Iterables.getLast(targetReplicas);
+                if (traceState != null)
+                    traceState.trace("speculating read retry on {}", extraReplica);
                 logger.trace("speculating read retry on {}", extraReplica);
                 MessagingService.instance().sendRR(retryCommand.createMessage(), extraReplica, handler);
                 speculated = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 150fabe..1315102 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -40,6 +40,8 @@ import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SimpleCondition;
 
@@ -99,6 +101,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
         {
             // Same as for writes, see AbstractWriteResponseHandler
             ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received.get(), blockfor, resolver.isDataPresent());
+            Tracing.trace("Read timeout: {}", ex.toString());
             if (logger.isDebugEnabled())
                 logger.debug("Read timeout: {}", ex.toString());
             throw ex;
@@ -119,7 +122,12 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
             // kick off a background digest comparison if this is a result that (may have) arrived after
             // the original resolve that get() kicks off as soon as the condition is signaled
             if (blockfor < endpoints.size() && n == endpoints.size())
-                StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner());
+            {
+                TraceState traceState = Tracing.instance.get();
+                if (traceState != null)
+                    traceState.trace("Initiating read-repair");
+                StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner(traceState));
+            }
         }
     }
 
@@ -163,6 +171,13 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
 
     private class AsyncRepairRunner implements Runnable
     {
+        private final TraceState traceState;
+
+        public AsyncRepairRunner(TraceState traceState)
+        {
+            this.traceState = traceState;
+        }
+
         public void run()
         {
             // If the resolver is a RowDigestResolver, we need to do a full data read if there is a mismatch.
@@ -176,6 +191,8 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
             {
                 assert resolver instanceof RowDigestResolver;
 
+                if (traceState != null)
+                    traceState.trace("Digest mismatch: {}", e.toString());
                 if (logger.isDebugEnabled())
                     logger.debug("Digest mismatch:", e);
                 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java b/src/java/org/apache/cassandra/service/RowDataResolver.java
index 00f8753..bf4afbe 100644
--- a/src/java/org/apache/cassandra/service/RowDataResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDataResolver.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.net.*;
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -120,6 +121,7 @@ public class RowDataResolver extends AbstractRowResolver
             MessageOut repairMessage;
             // use a separate verb here because we don't want these to be get the white glove hint-
             // on-timeout behavior that a "real" mutation gets
+            Tracing.trace("Sending read-repair-mutation to {}", endpoints.get(i));
             repairMessage = rowMutation.createMessage(MessagingService.Verb.READ_REPAIR);
             results.add(MessagingService.instance().sendRR(repairMessage, endpoints.get(i)));
         }

Reply via email to