Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 2a4ab8716 -> 14d7a63b8
Improve trace messages for RR patch by Robert Stupp; reviewed by Jason Brown for CASSANDRA-9479 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/353d4a05 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/353d4a05 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/353d4a05 Branch: refs/heads/cassandra-2.2 Commit: 353d4a052c866cb230e06e69e99d9c5c8c8d955c Parents: f2db756 Author: Robert Stupp <sn...@snazy.de> Authored: Sun Jun 28 10:24:34 2015 +0200 Committer: Robert Stupp <sn...@snazy.de> Committed: Sun Jun 28 10:24:34 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/net/MessagingService.java | 2 +- .../cassandra/net/OutboundTcpConnection.java | 2 +- .../cassandra/service/AbstractReadExecutor.java | 17 +++++++++++++++++ .../apache/cassandra/service/ReadCallback.java | 19 ++++++++++++++++++- .../cassandra/service/RowDataResolver.java | 2 ++ 6 files changed, 40 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 32f0873..6a137a3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.17 + * Improve trace messages for RR (CASSANDRA-9479) * Fix suboptimal secondary index selection when restricted clustering column is also indexed (CASSANDRA-9631) * (cqlsh) Add min_threshold to DTCS option autocomplete (CASSANDRA-9385) http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index d570faf..ee6b87b 100644 --- 
a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -722,7 +722,7 @@ public final class MessagingService implements MessagingServiceMBean { TraceState state = Tracing.instance.initializeFromMessage(message); if (state != null) - state.trace("Message received from {}", message.from); + state.trace("{} message received from {}", message.verb, message.from); Verb verb = message.verb; message = SinkManager.processInboundMessage(message, id); http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 5559df2..af61dd4 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -186,7 +186,7 @@ public class OutboundTcpConnection extends Thread { UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)); TraceState state = Tracing.instance.get(sessionId); - String message = String.format("Sending message to %s", poolReference.endPoint()); + String message = String.format("Sending %s message to %s", qm.message.verb, poolReference.endPoint()); // session may have already finished; see CASSANDRA-5668 if (state == null) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java index 3f57e73..2f2370d 100644 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@ -43,6 +43,8 @@ 
import org.apache.cassandra.metrics.ReadRepairMetrics; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageProxy.LocalReadRunnable; +import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; /** @@ -61,12 +63,14 @@ public abstract class AbstractReadExecutor protected final List<InetAddress> targetReplicas; protected final RowDigestResolver resolver; protected final ReadCallback<ReadResponse, Row> handler; + protected final TraceState traceState; AbstractReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas) { this.command = command; this.targetReplicas = targetReplicas; resolver = new RowDigestResolver(command.ksName, command.key); + traceState = Tracing.instance.get(); handler = new ReadCallback<>(resolver, consistencyLevel, command, targetReplicas); } @@ -81,11 +85,15 @@ public abstract class AbstractReadExecutor { if (isLocalRequest(endpoint)) { + if (traceState != null) + traceState.trace("reading data locally"); logger.trace("reading data locally"); StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler)); } else { + if (traceState != null) + traceState.trace("reading data from {}", endpoint); logger.trace("reading data from {}", endpoint); MessagingService.instance().sendRR(command.createMessage(), endpoint, handler); } @@ -101,11 +109,15 @@ public abstract class AbstractReadExecutor { if (isLocalRequest(endpoint)) { + if (traceState != null) + traceState.trace("reading digest locally"); logger.trace("reading digest locally"); StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler)); } else { + if (traceState != null) + traceState.trace("reading digest from {}", endpoint); logger.trace("reading digest from {}", endpoint); MessagingService.instance().sendRR(message, endpoint, handler); } @@ 
-158,7 +170,10 @@ public abstract class AbstractReadExecutor return new NeverSpeculatingReadExecutor(command, consistencyLevel, targetReplicas); if (repairDecision != ReadRepairDecision.NONE) + { + Tracing.trace("Read-repair {}", repairDecision); ReadRepairMetrics.attempted.mark(); + } ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.cfName); RetryType retryType = cfs.metadata.getSpeculativeRetry().type; @@ -279,6 +294,8 @@ public abstract class AbstractReadExecutor } InetAddress extraReplica = Iterables.getLast(targetReplicas); + if (traceState != null) + traceState.trace("speculating read retry on {}", extraReplica); logger.trace("speculating read retry on {}", extraReplica); MessagingService.instance().sendRR(retryCommand.createMessage(), extraReplica, handler); speculated = true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index 150fabe..1315102 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -40,6 +40,8 @@ import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.SimpleCondition; @@ -99,6 +101,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag { // Same as for writes, see AbstractWriteResponseHandler ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received.get(), blockfor, resolver.isDataPresent()); + Tracing.trace("Read timeout: {}", 
ex.toString()); if (logger.isDebugEnabled()) logger.debug("Read timeout: {}", ex.toString()); throw ex; @@ -119,7 +122,12 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag // kick off a background digest comparison if this is a result that (may have) arrived after // the original resolve that get() kicks off as soon as the condition is signaled if (blockfor < endpoints.size() && n == endpoints.size()) - StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner()); + { + TraceState traceState = Tracing.instance.get(); + if (traceState != null) + traceState.trace("Initiating read-repair"); + StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner(traceState)); + } } } @@ -163,6 +171,13 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag private class AsyncRepairRunner implements Runnable { + private final TraceState traceState; + + public AsyncRepairRunner(TraceState traceState) + { + this.traceState = traceState; + } + public void run() { // If the resolver is a RowDigestResolver, we need to do a full data read if there is a mismatch. 
@@ -176,6 +191,8 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag { assert resolver instanceof RowDigestResolver; + if (traceState != null) + traceState.trace("Digest mismatch: {}", e.toString()); if (logger.isDebugEnabled()) logger.debug("Digest mismatch:", e); http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/src/java/org/apache/cassandra/service/RowDataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java b/src/java/org/apache/cassandra/service/RowDataResolver.java index 00f8753..bf4afbe 100644 --- a/src/java/org/apache/cassandra/service/RowDataResolver.java +++ b/src/java/org/apache/cassandra/service/RowDataResolver.java @@ -31,6 +31,7 @@ import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.filter.IDiskAtomFilter; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.net.*; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.FBUtilities; @@ -120,6 +121,7 @@ public class RowDataResolver extends AbstractRowResolver MessageOut repairMessage; // use a separate verb here because we don't want these to be get the white glove hint- // on-timeout behavior that a "real" mutation gets + Tracing.trace("Sending read-repair-mutation to {}", endpoints.get(i)); repairMessage = rowMutation.createMessage(MessagingService.Verb.READ_REPAIR); results.add(MessagingService.instance().sendRR(repairMessage, endpoints.get(i))); }