Repository: cassandra Updated Branches: refs/heads/trunk 3916e4867 -> f5866ca2b
Add repair tracing Patch by Ben Chan; reviewed by jmckenzie for CASSANDRA-5483 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f5866ca2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f5866ca2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f5866ca2 Branch: refs/heads/trunk Commit: f5866ca2bac8ce530bb4e20832ff61e622206815 Parents: 3916e48 Author: Ben Chan <usrbi...@yahoo.com> Authored: Wed Dec 3 13:15:10 2014 -0600 Committer: Joshua McKenzie <jmcken...@apache.org> Committed: Wed Dec 3 13:15:10 2014 -0600 ---------------------------------------------------------------------- conf/cassandra.yaml | 4 + .../DebuggableThreadPoolExecutor.java | 15 +++ .../org/apache/cassandra/config/Config.java | 4 + .../cassandra/config/DatabaseDescriptor.java | 10 ++ .../org/apache/cassandra/db/CFRowAdder.java | 13 +- .../org/apache/cassandra/net/MessageOut.java | 7 +- .../cassandra/net/OutboundTcpConnection.java | 4 +- .../apache/cassandra/repair/LocalSyncTask.java | 39 +++++- .../apache/cassandra/repair/RemoteSyncTask.java | 5 +- .../org/apache/cassandra/repair/RepairJob.java | 4 + .../apache/cassandra/repair/RepairSession.java | 23 +++- .../org/apache/cassandra/repair/SyncTask.java | 3 + .../org/apache/cassandra/repair/Validator.java | 6 +- .../cassandra/repair/messages/RepairOption.java | 18 ++- .../cassandra/service/ActiveRepairService.java | 2 +- .../cassandra/service/StorageService.java | 128 ++++++++++++++++++- .../cassandra/service/StorageServiceMBean.java | 2 + .../org/apache/cassandra/tools/NodeProbe.java | 5 + .../org/apache/cassandra/tools/NodeTool.java | 11 +- .../cassandra/tracing/ExpiredTraceState.java | 4 +- .../apache/cassandra/tracing/TraceKeyspace.java | 21 +-- .../apache/cassandra/tracing/TraceState.java | 87 ++++++++++++- .../org/apache/cassandra/tracing/Tracing.java | 98 ++++++++++++-- 23 files changed, 465 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 9c4f0b5..f458ed8 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -765,3 +765,7 @@ internode_compression: all # reducing overhead from the TCP protocol itself, at the cost of increasing # latency if you block for cross-datacenter responses. inter_dc_tcp_nodelay: false + +# TTL for different trace types used during logging of the repair process. +tracetype_query_ttl: 86400 +tracetype_repair_ttl: 604800 http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java index ea04af3..fe6cade 100644 --- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java @@ -100,6 +100,21 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements } /** + * Creates a thread pool that creates new threads as needed, but + * will reuse previously constructed threads when they are + * available. + * @param threadPoolName the name of the threads created by this executor + * @return The new DebuggableThreadPoolExecutor + */ + public static DebuggableThreadPoolExecutor createCachedThreadpoolWithMaxSize(String threadPoolName) + { + return new DebuggableThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), + new NamedThreadFactory(threadPoolName)); + } + + /** * Returns a ThreadPoolExecutor with a fixed number of threads. * When all threads are actively executing tasks, new tasks are queued. * If (most) threads are expected to be idle most of the time, prefer createWithMaxSize() instead. http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index ca6276c..8c3021d 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -214,6 +214,10 @@ public class Config private static final CsvPreference STANDARD_SURROUNDING_SPACES_NEED_QUOTES = new CsvPreference.Builder(CsvPreference.STANDARD_PREFERENCE) .surroundingSpacesNeedQuotes(true).build(); + // TTL for different types of trace events. + public Integer tracetype_query_ttl = 60 * 60 * 24; + public Integer tracetype_repair_ttl = 60 * 60 * 24 * 7; + public static boolean getOutboundBindAny() { return outboundBindAny; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index a359cce..c34c6ea 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1601,4 +1601,14 @@ public class DatabaseDescriptor String arch = System.getProperty("os.arch"); return arch.contains("64") || arch.contains("sparcv9"); } + + public static int getTracetypeRepairTTL() + { + return conf.tracetype_repair_ttl; + } + + public static int getTracetypeQueryTTL() + { + return conf.tracetype_query_ttl; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/db/CFRowAdder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CFRowAdder.java b/src/java/org/apache/cassandra/db/CFRowAdder.java index 3ff9171..6fab8d5 100644 --- a/src/java/org/apache/cassandra/db/CFRowAdder.java +++ b/src/java/org/apache/cassandra/db/CFRowAdder.java @@ -41,13 +41,20 @@ public class CFRowAdder public final ColumnFamily cf; public final Composite prefix; public final long timestamp; + public final int ttl; private final int ldt; public CFRowAdder(ColumnFamily cf, Composite prefix, long timestamp) { + this(cf, prefix, timestamp, 0); + } + + public CFRowAdder(ColumnFamily cf, Composite prefix, long timestamp, int ttl) + { this.cf = cf; this.prefix = prefix; this.timestamp = timestamp; + this.ttl = ttl; this.ldt = (int) (System.currentTimeMillis() / 1000); // If a CQL3 table, add the row marker @@ -103,7 +110,11 @@ public class CFRowAdder AbstractType valueType = def.type.isCollection() ? ((CollectionType) def.type).valueComparator() : def.type; - cf.addColumn(new BufferCell(name, value instanceof ByteBuffer ? (ByteBuffer)value : valueType.decompose(value), timestamp)); + ByteBuffer valueBytes = value instanceof ByteBuffer ? (ByteBuffer)value : valueType.decompose(value); + if (ttl == 0) + cf.addColumn(new BufferCell(name, valueBytes, timestamp)); + else + cf.addColumn(new BufferExpiringCell(name, valueBytes, timestamp, ttl)); } return this; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/net/MessageOut.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java index 70c4f5c..5193c2b 100644 --- a/src/java/org/apache/cassandra/net/MessageOut.java +++ b/src/java/org/apache/cassandra/net/MessageOut.java @@ -36,6 +36,7 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; import static org.apache.cassandra.tracing.Tracing.TRACE_HEADER; +import static org.apache.cassandra.tracing.Tracing.TRACE_TYPE; import static org.apache.cassandra.tracing.Tracing.isTracing; public class MessageOut<T> @@ -57,8 +58,10 @@ public class MessageOut<T> this(verb, payload, serializer, - isTracing() ? ImmutableMap.of(TRACE_HEADER, UUIDGen.decompose(Tracing.instance.getSessionId())) - : Collections.<String, byte[]>emptyMap()); + isTracing() + ? ImmutableMap.of(TRACE_HEADER, UUIDGen.decompose(Tracing.instance.getSessionId()), + TRACE_TYPE, new byte[] { Tracing.TraceType.serialize(Tracing.instance.getTraceType()) }) + : Collections.<String, byte[]>emptyMap()); } private MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, Map<String, byte[]> parameters) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index a0ad011..cddce07 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -215,7 +215,9 @@ public class OutboundTcpConnection extends Thread // session may have already finished; see CASSANDRA-5668 if (state == null) { - TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1); + byte[] traceTypeBytes = qm.message.parameters.get(Tracing.TRACE_TYPE); + Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]); + TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL(), null); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index a43d326..bbb6362 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -26,10 +26,13 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.StreamEvent; import org.apache.cassandra.streaming.StreamEventHandler; import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; /** @@ -37,6 +40,8 @@ import org.apache.cassandra.utils.FBUtilities; */ public class LocalSyncTask extends SyncTask implements StreamEventHandler { + private final TraceState state = Tracing.instance.get(); + private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class); private final long repairedAt; @@ -58,7 +63,9 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint; InetAddress preferred = SystemKeyspace.getPreferredIP(dst); - logger.info(String.format("[repair #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, differences.size(), dst)); + String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst); + logger.info("[repair #{}] {}", desc.sessionId, message); + Tracing.traceRepair(message); new StreamPlan("Repair", repairedAt, 1, false).listeners(this) .flushBeforeTransfer(true) // request ranges from the remote node @@ -68,11 +75,37 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler .execute(); } - public void handleStreamEvent(StreamEvent event) { /* noop */ } + public void handleStreamEvent(StreamEvent event) + { + if (state == null) + return; + switch (event.eventType) + { + case STREAM_PREPARED: + StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event; + state.trace("Streaming session with {} prepared", spe.session.peer); + break; + case STREAM_COMPLETE: + StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event; + state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed"); + break; + case FILE_PROGRESS: + ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress; + state.trace("{}/{} bytes ({}%) {} idx:{}{}", + new Object[] { pi.currentBytes, + pi.totalBytes, + pi.currentBytes * 100 / pi.totalBytes, + pi.direction == ProgressInfo.Direction.OUT ? "sent to" : "received from", + pi.sessionIndex, + pi.peer }); + } + } public void onSuccess(StreamState result) { - logger.info(String.format("[repair #%s] Sync complete between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily)); + String message = String.format("Sync complete between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily); + logger.info("[repair #{}] {}", desc.sessionId, message); + Tracing.traceRepair(message); set(stat); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/RemoteSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java index ca5c998..ededc40 100644 --- a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java +++ b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java @@ -28,6 +28,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RepairException; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.SyncRequest; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; /** @@ -49,7 +50,9 @@ public class RemoteSyncTask extends SyncTask { InetAddress local = FBUtilities.getBroadcastAddress(); SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences); - logger.info(String.format("[repair #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", desc.sessionId, request.ranges.size(), request.src, request.dst)); + String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst); + logger.info("[repair #{}] {}", desc.sessionId, message); + Tracing.traceRepair(message); MessagingService.instance().sendOneWay(request.createMessage(), request.src); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 708ee70..5c649af 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -180,6 +181,9 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable */ private ListenableFuture<List<TreeResponse>> sendValidationRequest(Collection<InetAddress> endpoints) { + String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints); + logger.info("[repair #{}] {}", desc.sessionId, message); + Tracing.traceRepair(message); int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis()); List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size()); for (InetAddress endpoint : endpoints) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index d9787e2..9a8f645 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -30,11 +30,12 @@ import com.google.common.util.concurrent.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.*; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.Pair; @@ -99,7 +100,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> syncingTasks = new ConcurrentHashMap<>(); // Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor - private final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory("RepairJobTask"))); + private final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask")); private volatile boolean terminated = false; @@ -172,7 +173,9 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement return; } - logger.info(String.format("[repair #%s] Received merkle tree for %s from %s", getId(), desc.columnFamily, endpoint)); + String message = String.format("Received merkle tree for %s from %s", desc.columnFamily, endpoint); + logger.info("[repair #{}] {}", getId(), message); + Tracing.traceRepair(message); task.treeReceived(tree); } @@ -215,14 +218,17 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement */ public void start(ListeningExecutorService executor) { + String message; if (terminated) return; logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getId(), repairedNodes(), range, keyspace, Arrays.toString(cfnames))); + Tracing.traceRepair("Syncing range {}", range); if (endpoints.isEmpty()) { - logger.info(String.format("[repair #%s] No neighbors to repair with on range %s: session completed", getId(), range)); + logger.info("[repair #{}] {}", getId(), message = String.format("No neighbors to repair with on range %s: session completed", range)); + Tracing.traceRepair(message); set(new RepairSessionResult(id, keyspace, range, Lists.<RepairResult>newArrayList())); return; } @@ -232,7 +238,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement { if (!FailureDetector.instance.isAlive(endpoint)) { - String message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint); + message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint); logger.error("[repair #{}] {}", getId(), message); setException(new IOException(message)); return; @@ -254,8 +260,10 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement public void onSuccess(List<RepairResult> results) { // this repair session is completed - logger.info(String.format("[repair #%s] session completed successfully", getId())); + logger.info("[repair #{}] {}", getId(), "Session completed successfully"); + Tracing.traceRepair("Completed sync of range {}", range); set(new RepairSessionResult(id, keyspace, range, results)); + taskExecutor.shutdown(); // mark this session as terminated terminate(); @@ -263,7 +271,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement public void onFailure(Throwable t) { - logger.error("Repair job failed", t); + logger.error(String.format("[repair #%s] Session completed with the following error", getId()), t); + Tracing.traceRepair("Session completed with the following error: {}", t); setException(t); } }); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/SyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java b/src/java/org/apache/cassandra/repair/SyncTask.java index 3ce5532..7350a66 100644 --- a/src/java/org/apache/cassandra/repair/SyncTask.java +++ b/src/java/org/apache/cassandra/repair/SyncTask.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.MerkleTree; /** @@ -65,12 +66,14 @@ public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runna if (differences.isEmpty()) { logger.info(String.format(format, "are consistent")); + Tracing.traceRepair("Endpoint {} is consistent with {} for {}", r1.endpoint, r2.endpoint, desc.columnFamily); set(stat); return; } // non-0 difference: perform streaming repair logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync")); + Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", r1.endpoint, differences.size(), r2.endpoint, desc.columnFamily); startSync(differences); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/Validator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java index 641717e..972afd6 100644 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@ -34,6 +34,7 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.compaction.AbstractCompactedRow; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.ValidationComplete; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTree; @@ -241,7 +242,10 @@ public class Validator implements Runnable { // respond to the request that triggered this validation if (!initiator.equals(FBUtilities.getBroadcastAddress())) - logger.info(String.format("[repair #%s] Sending completed merkle tree to %s for %s/%s", desc.sessionId, initiator, desc.keyspace, desc.columnFamily)); + { + logger.info(String.format("[repair #%s] Sending completed merkle tree to %s for %s.%s", desc.sessionId, initiator, desc.keyspace, desc.columnFamily)); + Tracing.traceRepair("Sending completed merkle tree to {} for {}.{}", initiator, desc.keyspace, desc.columnFamily); + } MessagingService.instance().sendOneWay(new ValidationComplete(desc, tree).createMessage(), initiator); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/messages/RepairOption.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java index d11f94f..5987aed 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java @@ -40,6 +40,7 @@ public class RepairOption public static final String COLUMNFAMILIES_KEY = "columnFamilies"; public static final String DATACENTERS_KEY = "dataCenters"; public static final String HOSTS_KEY = "hosts"; + public static final String TRACE_KEY = "trace"; // we don't want to push nodes too much for repair public static final int MAX_JOB_THREADS = 4; @@ -76,6 +77,11 @@ public class RepairOption * <td>false</td> * </tr> * <tr> + * <td>trace</td> + * <td>"true" if repair is traced.</td> + * <td>false</td> + * </tr> + * <tr> * <td>jobThreads</td> * <td>Number of threads to use to run repair job.</td> * <td>1</td> @@ -117,6 +123,7 @@ public class RepairOption RepairParallelism parallelism = RepairParallelism.fromName(options.get(PARALLELISM_KEY)); boolean primaryRange = Boolean.parseBoolean(options.get(PRIMARY_RANGE_KEY)); boolean incremental = Boolean.parseBoolean(options.get(INCREMENTAL_KEY)); + boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY)); int jobThreads = 1; if (options.containsKey(JOB_THREADS_KEY)) @@ -146,7 +153,7 @@ public class RepairOption } } - RepairOption option = new RepairOption(parallelism, primaryRange, incremental, jobThreads, ranges); + RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges); // data centers String dataCentersStr = options.get(DATACENTERS_KEY); @@ -203,6 +210,7 @@ public class RepairOption private final RepairParallelism parallelism; private final boolean primaryRange; private final boolean incremental; + private final boolean trace; private final int jobThreads; private final Collection<String> columnFamilies = new HashSet<>(); @@ -210,11 +218,12 @@ public class RepairOption private final Collection<String> hosts = new HashSet<>(); private final Collection<Range<Token>> ranges = new HashSet<>(); - public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, int jobThreads, Collection<Range<Token>> ranges) + public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges) { this.parallelism = parallelism; this.primaryRange = primaryRange; this.incremental = incremental; + this.trace = trace; this.jobThreads = jobThreads; this.ranges.addAll(ranges); } @@ -234,6 +243,11 @@ public class RepairOption return incremental; } + public boolean isTraced() + { + return trace; + } + public int getJobThreads() { return jobThreads; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 252bcd1..e4b7fff 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -79,7 +79,7 @@ public class ActiveRepairService public static enum Status { - STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED + STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED, RUNNING } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index cf2152b..4ec23a6 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -55,6 +55,10 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.auth.Auth; import org.apache.cassandra.concurrent.*; import org.apache.cassandra.config.*; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.statements.SelectStatement; import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; @@ -87,6 +91,9 @@ import org.apache.cassandra.thrift.EndpointDetails; import org.apache.cassandra.thrift.TokenRange; import org.apache.cassandra.thrift.cassandraConstants; import org.apache.cassandra.tracing.TraceKeyspace; +import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.*; import static java.nio.charset.StandardCharsets.ISO_8859_1; @@ -2474,7 +2481,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE parallelismDegree = RepairParallelism.PARALLEL; } - RepairOption options = new RepairOption(parallelismDegree, primaryRange, !fullRepair, 1, Collections.<Range<Token>>emptyList()); + RepairOption options = new RepairOption(parallelismDegree, primaryRange, !fullRepair, false, 1, Collections.<Range<Token>>emptyList()); if (dataCenters != null) { options.getDataCenters().addAll(dataCenters); @@ -2536,7 +2543,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken); - RepairOption options = new RepairOption(parallelismDegree, false, !fullRepair, 1, repairingRange); + RepairOption options = new RepairOption(parallelismDegree, false, !fullRepair, false, 1, repairingRange); options.getDataCenters().addAll(dataCenters); if (hosts != null) { @@ -2620,6 +2627,75 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return cmd; } + private Thread createQueryThread(final int cmd, final UUID sessionId) + { + return new Thread(new WrappedRunnable() + { + // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces. + // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts. + public void runMayThrow() throws Exception + { + TraceState state = Tracing.instance.get(sessionId); + if (state == null) + throw new Exception("no tracestate"); + + String format = "select event_id, source, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;"; + String query = String.format(format, TraceKeyspace.NAME, TraceKeyspace.EVENTS_TABLE); + SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement; + + ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId); + InetAddress source = FBUtilities.getBroadcastAddress(); + + HashSet<UUID>[] seen = new HashSet[] { new HashSet<UUID>(), new HashSet<UUID>() }; + int si = 0; + UUID uuid; + + long tlast = System.currentTimeMillis(), tcur; + + TraceState.Status status; + long minWaitMillis = 125; + long maxWaitMillis = 1000 * 1024L; + long timeout = minWaitMillis; + boolean shouldDouble = false; + + while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED) + { + if (status == TraceState.Status.IDLE) + { + timeout = shouldDouble ? Math.min(timeout * 2, maxWaitMillis) : timeout; + shouldDouble = !shouldDouble; + } + else + { + timeout = minWaitMillis; + shouldDouble = false; + } + ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000)); + ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis())); + QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes, tminBytes, tmaxBytes)); + ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options); + UntypedResultSet result = UntypedResultSet.create(rows.result); + + for (UntypedResultSet.Row r : result) + { + if (source.equals(r.getInetAddress("source"))) + continue; + if ((uuid = r.getUUID("event_id")).timestamp() > (tcur - 1000) * 10000) + seen[si].add(uuid); + if (seen[si == 0 ? 1 : 0].contains(uuid)) + continue; + String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity")); + sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.RUNNING.ordinal()}); + } + tlast = tcur; + + si = si == 0 ? 1 : 0; + seen[si].clear(); + } + } + }); + } + private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final RepairOption options) { if (!options.getDataCenters().isEmpty() && options.getDataCenters().contains(DatabaseDescriptor.getLocalDataCenter())) @@ -2631,10 +2707,34 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { protected void runMayThrow() throws Exception { + final TraceState traceState; + + String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]); + Iterable<ColumnFamilyStore> validColumnFamilies = getValidColumnFamilies(false, false, keyspace, columnFamilies); + final long startTime = System.currentTimeMillis(); String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", cmd, keyspace, options); logger.info(message); sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()}); + if (options.isTraced()) + { + StringBuilder cfsb = new StringBuilder(); + for (ColumnFamilyStore cfs : validColumnFamilies) + cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name); + + UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR); + traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies", cfsb.substring(2))); + Tracing.traceRepair(message); + traceState.enableActivityNotification(); + traceState.setNotificationHandle(new int[]{ cmd, ActiveRepairService.Status.RUNNING.ordinal() }); + Thread queryThread = createQueryThread(cmd, sessionId); + queryThread.setName("RepairTracePolling"); + queryThread.start(); + } + else + { + traceState = null; + } final Set<InetAddress> allNeighbors = new HashSet<>(); Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>(); @@ -2656,10 +2756,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // Validate columnfamilies List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(); - String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]); try { - Iterables.addAll(columnFamilyStores, getValidColumnFamilies(false, false, keyspace, columnFamilies)); + Iterables.addAll(columnFamilyStores, validColumnFamilies); } catch (IllegalArgumentException e) { @@ -2760,12 +2859,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private void repairComplete() { - String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, - true, true); + String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, true, true); String message = String.format("Repair command #%d finished in %s", cmd, duration); sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()}); logger.info(message); + if (options.isTraced()) + { + traceState.setNotificationHandle(null); + // Because DebuggableThreadPoolExecutor#afterExecute and this callback + // run in a nondeterministic order (within the same thread), the + // TraceState may have been nulled out at this point. The TraceState + // should be traceState, so just set it without bothering to check if it + // actually was nulled out. + Tracing.instance.set(traceState); + Tracing.traceRepair(message); + Tracing.instance.stopSession(); + } executor.shutdownNow(); } }); @@ -3774,6 +3884,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return Collections.unmodifiableList(keyspaceNamesList); } + public List<String> getNonSystemKeyspaces() + { + List<String> keyspaceNamesList = new ArrayList<>(Schema.instance.getNonSystemKeyspaces()); + return Collections.unmodifiableList(keyspaceNamesList); + } + public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException { IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 1865f7f..70b2b81 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -386,6 +386,8 @@ public interface StorageServiceMBean extends NotificationEmitter public List<String> getKeyspaces(); + public List<String> getNonSystemKeyspaces(); + /** * Change endpointsnitch class and dynamic-ness (and dynamic attributes) at runtime * @param epSnitchClassName the canonical path name for a class implementing IEndpointSnitch http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index d749481..ea24530 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -737,6 +737,11 @@ public class NodeProbe implements AutoCloseable return ssProxy.getKeyspaces(); } + public List<String> getNonSystemKeyspaces() + { + return ssProxy.getNonSystemKeyspaces(); + } + public String getClusterName() { return ssProxy.getClusterName(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 612af8a..9314225 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -318,10 +318,15 @@ public class NodeTool protected List<String> parseOptionalKeyspace(List<String> cmdArgs, NodeProbe nodeProbe) { + return parseOptionalKeyspace(cmdArgs, nodeProbe, false); + } + + protected List<String> parseOptionalKeyspace(List<String> cmdArgs, NodeProbe nodeProbe, boolean includeSystemKS) + { List<String> keyspaces = new ArrayList<>(); if (cmdArgs == null || cmdArgs.isEmpty()) - keyspaces.addAll(nodeProbe.getKeyspaces()); + keyspaces.addAll(includeSystemKS ? nodeProbe.getKeyspaces() : nodeProbe.getNonSystemKeyspaces()); else keyspaces.add(cmdArgs.get(0)); @@ -1757,6 +1762,9 @@ public class NodeTool "WARNING: increasing this puts more load on repairing nodes, so be careful. (default: 1, max: 4)") private int numJobThreads = 1; + @Option(title = "trace_repair", name = {"-tr", "--trace"}, description = "Use -tr to trace the repair. Traces are logged to system_traces.events.") + private boolean trace = false; + @Override public void execute(NodeProbe probe) { @@ -1778,6 +1786,7 @@ public class NodeTool options.put(RepairOption.PRIMARY_RANGE_KEY, Boolean.toString(primaryRange)); options.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!fullRepair)); options.put(RepairOption.JOB_THREADS_KEY, Integer.toString(numJobThreads)); + options.put(RepairOption.TRACE_KEY, Boolean.toString(trace)); options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(cfnames, ",")); if (!startToken.isEmpty() || !endToken.isEmpty()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java index 37a013b..5cc3c21 100644 --- a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java +++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java @@ -27,9 +27,9 @@ import org.apache.cassandra.utils.FBUtilities; public class ExpiredTraceState extends TraceState { - public ExpiredTraceState(UUID sessionId) + public ExpiredTraceState(UUID sessionId, Tracing.TraceType traceType) { - super(FBUtilities.getBroadcastAddress(), sessionId); + super(FBUtilities.getBroadcastAddress(), sessionId, traceType); } public int elapsed() http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tracing/TraceKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java index 4d234bd..72a7c47 100644 --- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java +++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java @@ -39,8 +39,8 @@ public final class TraceKeyspace { public static final String NAME = "system_traces"; - private static final String SESSIONS_TABLE = "sessions"; - private static final String EVENTS_TABLE = "events"; + public static final String SESSIONS_TABLE = "sessions"; + public static final String EVENTS_TABLE = "events"; private static final int DAY = (int) TimeUnit.DAYS.toSeconds(1); @@ -48,6 +48,7 @@ public final class TraceKeyspace compile(SESSIONS_TABLE, "tracing sessions", "CREATE TABLE %s (" + "session_id uuid," + + "command text," + "coordinator inet," + "duration int," + "parameters map<text, text>," @@ -79,38 +80,42 @@ public final class TraceKeyspace return new KSMetaData(NAME, SimpleStrategy.class, ImmutableMap.of("replication_factor", "2"), true, tables); } - static Mutation toStopSessionMutation(ByteBuffer sessionId, int elapsed) + static Mutation toStopSessionMutation(ByteBuffer sessionId, int elapsed, int ttl) { Mutation mutation = new Mutation(NAME, sessionId); ColumnFamily cells = mutation.addOrGet(SessionsTable); - CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros()); + ttl = ttl == DAY ? 0 : ttl; + CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros(), ttl); adder.add("duration", elapsed); return mutation; } - static Mutation toStartSessionMutation(ByteBuffer sessionId, Map<String, String> parameters, String request, long startedAt) + static Mutation toStartSessionMutation(ByteBuffer sessionId, Map<String, String> parameters, String request, long startedAt, String command, int ttl) { Mutation mutation = new Mutation(NAME, sessionId); ColumnFamily cells = mutation.addOrGet(TraceKeyspace.SessionsTable); - CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros()); + ttl = ttl == DAY ? 0 : ttl; + CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros(), ttl); adder.add("coordinator", FBUtilities.getBroadcastAddress()); for (Map.Entry<String, String> entry : parameters.entrySet()) adder.addMapEntry("parameters", entry.getKey(), entry.getValue()); adder.add("request", request); adder.add("started_at", new Date(startedAt)); + adder.add("command", command); return mutation; } - static Mutation toEventMutation(ByteBuffer sessionId, String message, int elapsed, String threadName) + static Mutation toEventMutation(ByteBuffer sessionId, String message, int elapsed, String threadName, int ttl) { Mutation mutation = new Mutation(NAME, sessionId); ColumnFamily cells = mutation.addOrGet(EventsTable); - CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.make(UUIDGen.getTimeUUID()), FBUtilities.timestampMicros()); + ttl = ttl == DAY ? 0 : ttl; + CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.make(UUIDGen.getTimeUUID()), FBUtilities.timestampMicros(), ttl); adder.add("activity", message); adder.add("source", FBUtilities.getBroadcastAddress()); if (elapsed >= 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tracing/TraceState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java index 04abce3..f7d2741 100644 --- a/src/java/org/apache/cassandra/tracing/TraceState.java +++ b/src/java/org/apache/cassandra/tracing/TraceState.java @@ -28,6 +28,7 @@ import org.slf4j.helpers.MessageFormatter; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.WrappedRunnable; @@ -41,6 +42,20 @@ public class TraceState public final InetAddress coordinator; public final Stopwatch watch; public final ByteBuffer sessionIdBytes; + public final Tracing.TraceType traceType; + public final int ttl; + + private boolean notify; + private Object notificationHandle; + + public enum Status + { + IDLE, + ACTIVE, + STOPPED; + } + + private Status status; // Multiple requests can use the same TraceState at a time, so we need to reference count. // See CASSANDRA-7626 for more details. @@ -48,13 +63,33 @@ public class TraceState public TraceState(InetAddress coordinator, UUID sessionId) { + this(coordinator, sessionId, Tracing.TraceType.QUERY); + } + + public TraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType) + { assert coordinator != null; assert sessionId != null; this.coordinator = coordinator; this.sessionId = sessionId; sessionIdBytes = ByteBufferUtil.bytes(sessionId); + this.traceType = traceType; + this.ttl = traceType.getTTL(); watch = Stopwatch.createStarted(); + this.status = Status.IDLE; + } + + public void enableActivityNotification() + { + assert traceType == Tracing.TraceType.REPAIR; + notify = true; + } + + public void setNotificationHandle(Object handle) + { + assert traceType == Tracing.TraceType.REPAIR; + notificationHandle = handle; } public int elapsed() @@ -63,6 +98,46 @@ public class TraceState return elapsed < Integer.MAX_VALUE ? (int) elapsed : Integer.MAX_VALUE; } + public synchronized void stop() + { + status = Status.STOPPED; + notifyAll(); + } + + /* + * Returns immediately if there has been trace activity since the last + * call, otherwise waits until there is trace activity, or until the + * timeout expires. + * @param timeout timeout in milliseconds + * @return activity status + */ + public synchronized Status waitActivity(long timeout) + { + if (status == Status.IDLE) + { + try + { + wait(timeout); + } + catch (InterruptedException e) + { + throw new RuntimeException(); + } + } + if (status == Status.ACTIVE) + { + status = Status.IDLE; + return Status.ACTIVE; + } + return status; + } + + private synchronized void notifyActivity() + { + status = Status.ACTIVE; + notifyAll(); + } + public void trace(String format, Object arg) { trace(MessageFormatter.format(format, arg).getMessage()); @@ -80,18 +155,24 @@ public class TraceState public void trace(String message) { - TraceState.trace(sessionIdBytes, message, elapsed()); + if (notify) + notifyActivity(); + + TraceState.trace(sessionIdBytes, message, elapsed(), ttl, notificationHandle); } - public static void trace(final ByteBuffer sessionId, final String message, final int elapsed) + public static void trace(final ByteBuffer sessionId, final String message, final int elapsed, final int ttl, final Object notificationHandle) { final String threadName = Thread.currentThread().getName(); + if (notificationHandle != null) + StorageService.instance.sendNotification("repair", message, notificationHandle); + StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() { public void runMayThrow() { - Tracing.mutateWithCatch(TraceKeyspace.toEventMutation(sessionId, message, elapsed, threadName)); + Tracing.mutateWithCatch(TraceKeyspace.toEventMutation(sessionId, message, elapsed, threadName, ttl)); } }); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tracing/Tracing.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java index 773ccd4..5e76957 100644 --- a/src/java/org/apache/cassandra/tracing/Tracing.java +++ b/src/java/org/apache/cassandra/tracing/Tracing.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.exceptions.OverloadedException; @@ -43,6 +44,7 @@ import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; + /** * A trace session context. Able to track and store trace sessions. A session is usually a user initiated query, and may * have multiple local and remote events before it is completed. All events and sessions are stored at keyspace. @@ -50,6 +52,38 @@ import org.apache.cassandra.utils.UUIDGen; public class Tracing { public static final String TRACE_HEADER = "TraceSession"; + public static final String TRACE_TYPE = "TraceType"; + public static final String TRACE_TTL = "TraceTTL"; + + public enum TraceType + { + NONE, + QUERY, + REPAIR; + + private static final TraceType[] ALL_VALUES = values(); + + public static TraceType deserialize(byte b) + { + if (b < 0 || ALL_VALUES.length <= b) + return NONE; + return ALL_VALUES[b]; + } + + public static byte serialize(TraceType value) + { + return (byte) value.ordinal(); + } + + private static final int[] TTLS = { DatabaseDescriptor.getTracetypeQueryTTL(), + DatabaseDescriptor.getTracetypeQueryTTL(), + DatabaseDescriptor.getTracetypeRepairTTL() }; + + public int getTTL() + { + return TTLS[ordinal()]; + } + } private static final Logger logger = LoggerFactory.getLogger(Tracing.class); @@ -67,6 +101,18 @@ public class Tracing return state.get().sessionId; } + public TraceType getTraceType() + { + assert isTracing(); + return state.get().traceType; + } + + public int getTTL() + { + assert isTracing(); + return state.get().ttl; + } + /** * Indicates if the current thread's execution is being traced. */ @@ -77,14 +123,24 @@ public class Tracing public UUID newSession() { - return newSession(TimeUUIDType.instance.compose(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()))); + return newSession(TraceType.QUERY); + } + + public UUID newSession(TraceType traceType) + { + return newSession(TimeUUIDType.instance.compose(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())), traceType); } public UUID newSession(UUID sessionId) { + return newSession(sessionId, TraceType.QUERY); + } + + public UUID newSession(UUID sessionId, TraceType traceType) + { assert state.get() == null; - TraceState ts = new TraceState(localAddress, sessionId); + TraceState ts = new TraceState(localAddress, sessionId, traceType); state.set(ts); sessions.put(sessionId, ts); @@ -111,15 +167,17 @@ public class Tracing { final int elapsed = state.elapsed(); final ByteBuffer sessionId = state.sessionIdBytes; + final int ttl = state.ttl; StageManager.getStage(Stage.TRACING).execute(new Runnable() { public void run() { - mutateWithCatch(TraceKeyspace.toStopSessionMutation(sessionId, elapsed)); + mutateWithCatch(TraceKeyspace.toStopSessionMutation(sessionId, elapsed, ttl)); } }); + state.stop(); sessions.remove(state.sessionId); this.state.set(null); } @@ -140,20 +198,25 @@ public class Tracing state.set(tls); } - public void begin(final String request, final Map<String, String> parameters) + public TraceState begin(final String request, final Map<String, String> parameters) { assert isTracing(); + final TraceState state = this.state.get(); final long startedAt = System.currentTimeMillis(); - final ByteBuffer sessionId = state.get().sessionIdBytes; + final ByteBuffer sessionId = state.sessionIdBytes; + final String command = state.traceType.toString(); + final int ttl = state.ttl; StageManager.getStage(Stage.TRACING).execute(new Runnable() { public void run() { - mutateWithCatch(TraceKeyspace.toStartSessionMutation(sessionId, parameters, request, startedAt)); + mutateWithCatch(TraceKeyspace.toStartSessionMutation(sessionId, parameters, request, startedAt, command, ttl)); } }); + + return state; } /** @@ -163,7 +226,7 @@ public class Tracing */ public TraceState initializeFromMessage(final MessageIn<?> message) { - final byte[] sessionBytes = message.parameters.get(Tracing.TRACE_HEADER); + final byte[] sessionBytes = message.parameters.get(TRACE_HEADER); if (sessionBytes == null) return null; @@ -174,19 +237,36 @@ public class Tracing if (ts != null && ts.acquireReference()) return ts; + byte[] tmpBytes; + TraceType traceType = TraceType.QUERY; + if ((tmpBytes = message.parameters.get(TRACE_TYPE)) != null) + traceType = TraceType.deserialize(tmpBytes[0]); + if (message.verb == MessagingService.Verb.REQUEST_RESPONSE) { // received a message for a session we've already closed out. see CASSANDRA-5668 - return new ExpiredTraceState(sessionId); + return new ExpiredTraceState(sessionId, traceType); } else { - ts = new TraceState(message.from, sessionId); + ts = new TraceState(message.from, sessionId, traceType); sessions.put(sessionId, ts); return ts; } } + + // repair just gets a varargs method since it's so heavyweight anyway + public static void traceRepair(String format, Object... args) + { + final TraceState state = instance.get(); + if (state == null) // inline isTracing to avoid implicit two calls to state.get() + return; + + state.trace(format, args); + } + + // normal traces get zero-, one-, and two-argument overloads so common case doesn't need to create varargs array public static void trace(String message) { final TraceState state = instance.get();