Updated Branches: refs/heads/cassandra-1.2 f30015c86 -> e5c34d7c2 refs/heads/trunk 093e188a4 -> 26018be22
improve tracing patch by jbellis; reviewed by slebresne for CASSANDRA-5638 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0c81eaec Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0c81eaec Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0c81eaec Branch: refs/heads/cassandra-1.2 Commit: 0c81eaecb2572d9c70e033aa2c76288611386d8f Parents: f30015c Author: Jonathan Ellis <jbel...@apache.org> Authored: Fri Jun 14 10:34:57 2013 -0700 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Tue Jun 18 08:40:57 2013 -0500 ---------------------------------------------------------------------- .../apache/cassandra/cql3/QueryProcessor.java | 6 +-- .../apache/cassandra/db/ReadVerbHandler.java | 2 - .../apache/cassandra/service/StorageProxy.java | 50 ++++---------------- 3 files changed, 10 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index dae9cc9..513c96e 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -111,6 +111,7 @@ public class QueryProcessor private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException { + logger.trace("Process {} @CL.{}", statement, cl); ClientState clientState = queryState.getClientState(); statement.validate(clientState); statement.checkAccess(clientState); @@ -121,7 +122,6 @@ public class QueryProcessor public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState) throws RequestExecutionException, RequestValidationException { - logger.trace("CQL QUERY: {}", queryString); CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement; if (prepared.getBoundsTerms() > 0) throw new InvalidRequestException("Cannot execute query with bind variables"); @@ -187,8 +187,6 @@ public class QueryProcessor public static ResultMessage.Prepared prepare(String queryString, ClientState clientState, boolean forThrift) throws RequestValidationException { - logger.trace("CQL QUERY: {}", queryString); - ParsedStatement.Prepared prepared = getStatement(queryString, clientState); ResultMessage.Prepared msg = storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift); @@ -245,7 +243,7 @@ public class QueryProcessor private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState) throws RequestValidationException { - Tracing.trace("Parsing statement"); + Tracing.trace("Parsing {}", queryStr); ParsedStatement statement = parseStatement(queryStr); // Set keyspace for statement that require login http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/db/ReadVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java index a06035a..a05f7a2 100644 --- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java @@ -54,8 +54,6 @@ public class ReadVerbHandler implements IVerbHandler<ReadCommand> { if (command.isDigestQuery()) { - if (logger.isTraceEnabled()) - logger.trace("digest is " + ByteBufferUtil.bytesToHex(ColumnFamily.digest(row.cf))); return new ReadResponse(ColumnFamily.digest(row.cf)); } else http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c81eaec/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 5517387..adb3f2d 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -138,9 +138,6 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistency_level) throws IOException { - if (logger.isTraceEnabled()) - logger.trace("insert writing local & replicate " + mutation.toString(true)); - Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level); runnable.run(); } @@ -155,9 +152,6 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistency_level) throws IOException { - if (logger.isTraceEnabled()) - logger.trace("insert writing local & replicate " + mutation.toString(true)); - Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level); StageManager.getStage(Stage.MUTATION).execute(runnable); } @@ -177,7 +171,6 @@ public class StorageProxy implements StorageProxyMBean throws UnavailableException, OverloadedException, WriteTimeoutException { Tracing.trace("Determining replicas for mutation"); - logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level); final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); long startTime = System.nanoTime(); @@ -259,7 +252,6 @@ public class StorageProxy implements StorageProxyMBean { Tracing.trace("Determining replicas for atomic batch"); long startTime = System.nanoTime(); - logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level); List<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size()); String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); @@ -513,9 +505,6 @@ public class StorageProxy implements StorageProxyMBean else { // belongs on a different server - if (logger.isTraceEnabled()) - logger.trace("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination); - String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination); Multimap<MessageOut, InetAddress> messages = dcMessages.get(dc); if (messages == null) @@ -523,7 +512,6 @@ public class StorageProxy implements StorageProxyMBean messages = HashMultimap.create(); dcMessages.put(dc, messages); } - messages.put(rm.createMessage(), destination); } } @@ -653,19 +641,15 @@ public class StorageProxy implements StorageProxyMBean CompactEndpointSerializationHelper.serialize(destination, dos); String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout()); dos.writeUTF(id); - logger.trace("Adding FWD message to {}@{}", id, destination); } message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray()); // send the combined message + forward headers - String id = MessagingService.instance().sendRR(message, target, handler); - logger.trace("Sending message to {}@{}", id, target); + Tracing.trace("Enqueuing message to {}", target); + MessagingService.instance().sendRR(message, target, handler); } private static void insertLocal(final RowMutation rm, final AbstractWriteResponseHandler responseHandler) { - if (logger.isTraceEnabled()) - logger.trace("insert writing local " + rm.toString(true)); - Runnable runnable = new DroppableRunnable(MessagingService.Verb.MUTATION) { public void runMayThrow() throws IOException @@ -713,8 +697,7 @@ public class StorageProxy implements StorageProxyMBean // Forward the actual update to the chosen leader replica AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.COUNTER); - if (logger.isTraceEnabled()) - logger.trace("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key()) + " to " + endpoint); + Tracing.trace("Enqueuing counter update to {}", endpoint); MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler); return responseHandler; } @@ -886,7 +869,6 @@ public class StorageProxy implements StorageProxyMBean ReadCommand command = commands.get(i); Table table = Table.open(command.getKeyspace()); assert !command.isDigestQuery(); - logger.trace("Command/ConsistencyLevel is {}/{}", command, consistency_level); List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key); CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName()); @@ -908,12 +890,11 @@ public class StorageProxy implements StorageProxyMBean InetAddress dataPoint = endpoints.get(0); if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) { - logger.trace("reading data locally"); StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler)); } else { - logger.trace("reading data from {}", dataPoint); + Tracing.trace("Enqueuing data request to {}", dataPoint); MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler); } @@ -928,12 +909,11 @@ public class StorageProxy implements StorageProxyMBean { if (digestPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) { - logger.trace("reading digest locally"); StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler)); } else { - logger.trace("reading digest from {}", digestPoint); + Tracing.trace("Enqueuing digest request to {}", dataPoint); // (We lazy-construct the digest Message object since it may not be necessary if we // are doing a local digest read, or no digest reads at all.) if (message == null) @@ -1063,8 +1043,6 @@ public class StorageProxy implements StorageProxyMBean protected void runMayThrow() { - logger.trace("LocalReadRunnable reading {}", command); - Table table = Table.open(command.table); Row r = command.getRow(table); ReadResponse result = ReadVerbHandler.getResponse(command, r); @@ -1088,8 +1066,6 @@ public class StorageProxy implements StorageProxyMBean protected void runMayThrow() throws ExecutionException, InterruptedException { - logger.trace("LocalReadRunnable reading {}", command); - RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command)); MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), System.currentTimeMillis() - start); handler.response(result); @@ -1124,7 +1100,6 @@ public class StorageProxy implements StorageProxyMBean throws UnavailableException, ReadTimeoutException { Tracing.trace("Determining replicas to query"); - logger.trace("Command/ConsistencyLevel is {}/{}", command.toString(), consistency_level); long startTime = System.nanoTime(); Table table = Table.open(command.keyspace); @@ -1211,7 +1186,6 @@ public class StorageProxy implements StorageProxyMBean && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) { - logger.trace("reading data locally"); StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler)); } else @@ -1219,8 +1193,8 @@ public class StorageProxy implements StorageProxyMBean MessageOut<RangeSliceCommand> message = nodeCmd.createMessage(); for (InetAddress endpoint : filteredEndpoints) { + logger.trace("Enqueuing request to {}", endpoint); MessagingService.instance().sendRR(message, endpoint, handler); - logger.trace("reading {} from {}", nodeCmd, endpoint); } } @@ -1231,7 +1205,6 @@ public class StorageProxy implements StorageProxyMBean rows.add(row); if (nodeCmd.countCQL3Rows) cql3RowCount += row.getLiveCount(commandPredicate); - logger.trace("range slices read {}", row.key); } FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout()); } @@ -1295,7 +1268,6 @@ public class StorageProxy implements StorageProxyMBean public void response(MessageIn<UUID> message) { // record the response from the remote node. - logger.trace("Received schema check response from {}", message.from.getHostAddress()); versions.put(message.from, message.payload); latch.countDown(); } @@ -1320,8 +1292,6 @@ public class StorageProxy implements StorageProxyMBean throw new AssertionError("This latch shouldn't have been interrupted."); } - logger.trace("My version is {}", myVersion); - // maps versions to hosts that are on that version. Map<String, List<String>> results = new HashMap<String, List<String>>(); Iterable<InetAddress> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers()); @@ -1364,7 +1334,6 @@ public class StorageProxy implements StorageProxyMBean // special case for bounds containing exactly 1 (non-minimum) token if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum(StorageService.getPartitioner())) { - logger.trace("restricted single token match for query {}", queryRange); return Collections.singletonList(queryRange); } @@ -1399,8 +1368,6 @@ public class StorageProxy implements StorageProxyMBean remainder = splits.right; } ranges.add(remainder); - if (logger.isDebugEnabled()) - logger.trace("restricted ranges for query {} are {}", queryRange, ranges); return ranges; } @@ -1512,7 +1479,7 @@ public class StorageProxy implements StorageProxyMBean if (hintWindowExpired) { HintedHandOffManager.instance.metrics.incrPastWindow(ep); - logger.trace("not hinting {} which has been down {}ms", ep, Gossiper.instance.getEndpointDowntime(ep)); + Tracing.trace("Not hinting {} which has been down {}ms", ep, Gossiper.instance.getEndpointDowntime(ep)); } return !hintWindowExpired; } @@ -1544,14 +1511,13 @@ public class StorageProxy implements StorageProxyMBean final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor); // Send out the truncate calls and track the responses with the callbacks. - logger.trace("Starting to send truncate messages to hosts {}", allEndpoints); + Tracing.trace("Enqueuing truncate messages to hosts {}", allEndpoints); final Truncation truncation = new Truncation(keyspace, cfname); MessageOut<Truncation> message = truncation.createMessage(); for (InetAddress endpoint : allEndpoints) MessagingService.instance().sendRR(message, endpoint, responseHandler); // Wait for all - logger.trace("Sent all truncate messages, now waiting for {} responses", blockFor); responseHandler.get(); }