Repository: cassandra
Updated Branches:
  refs/heads/trunk 3916e4867 -> f5866ca2b


Add repair tracing

Patch by Ben Chan; reviewed by jmckenzie for CASSANDRA-5483


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f5866ca2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f5866ca2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f5866ca2

Branch: refs/heads/trunk
Commit: f5866ca2bac8ce530bb4e20832ff61e622206815
Parents: 3916e48
Author: Ben Chan <usrbi...@yahoo.com>
Authored: Wed Dec 3 13:15:10 2014 -0600
Committer: Joshua McKenzie <jmcken...@apache.org>
Committed: Wed Dec 3 13:15:10 2014 -0600

----------------------------------------------------------------------
 conf/cassandra.yaml                             |   4 +
 .../DebuggableThreadPoolExecutor.java           |  15 +++
 .../org/apache/cassandra/config/Config.java     |   4 +
 .../cassandra/config/DatabaseDescriptor.java    |  10 ++
 .../org/apache/cassandra/db/CFRowAdder.java     |  13 +-
 .../org/apache/cassandra/net/MessageOut.java    |   7 +-
 .../cassandra/net/OutboundTcpConnection.java    |   4 +-
 .../apache/cassandra/repair/LocalSyncTask.java  |  39 +++++-
 .../apache/cassandra/repair/RemoteSyncTask.java |   5 +-
 .../org/apache/cassandra/repair/RepairJob.java  |   4 +
 .../apache/cassandra/repair/RepairSession.java  |  23 +++-
 .../org/apache/cassandra/repair/SyncTask.java   |   3 +
 .../org/apache/cassandra/repair/Validator.java  |   6 +-
 .../cassandra/repair/messages/RepairOption.java |  18 ++-
 .../cassandra/service/ActiveRepairService.java  |   2 +-
 .../cassandra/service/StorageService.java       | 128 ++++++++++++++++++-
 .../cassandra/service/StorageServiceMBean.java  |   2 +
 .../org/apache/cassandra/tools/NodeProbe.java   |   5 +
 .../org/apache/cassandra/tools/NodeTool.java    |  11 +-
 .../cassandra/tracing/ExpiredTraceState.java    |   4 +-
 .../apache/cassandra/tracing/TraceKeyspace.java |  21 +--
 .../apache/cassandra/tracing/TraceState.java    |  87 ++++++++++++-
 .../org/apache/cassandra/tracing/Tracing.java   |  98 ++++++++++++--
 23 files changed, 465 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 9c4f0b5..f458ed8 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -765,3 +765,7 @@ internode_compression: all
 # reducing overhead from the TCP protocol itself, at the cost of increasing
 # latency if you block for cross-datacenter responses.
 inter_dc_tcp_nodelay: false
+
+# TTL for different trace types used during logging of the repair process.
+tracetype_query_ttl: 86400
+tracetype_repair_ttl: 604800

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java 
b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index ea04af3..fe6cade 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -100,6 +100,21 @@ public class DebuggableThreadPoolExecutor extends 
ThreadPoolExecutor implements
     }
 
     /**
+     * Creates a thread pool that creates new threads as needed, but
+     * will reuse previously constructed threads when they are
+     * available.
+     * @param threadPoolName the name of the threads created by this executor
+     * @return The new DebuggableThreadPoolExecutor
+     */
+    public static DebuggableThreadPoolExecutor 
createCachedThreadpoolWithMaxSize(String threadPoolName)
+    {
+        return new DebuggableThreadPoolExecutor(0, Integer.MAX_VALUE,
+                                                60L, TimeUnit.SECONDS,
+                                                new 
SynchronousQueue<Runnable>(),
+                                                new 
NamedThreadFactory(threadPoolName));
+    }
+
+    /**
      * Returns a ThreadPoolExecutor with a fixed number of threads.
      * When all threads are actively executing tasks, new tasks are queued.
      * If (most) threads are expected to be idle most of the time, prefer 
createWithMaxSize() instead.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index ca6276c..8c3021d 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -214,6 +214,10 @@ public class Config
     private static final CsvPreference STANDARD_SURROUNDING_SPACES_NEED_QUOTES 
= new CsvPreference.Builder(CsvPreference.STANDARD_PREFERENCE)
                                                                                
                   .surroundingSpacesNeedQuotes(true).build();
 
+    // TTL for different types of trace events.
+    public Integer tracetype_query_ttl = 60 * 60 * 24;
+    public Integer tracetype_repair_ttl = 60 * 60 * 24 * 7;
+
     public static boolean getOutboundBindAny()
     {
         return outboundBindAny;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index a359cce..c34c6ea 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1601,4 +1601,14 @@ public class DatabaseDescriptor
         String arch = System.getProperty("os.arch");
         return arch.contains("64") || arch.contains("sparcv9");
     }
+
+    public static int getTracetypeRepairTTL()
+    {
+        return conf.tracetype_repair_ttl;
+    }
+
+    public static int getTracetypeQueryTTL()
+    {
+        return conf.tracetype_query_ttl;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/db/CFRowAdder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CFRowAdder.java 
b/src/java/org/apache/cassandra/db/CFRowAdder.java
index 3ff9171..6fab8d5 100644
--- a/src/java/org/apache/cassandra/db/CFRowAdder.java
+++ b/src/java/org/apache/cassandra/db/CFRowAdder.java
@@ -41,13 +41,20 @@ public class CFRowAdder
     public final ColumnFamily cf;
     public final Composite prefix;
     public final long timestamp;
+    public final int ttl;
     private final int ldt;
 
     public CFRowAdder(ColumnFamily cf, Composite prefix, long timestamp)
     {
+        this(cf, prefix, timestamp, 0);
+    }
+
+    public CFRowAdder(ColumnFamily cf, Composite prefix, long timestamp, int 
ttl)
+    {
         this.cf = cf;
         this.prefix = prefix;
         this.timestamp = timestamp;
+        this.ttl = ttl;
         this.ldt = (int) (System.currentTimeMillis() / 1000);
 
         // If a CQL3 table, add the row marker
@@ -103,7 +110,11 @@ public class CFRowAdder
             AbstractType valueType = def.type.isCollection()
                                    ? ((CollectionType) 
def.type).valueComparator()
                                    : def.type;
-            cf.addColumn(new BufferCell(name, value instanceof ByteBuffer ? 
(ByteBuffer)value : valueType.decompose(value), timestamp));
+            ByteBuffer valueBytes = value instanceof ByteBuffer ? 
(ByteBuffer)value : valueType.decompose(value);
+            if (ttl == 0)
+                cf.addColumn(new BufferCell(name, valueBytes, timestamp));
+            else
+                cf.addColumn(new BufferExpiringCell(name, valueBytes, 
timestamp, ttl));
         }
         return this;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java 
b/src/java/org/apache/cassandra/net/MessageOut.java
index 70c4f5c..5193c2b 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
 import static org.apache.cassandra.tracing.Tracing.TRACE_HEADER;
+import static org.apache.cassandra.tracing.Tracing.TRACE_TYPE;
 import static org.apache.cassandra.tracing.Tracing.isTracing;
 
 public class MessageOut<T>
@@ -57,8 +58,10 @@ public class MessageOut<T>
         this(verb,
              payload,
              serializer,
-             isTracing() ? ImmutableMap.of(TRACE_HEADER, 
UUIDGen.decompose(Tracing.instance.getSessionId()))
-                         : Collections.<String, byte[]>emptyMap());
+             isTracing()
+                 ? ImmutableMap.of(TRACE_HEADER, 
UUIDGen.decompose(Tracing.instance.getSessionId()),
+                                   TRACE_TYPE, new byte[] { 
Tracing.TraceType.serialize(Tracing.instance.getTraceType()) })
+                 : Collections.<String, byte[]>emptyMap());
     }
 
     private MessageOut(MessagingService.Verb verb, T payload, 
IVersionedSerializer<T> serializer, Map<String, byte[]> parameters)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index a0ad011..cddce07 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -215,7 +215,9 @@ public class OutboundTcpConnection extends Thread
                 // session may have already finished; see CASSANDRA-5668
                 if (state == null)
                 {
-                    TraceState.trace(ByteBuffer.wrap(sessionBytes), message, 
-1);
+                    byte[] traceTypeBytes = 
qm.message.parameters.get(Tracing.TRACE_TYPE);
+                    Tracing.TraceType traceType = traceTypeBytes == null ? 
Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
+                    TraceState.trace(ByteBuffer.wrap(sessionBytes), message, 
-1, traceType.getTTL(), null);
                 }
                 else
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java 
b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index a43d326..bbb6362 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -26,10 +26,13 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamEvent;
 import org.apache.cassandra.streaming.StreamEventHandler;
 import org.apache.cassandra.streaming.StreamPlan;
 import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -37,6 +40,8 @@ import org.apache.cassandra.utils.FBUtilities;
  */
 public class LocalSyncTask extends SyncTask implements StreamEventHandler
 {
+    private final TraceState state = Tracing.instance.get();
+
     private static final Logger logger = 
LoggerFactory.getLogger(LocalSyncTask.class);
 
     private final long repairedAt;
@@ -58,7 +63,9 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
         InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : 
r2.endpoint;
         InetAddress preferred = SystemKeyspace.getPreferredIP(dst);
 
-        logger.info(String.format("[repair #%s] Performing streaming repair of 
%d ranges with %s", desc.sessionId, differences.size(), dst));
+        String message = String.format("Performing streaming repair of %d 
ranges with %s", differences.size(), dst);
+        logger.info("[repair #{}] {}", desc.sessionId, message);
+        Tracing.traceRepair(message);
         new StreamPlan("Repair", repairedAt, 1, false).listeners(this)
                                             .flushBeforeTransfer(true)
                                             // request ranges from the remote 
node
@@ -68,11 +75,37 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
                                             .execute();
     }
 
-    public void handleStreamEvent(StreamEvent event) { /* noop */ }
+    public void handleStreamEvent(StreamEvent event)
+    {
+        if (state == null)
+            return;
+        switch (event.eventType)
+        {
+            case STREAM_PREPARED:
+                StreamEvent.SessionPreparedEvent spe = 
(StreamEvent.SessionPreparedEvent) event;
+                state.trace("Streaming session with {} prepared", 
spe.session.peer);
+                break;
+            case STREAM_COMPLETE:
+                StreamEvent.SessionCompleteEvent sce = 
(StreamEvent.SessionCompleteEvent) event;
+                state.trace("Streaming session with {} {}", sce.peer, 
sce.success ? "completed successfully" : "failed");
+                break;
+            case FILE_PROGRESS:
+                ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress;
+                state.trace("{}/{} bytes ({}%) {} idx:{}{}",
+                            new Object[] { pi.currentBytes,
+                                           pi.totalBytes,
+                                           pi.currentBytes * 100 / 
pi.totalBytes,
+                                           pi.direction == 
ProgressInfo.Direction.OUT ? "sent to" : "received from",
+                                           pi.sessionIndex,
+                                           pi.peer });
+        }
+    }
 
     public void onSuccess(StreamState result)
     {
-        logger.info(String.format("[repair #%s] Sync complete between %s and 
%s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily));
+        String message = String.format("Sync complete between %s and %s on 
%s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily);
+        logger.info("[repair #{}] {}", desc.sessionId, message);
+        Tracing.traceRepair(message);
         set(stat);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java 
b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
index ca5c998..ededc40 100644
--- a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.RepairException;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -49,7 +50,9 @@ public class RemoteSyncTask extends SyncTask
     {
         InetAddress local = FBUtilities.getBroadcastAddress();
         SyncRequest request = new SyncRequest(desc, local, r1.endpoint, 
r2.endpoint, differences);
-        logger.info(String.format("[repair #%s] Forwarding streaming repair of 
%d ranges to %s (to be streamed with %s)", desc.sessionId, 
request.ranges.size(), request.src, request.dst));
+        String message = String.format("Forwarding streaming repair of %d 
ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, 
request.dst);
+        logger.info("[repair #{}] {}", desc.sessionId, message);
+        Tracing.traceRepair(message);
         MessagingService.instance().sendOneWay(request.createMessage(), 
request.src);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java 
b/src/java/org/apache/cassandra/repair/RepairJob.java
index 708ee70..5c649af 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -180,6 +181,9 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
      */
     private ListenableFuture<List<TreeResponse>> 
sendValidationRequest(Collection<InetAddress> endpoints)
     {
+        String message = String.format("Requesting merkle trees for %s (to 
%s)", desc.columnFamily, endpoints);
+        logger.info("[repair #{}] {}", desc.sessionId, message);
+        Tracing.traceRepair(message);
         int gcBefore = 
Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
         List<ListenableFuture<TreeResponse>> tasks = new 
ArrayList<>(endpoints.size());
         for (InetAddress endpoint : endpoints)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java 
b/src/java/org/apache/cassandra/repair/RepairSession.java
index d9787e2..9a8f645 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -30,11 +30,12 @@ import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
 import org.apache.cassandra.utils.Pair;
@@ -99,7 +100,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
     private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> 
syncingTasks = new ConcurrentHashMap<>();
 
     // Tasks(snapshot, validate request, differencing, ...) are run on 
taskExecutor
-    private final ListeningExecutorService taskExecutor = 
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new 
NamedThreadFactory("RepairJobTask")));
+    private final ListeningExecutorService taskExecutor = 
MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask"));
 
     private volatile boolean terminated = false;
 
@@ -172,7 +173,9 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
             return;
         }
 
-        logger.info(String.format("[repair #%s] Received merkle tree for %s 
from %s", getId(), desc.columnFamily, endpoint));
+        String message = String.format("Received merkle tree for %s from %s", 
desc.columnFamily, endpoint);
+        logger.info("[repair #{}] {}", getId(), message);
+        Tracing.traceRepair(message);
         task.treeReceived(tree);
     }
 
@@ -215,14 +218,17 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
      */
     public void start(ListeningExecutorService executor)
     {
+        String message;
         if (terminated)
             return;
 
         logger.info(String.format("[repair #%s] new session: will sync %s on 
range %s for %s.%s", getId(), repairedNodes(), range, keyspace, 
Arrays.toString(cfnames)));
+        Tracing.traceRepair("Syncing range {}", range);
 
         if (endpoints.isEmpty())
         {
-            logger.info(String.format("[repair #%s] No neighbors to repair 
with on range %s: session completed", getId(), range));
+            logger.info("[repair #{}] {}", getId(), message = 
String.format("No neighbors to repair with on range %s: session completed", 
range));
+            Tracing.traceRepair(message);
             set(new RepairSessionResult(id, keyspace, range, 
Lists.<RepairResult>newArrayList()));
             return;
         }
@@ -232,7 +238,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
         {
             if (!FailureDetector.instance.isAlive(endpoint))
             {
-                String message = String.format("Cannot proceed on repair 
because a neighbor (%s) is dead: session failed", endpoint);
+                message = String.format("Cannot proceed on repair because a 
neighbor (%s) is dead: session failed", endpoint);
                 logger.error("[repair #{}] {}", getId(), message);
                 setException(new IOException(message));
                 return;
@@ -254,8 +260,10 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
             public void onSuccess(List<RepairResult> results)
             {
                 // this repair session is completed
-                logger.info(String.format("[repair #%s] session completed 
successfully", getId()));
+                logger.info("[repair #{}] {}", getId(), "Session completed 
successfully");
+                Tracing.traceRepair("Completed sync of range {}", range);
                 set(new RepairSessionResult(id, keyspace, range, results));
+
                 taskExecutor.shutdown();
                 // mark this session as terminated
                 terminate();
@@ -263,7 +271,8 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
 
             public void onFailure(Throwable t)
             {
-                logger.error("Repair job failed", t);
+                logger.error(String.format("[repair #%s] Session completed 
with the following error", getId()), t);
+                Tracing.traceRepair("Session completed with the following 
error: {}", t);
                 setException(t);
             }
         });

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/SyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java 
b/src/java/org/apache/cassandra/repair/SyncTask.java
index 3ce5532..7350a66 100644
--- a/src/java/org/apache/cassandra/repair/SyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SyncTask.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.MerkleTree;
 
 /**
@@ -65,12 +66,14 @@ public abstract class SyncTask extends 
AbstractFuture<SyncStat> implements Runna
         if (differences.isEmpty())
         {
             logger.info(String.format(format, "are consistent"));
+            Tracing.traceRepair("Endpoint {} is consistent with {} for {}", 
r1.endpoint, r2.endpoint, desc.columnFamily);
             set(stat);
             return;
         }
 
         // non-0 difference: perform streaming repair
         logger.info(String.format(format, "have " + differences.size() + " 
range(s) out of sync"));
+        Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} 
for {}", r1.endpoint, differences.size(), r2.endpoint, desc.columnFamily);
         startSync(differences);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java 
b/src/java/org/apache/cassandra/repair/Validator.java
index 641717e..972afd6 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
 
@@ -241,7 +242,10 @@ public class Validator implements Runnable
     {
         // respond to the request that triggered this validation
         if (!initiator.equals(FBUtilities.getBroadcastAddress()))
-            logger.info(String.format("[repair #%s] Sending completed merkle 
tree to %s for %s/%s", desc.sessionId, initiator, desc.keyspace, 
desc.columnFamily));
+        {
+            logger.info(String.format("[repair #%s] Sending completed merkle 
tree to %s for %s.%s", desc.sessionId, initiator, desc.keyspace, 
desc.columnFamily));
+            Tracing.traceRepair("Sending completed merkle tree to {} for 
{}.{}", initiator, desc.keyspace, desc.columnFamily);
+        }
         MessagingService.instance().sendOneWay(new ValidationComplete(desc, 
tree).createMessage(), initiator);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java 
b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index d11f94f..5987aed 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -40,6 +40,7 @@ public class RepairOption
     public static final String COLUMNFAMILIES_KEY = "columnFamilies";
     public static final String DATACENTERS_KEY = "dataCenters";
     public static final String HOSTS_KEY = "hosts";
+    public static final String TRACE_KEY = "trace";
 
     // we don't want to push nodes too much for repair
     public static final int MAX_JOB_THREADS = 4;
@@ -76,6 +77,11 @@ public class RepairOption
      *             <td>false</td>
      *         </tr>
      *         <tr>
+     *             <td>trace</td>
+     *             <td>"true" if repair is traced.</td>
+     *             <td>false</td>
+     *         </tr>
+     *         <tr>
      *             <td>jobThreads</td>
      *             <td>Number of threads to use to run repair job.</td>
      *             <td>1</td>
@@ -117,6 +123,7 @@ public class RepairOption
         RepairParallelism parallelism = 
RepairParallelism.fromName(options.get(PARALLELISM_KEY));
         boolean primaryRange = 
Boolean.parseBoolean(options.get(PRIMARY_RANGE_KEY));
         boolean incremental = 
Boolean.parseBoolean(options.get(INCREMENTAL_KEY));
+        boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY));
 
         int jobThreads = 1;
         if (options.containsKey(JOB_THREADS_KEY))
@@ -146,7 +153,7 @@ public class RepairOption
             }
         }
 
-        RepairOption option = new RepairOption(parallelism, primaryRange, 
incremental, jobThreads, ranges);
+        RepairOption option = new RepairOption(parallelism, primaryRange, 
incremental, trace, jobThreads, ranges);
 
         // data centers
         String dataCentersStr = options.get(DATACENTERS_KEY);
@@ -203,6 +210,7 @@ public class RepairOption
     private final RepairParallelism parallelism;
     private final boolean primaryRange;
     private final boolean incremental;
+    private final boolean trace;
     private final int jobThreads;
 
     private final Collection<String> columnFamilies = new HashSet<>();
@@ -210,11 +218,12 @@ public class RepairOption
     private final Collection<String> hosts = new HashSet<>();
     private final Collection<Range<Token>> ranges = new HashSet<>();
 
-    public RepairOption(RepairParallelism parallelism, boolean primaryRange, 
boolean incremental, int jobThreads, Collection<Range<Token>> ranges)
+    public RepairOption(RepairParallelism parallelism, boolean primaryRange, 
boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> 
ranges)
     {
         this.parallelism = parallelism;
         this.primaryRange = primaryRange;
         this.incremental = incremental;
+        this.trace = trace;
         this.jobThreads = jobThreads;
         this.ranges.addAll(ranges);
     }
@@ -234,6 +243,11 @@ public class RepairOption
         return incremental;
     }
 
+    public boolean isTraced()
+    {
+        return trace;
+    }
+
     public int getJobThreads()
     {
         return jobThreads;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 252bcd1..e4b7fff 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -79,7 +79,7 @@ public class ActiveRepairService
 
     public static enum Status
     {
-        STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED
+        STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED, RUNNING
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index cf2152b..4ec23a6 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -55,6 +55,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.auth.Auth;
 import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.*;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
@@ -87,6 +91,9 @@ import org.apache.cassandra.thrift.EndpointDetails;
 import org.apache.cassandra.thrift.TokenRange;
 import org.apache.cassandra.thrift.cassandraConstants;
 import org.apache.cassandra.tracing.TraceKeyspace;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.*;
 
 import static java.nio.charset.StandardCharsets.ISO_8859_1;
@@ -2474,7 +2481,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             parallelismDegree = RepairParallelism.PARALLEL;
         }
 
-        RepairOption options = new RepairOption(parallelismDegree, 
primaryRange, !fullRepair, 1, Collections.<Range<Token>>emptyList());
+        RepairOption options = new RepairOption(parallelismDegree, 
primaryRange, !fullRepair, false, 1, Collections.<Range<Token>>emptyList());
         if (dataCenters != null)
         {
             options.getDataCenters().addAll(dataCenters);
@@ -2536,7 +2543,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         }
         Collection<Range<Token>> repairingRange = 
createRepairRangeFrom(beginToken, endToken);
 
-        RepairOption options = new RepairOption(parallelismDegree, false, 
!fullRepair, 1, repairingRange);
+        RepairOption options = new RepairOption(parallelismDegree, false, 
!fullRepair, false, 1, repairingRange);
         options.getDataCenters().addAll(dataCenters);
         if (hosts != null)
         {
@@ -2620,6 +2627,75 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return cmd;
     }
 
+    private Thread createQueryThread(final int cmd, final UUID sessionId)
+    {
+        return new Thread(new WrappedRunnable()
+        {
+            // Query events within a time interval that overlaps the last by 
one second. Ignore duplicates. Ignore local traces.
+            // Wake up upon local trace activity. Query when notified of trace 
activity with a timeout that doubles every two timeouts.
+            public void runMayThrow() throws Exception
+            {
+                TraceState state = Tracing.instance.get(sessionId);
+                if (state == null)
+                    throw new Exception("no tracestate");
+
+                String format = "select event_id, source, activity from %s.%s 
where session_id = ? and event_id > ? and event_id < ?;";
+                String query = String.format(format, TraceKeyspace.NAME, 
TraceKeyspace.EVENTS_TABLE);
+                SelectStatement statement = (SelectStatement) 
QueryProcessor.parseStatement(query).prepare().statement;
+
+                ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
+                InetAddress source = FBUtilities.getBroadcastAddress();
+
+                HashSet<UUID>[] seen = new HashSet[] { new HashSet<UUID>(), 
new HashSet<UUID>() };
+                int si = 0;
+                UUID uuid;
+
+                long tlast = System.currentTimeMillis(), tcur;
+
+                TraceState.Status status;
+                long minWaitMillis = 125;
+                long maxWaitMillis = 1000 * 1024L;
+                long timeout = minWaitMillis;
+                boolean shouldDouble = false;
+
+                while ((status = state.waitActivity(timeout)) != 
TraceState.Status.STOPPED)
+                {
+                    if (status == TraceState.Status.IDLE)
+                    {
+                        timeout = shouldDouble ? Math.min(timeout * 2, 
maxWaitMillis) : timeout;
+                        shouldDouble = !shouldDouble;
+                    }
+                    else
+                    {
+                        timeout = minWaitMillis;
+                        shouldDouble = false;
+                    }
+                    ByteBuffer tminBytes = 
ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000));
+                    ByteBuffer tmaxBytes = 
ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis()));
+                    QueryOptions options = 
QueryOptions.forInternalCalls(ConsistencyLevel.ONE, 
Lists.newArrayList(sessionIdBytes, tminBytes, tmaxBytes));
+                    ResultMessage.Rows rows = 
statement.execute(QueryState.forInternalCalls(), options);
+                    UntypedResultSet result = 
UntypedResultSet.create(rows.result);
+
+                    for (UntypedResultSet.Row r : result)
+                    {
+                        if (source.equals(r.getInetAddress("source")))
+                            continue;
+                        if ((uuid = r.getUUID("event_id")).timestamp() > (tcur 
- 1000) * 10000)
+                            seen[si].add(uuid);
+                        if (seen[si == 0 ? 1 : 0].contains(uuid))
+                            continue;
+                        String message = String.format("%s: %s", 
r.getInetAddress("source"), r.getString("activity"));
+                        sendNotification("repair", message, new int[]{cmd, 
ActiveRepairService.Status.RUNNING.ordinal()});
+                    }
+                    tlast = tcur;
+
+                    si = si == 0 ? 1 : 0;
+                    seen[si].clear();
+                }
+            }
+        });
+    }
+
     private FutureTask<Object> createRepairTask(final int cmd, final String 
keyspace, final RepairOption options)
     {
         if (!options.getDataCenters().isEmpty() && 
options.getDataCenters().contains(DatabaseDescriptor.getLocalDataCenter()))
@@ -2631,10 +2707,34 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         {
             protected void runMayThrow() throws Exception
             {
+                final TraceState traceState;
+
+                String[] columnFamilies = 
options.getColumnFamilies().toArray(new 
String[options.getColumnFamilies().size()]);
+                Iterable<ColumnFamilyStore> validColumnFamilies = 
getValidColumnFamilies(false, false, keyspace, columnFamilies);
+
                 final long startTime = System.currentTimeMillis();
                 String message = String.format("Starting repair command #%d, 
repairing keyspace %s with %s", cmd, keyspace, options);
                 logger.info(message);
                 sendNotification("repair", message, new int[]{cmd, 
ActiveRepairService.Status.STARTED.ordinal()});
+                if (options.isTraced())
+                {
+                    StringBuilder cfsb = new StringBuilder();
+                    for (ColumnFamilyStore cfs : validColumnFamilies)
+                        cfsb.append(", 
").append(cfs.keyspace.getName()).append(".").append(cfs.name);
+
+                    UUID sessionId = 
Tracing.instance.newSession(Tracing.TraceType.REPAIR);
+                    traceState = Tracing.instance.begin("repair", 
ImmutableMap.of("keyspace", keyspace, "columnFamilies", cfsb.substring(2)));
+                    Tracing.traceRepair(message);
+                    traceState.enableActivityNotification();
+                    traceState.setNotificationHandle(new int[]{ cmd, 
ActiveRepairService.Status.RUNNING.ordinal() });
+                    Thread queryThread = createQueryThread(cmd, sessionId);
+                    queryThread.setName("RepairTracePolling");
+                    queryThread.start();
+                }
+                else
+                {
+                    traceState = null;
+                }
 
                 final Set<InetAddress> allNeighbors = new HashSet<>();
                 Map<Range, Set<InetAddress>> rangeToNeighbors = new 
HashMap<>();
@@ -2656,10 +2756,9 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
                 // Validate columnfamilies
                 List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
-                String[] columnFamilies = 
options.getColumnFamilies().toArray(new 
String[options.getColumnFamilies().size()]);
                 try
                 {
-                    Iterables.addAll(columnFamilyStores, 
getValidColumnFamilies(false, false, keyspace, columnFamilies));
+                    Iterables.addAll(columnFamilyStores, validColumnFamilies);
                 }
                 catch (IllegalArgumentException e)
                 {
@@ -2760,12 +2859,23 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
                     private void repairComplete()
                     {
-                        String duration = 
DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
-                                                                               
   true, true);
+                        String duration = 
DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, 
true, true);
                         String message = String.format("Repair command #%d 
finished in %s", cmd, duration);
                         sendNotification("repair", message,
                                          new int[]{cmd, 
ActiveRepairService.Status.FINISHED.ordinal()});
                         logger.info(message);
+                        if (options.isTraced())
+                        {
+                            traceState.setNotificationHandle(null);
+                            // Because 
DebuggableThreadPoolExecutor#afterExecute and this callback
+                            // run in a nondeterministic order (within the 
same thread), the
+                            // TraceState may have been nulled out at this 
point. The TraceState
+                            // should be traceState, so just set it without 
bothering to check if it
+                            // actually was nulled out.
+                            Tracing.instance.set(traceState);
+                            Tracing.traceRepair(message);
+                            Tracing.instance.stopSession();
+                        }
                         executor.shutdownNow();
                     }
                 });
@@ -3774,6 +3884,12 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return Collections.unmodifiableList(keyspaceNamesList);
     }
 
+    public List<String> getNonSystemKeyspaces()
+    {
+        List<String> keyspaceNamesList = new 
ArrayList<>(Schema.instance.getNonSystemKeyspaces());
+        return Collections.unmodifiableList(keyspaceNamesList);
+    }
+
     public void updateSnitch(String epSnitchClassName, Boolean dynamic, 
Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double 
dynamicBadnessThreshold) throws ClassNotFoundException
     {
         IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 1865f7f..70b2b81 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -386,6 +386,8 @@ public interface StorageServiceMBean extends 
NotificationEmitter
 
     public List<String> getKeyspaces();
 
+    public List<String> getNonSystemKeyspaces();
+
     /**
      * Change endpointsnitch class and dynamic-ness (and dynamic attributes) 
at runtime
      * @param epSnitchClassName        the canonical path name for a class 
implementing IEndpointSnitch

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index d749481..ea24530 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -737,6 +737,11 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.getKeyspaces();
     }
 
+    public List<String> getNonSystemKeyspaces()
+    {
+        return ssProxy.getNonSystemKeyspaces();
+    }
+
     public String getClusterName()
     {
         return ssProxy.getClusterName();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java 
b/src/java/org/apache/cassandra/tools/NodeTool.java
index 612af8a..9314225 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -318,10 +318,15 @@ public class NodeTool
 
         protected List<String> parseOptionalKeyspace(List<String> cmdArgs, 
NodeProbe nodeProbe)
         {
+            return parseOptionalKeyspace(cmdArgs, nodeProbe, false);
+        }
+
+        protected List<String> parseOptionalKeyspace(List<String> cmdArgs, 
NodeProbe nodeProbe, boolean includeSystemKS)
+        {
             List<String> keyspaces = new ArrayList<>();
 
             if (cmdArgs == null || cmdArgs.isEmpty())
-                keyspaces.addAll(nodeProbe.getKeyspaces());
+                keyspaces.addAll(includeSystemKS ? nodeProbe.getKeyspaces() : 
nodeProbe.getNonSystemKeyspaces());
             else
                 keyspaces.add(cmdArgs.get(0));
 
@@ -1757,6 +1762,9 @@ public class NodeTool
                                                                                
      "WARNING: increasing this puts more load on repairing nodes, so be 
careful. (default: 1, max: 4)")
         private int numJobThreads = 1;
 
+        @Option(title = "trace_repair", name = {"-tr", "--trace"}, description 
= "Use -tr to trace the repair. Traces are logged to system_traces.events.")
+        private boolean trace = false;
+
         @Override
         public void execute(NodeProbe probe)
         {
@@ -1778,6 +1786,7 @@ public class NodeTool
                 options.put(RepairOption.PRIMARY_RANGE_KEY, 
Boolean.toString(primaryRange));
                 options.put(RepairOption.INCREMENTAL_KEY, 
Boolean.toString(!fullRepair));
                 options.put(RepairOption.JOB_THREADS_KEY, 
Integer.toString(numJobThreads));
+                options.put(RepairOption.TRACE_KEY, Boolean.toString(trace));
                 options.put(RepairOption.COLUMNFAMILIES_KEY, 
StringUtils.join(cfnames, ","));
                 if (!startToken.isEmpty() || !endToken.isEmpty())
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java 
b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
index 37a013b..5cc3c21 100644
--- a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
+++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
@@ -27,9 +27,9 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class ExpiredTraceState extends TraceState
 {
-    public ExpiredTraceState(UUID sessionId)
+    public ExpiredTraceState(UUID sessionId, Tracing.TraceType traceType)
     {
-        super(FBUtilities.getBroadcastAddress(), sessionId);
+        super(FBUtilities.getBroadcastAddress(), sessionId, traceType);
     }
 
     public int elapsed()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java 
b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
index 4d234bd..72a7c47 100644
--- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
+++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
@@ -39,8 +39,8 @@ public final class TraceKeyspace
 {
     public static final String NAME = "system_traces";
 
-    private static final String SESSIONS_TABLE = "sessions";
-    private static final String EVENTS_TABLE = "events";
+    public static final String SESSIONS_TABLE = "sessions";
+    public static final String EVENTS_TABLE = "events";
 
     private static final int DAY = (int) TimeUnit.DAYS.toSeconds(1);
 
@@ -48,6 +48,7 @@ public final class TraceKeyspace
         compile(SESSIONS_TABLE, "tracing sessions",
                 "CREATE TABLE %s ("
                 + "session_id uuid,"
+                + "command text,"
                 + "coordinator inet,"
                 + "duration int,"
                 + "parameters map<text, text>,"
@@ -79,38 +80,42 @@ public final class TraceKeyspace
         return new KSMetaData(NAME, SimpleStrategy.class, 
ImmutableMap.of("replication_factor", "2"), true, tables);
     }
 
-    static Mutation toStopSessionMutation(ByteBuffer sessionId, int elapsed)
+    static Mutation toStopSessionMutation(ByteBuffer sessionId, int elapsed, 
int ttl)
     {
         Mutation mutation = new Mutation(NAME, sessionId);
         ColumnFamily cells = mutation.addOrGet(SessionsTable);
 
-        CFRowAdder adder = new CFRowAdder(cells, 
cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros());
+        ttl = ttl == DAY ? 0 : ttl;
+        CFRowAdder adder = new CFRowAdder(cells, 
cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros(), 
ttl);
         adder.add("duration", elapsed);
 
         return mutation;
     }
 
-    static Mutation toStartSessionMutation(ByteBuffer sessionId, Map<String, 
String> parameters, String request, long startedAt)
+    static Mutation toStartSessionMutation(ByteBuffer sessionId, Map<String, 
String> parameters, String request, long startedAt, String command, int ttl)
     {
         Mutation mutation = new Mutation(NAME, sessionId);
         ColumnFamily cells = mutation.addOrGet(TraceKeyspace.SessionsTable);
 
-        CFRowAdder adder = new CFRowAdder(cells, 
cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros());
+        ttl = ttl == DAY ? 0 : ttl;
+        CFRowAdder adder = new CFRowAdder(cells, 
cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros(), 
ttl);
         adder.add("coordinator", FBUtilities.getBroadcastAddress());
         for (Map.Entry<String, String> entry : parameters.entrySet())
             adder.addMapEntry("parameters", entry.getKey(), entry.getValue());
         adder.add("request", request);
         adder.add("started_at", new Date(startedAt));
+        adder.add("command", command);
 
         return mutation;
     }
 
-    static Mutation toEventMutation(ByteBuffer sessionId, String message, int 
elapsed, String threadName)
+    static Mutation toEventMutation(ByteBuffer sessionId, String message, int 
elapsed, String threadName, int ttl)
     {
         Mutation mutation = new Mutation(NAME, sessionId);
         ColumnFamily cells = mutation.addOrGet(EventsTable);
 
-        CFRowAdder adder = new CFRowAdder(cells, 
cells.metadata().comparator.make(UUIDGen.getTimeUUID()), 
FBUtilities.timestampMicros());
+        ttl = ttl == DAY ? 0 : ttl;
+        CFRowAdder adder = new CFRowAdder(cells, 
cells.metadata().comparator.make(UUIDGen.getTimeUUID()), 
FBUtilities.timestampMicros(), ttl);
         adder.add("activity", message);
         adder.add("source", FBUtilities.getBroadcastAddress());
         if (elapsed >= 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java 
b/src/java/org/apache/cassandra/tracing/TraceState.java
index 04abce3..f7d2741 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -28,6 +28,7 @@ import org.slf4j.helpers.MessageFormatter;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.WrappedRunnable;
 
@@ -41,6 +42,20 @@ public class TraceState
     public final InetAddress coordinator;
     public final Stopwatch watch;
     public final ByteBuffer sessionIdBytes;
+    public final Tracing.TraceType traceType;
+    public final int ttl;
+
+    private boolean notify;
+    private Object notificationHandle;
+
+    public enum Status
+    {
+        IDLE,
+        ACTIVE,
+        STOPPED;
+    }
+
+    private Status status;
 
     // Multiple requests can use the same TraceState at a time, so we need to 
reference count.
     // See CASSANDRA-7626 for more details.
@@ -48,13 +63,33 @@ public class TraceState
 
     public TraceState(InetAddress coordinator, UUID sessionId)
     {
+        this(coordinator, sessionId, Tracing.TraceType.QUERY);
+    }
+
+    public TraceState(InetAddress coordinator, UUID sessionId, 
Tracing.TraceType traceType)
+    {
         assert coordinator != null;
         assert sessionId != null;
 
         this.coordinator = coordinator;
         this.sessionId = sessionId;
         sessionIdBytes = ByteBufferUtil.bytes(sessionId);
+        this.traceType = traceType;
+        this.ttl = traceType.getTTL();
         watch = Stopwatch.createStarted();
+        this.status = Status.IDLE;
+    }
+
+    public void enableActivityNotification()
+    {
+        assert traceType == Tracing.TraceType.REPAIR;
+        notify = true;
+    }
+
+    public void setNotificationHandle(Object handle)
+    {
+        assert traceType == Tracing.TraceType.REPAIR;
+        notificationHandle = handle;
     }
 
     public int elapsed()
@@ -63,6 +98,46 @@ public class TraceState
         return elapsed < Integer.MAX_VALUE ? (int) elapsed : Integer.MAX_VALUE;
     }
 
+    public synchronized void stop()
+    {
+        status = Status.STOPPED;
+        notifyAll();
+    }
+
+    /*
+     * Returns immediately if there has been trace activity since the last
+     * call, otherwise waits until there is trace activity, or until the
+     * timeout expires.
+     * @param timeout timeout in milliseconds
+     * @return activity status
+     */
+    public synchronized Status waitActivity(long timeout)
+    {
+        if (status == Status.IDLE)
+        {
+            try
+            {
+                wait(timeout);
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException();
+            }
+        }
+        if (status == Status.ACTIVE)
+        {
+            status = Status.IDLE;
+            return Status.ACTIVE;
+        }
+        return status;
+    }
+
+    private synchronized void notifyActivity()
+    {
+        status = Status.ACTIVE;
+        notifyAll();
+    }
+
     public void trace(String format, Object arg)
     {
         trace(MessageFormatter.format(format, arg).getMessage());
@@ -80,18 +155,24 @@ public class TraceState
 
     public void trace(String message)
     {
-        TraceState.trace(sessionIdBytes, message, elapsed());
+        if (notify)
+            notifyActivity();
+
+        TraceState.trace(sessionIdBytes, message, elapsed(), ttl, 
notificationHandle);
     }
 
-    public static void trace(final ByteBuffer sessionId, final String message, 
final int elapsed)
+    public static void trace(final ByteBuffer sessionId, final String message, 
final int elapsed, final int ttl, final Object notificationHandle)
     {
         final String threadName = Thread.currentThread().getName();
 
+        if (notificationHandle != null)
+            StorageService.instance.sendNotification("repair", message, 
notificationHandle);
+
         StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
         {
             public void runMayThrow()
             {
-                
Tracing.mutateWithCatch(TraceKeyspace.toEventMutation(sessionId, message, 
elapsed, threadName));
+                
Tracing.mutateWithCatch(TraceKeyspace.toEventMutation(sessionId, message, 
elapsed, threadName, ttl));
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java 
b/src/java/org/apache/cassandra/tracing/Tracing.java
index 773ccd4..5e76957 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.exceptions.OverloadedException;
@@ -43,6 +44,7 @@ import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
+
 /**
  * A trace session context. Able to track and store trace sessions. A session 
is usually a user initiated query, and may
  * have multiple local and remote events before it is completed. All events 
and sessions are stored at keyspace.
@@ -50,6 +52,38 @@ import org.apache.cassandra.utils.UUIDGen;
 public class Tracing
 {
     public static final String TRACE_HEADER = "TraceSession";
+    public static final String TRACE_TYPE = "TraceType";
+    public static final String TRACE_TTL = "TraceTTL";
+
+    public enum TraceType
+    {
+        NONE,
+        QUERY,
+        REPAIR;
+
+        private static final TraceType[] ALL_VALUES = values();
+
+        public static TraceType deserialize(byte b)
+        {
+            if (b < 0 || ALL_VALUES.length <= b)
+                return NONE;
+            return ALL_VALUES[b];
+        }
+
+        public static byte serialize(TraceType value)
+        {
+            return (byte) value.ordinal();
+        }
+
+        private static final int[] TTLS = { 
DatabaseDescriptor.getTracetypeQueryTTL(),
+                                            
DatabaseDescriptor.getTracetypeQueryTTL(),
+                                            
DatabaseDescriptor.getTracetypeRepairTTL() };
+
+        public int getTTL()
+        {
+            return TTLS[ordinal()];
+        }
+    }
 
     private static final Logger logger = 
LoggerFactory.getLogger(Tracing.class);
 
@@ -67,6 +101,18 @@ public class Tracing
         return state.get().sessionId;
     }
 
+    public TraceType getTraceType()
+    {
+        assert isTracing();
+        return state.get().traceType;
+    }
+
+    public int getTTL()
+    {
+        assert isTracing();
+        return state.get().ttl;
+    }
+
     /**
      * Indicates if the current thread's execution is being traced.
      */
@@ -77,14 +123,24 @@ public class Tracing
 
     public UUID newSession()
     {
-        return 
newSession(TimeUUIDType.instance.compose(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())));
+        return newSession(TraceType.QUERY);
+    }
+
+    public UUID newSession(TraceType traceType)
+    {
+        return 
newSession(TimeUUIDType.instance.compose(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())),
 traceType);
     }
 
     public UUID newSession(UUID sessionId)
     {
+        return newSession(sessionId, TraceType.QUERY);
+    }
+
+    public UUID newSession(UUID sessionId, TraceType traceType)
+    {
         assert state.get() == null;
 
-        TraceState ts = new TraceState(localAddress, sessionId);
+        TraceState ts = new TraceState(localAddress, sessionId, traceType);
         state.set(ts);
         sessions.put(sessionId, ts);
 
@@ -111,15 +167,17 @@ public class Tracing
         {
             final int elapsed = state.elapsed();
             final ByteBuffer sessionId = state.sessionIdBytes;
+            final int ttl = state.ttl;
 
             StageManager.getStage(Stage.TRACING).execute(new Runnable()
             {
                 public void run()
                 {
-                    
mutateWithCatch(TraceKeyspace.toStopSessionMutation(sessionId, elapsed));
+                    
mutateWithCatch(TraceKeyspace.toStopSessionMutation(sessionId, elapsed, ttl));
                 }
             });
 
+            state.stop();
             sessions.remove(state.sessionId);
             this.state.set(null);
         }
@@ -140,20 +198,25 @@ public class Tracing
         state.set(tls);
     }
 
-    public void begin(final String request, final Map<String, String> 
parameters)
+    public TraceState begin(final String request, final Map<String, String> 
parameters)
     {
         assert isTracing();
 
+        final TraceState state = this.state.get();
         final long startedAt = System.currentTimeMillis();
-        final ByteBuffer sessionId = state.get().sessionIdBytes;
+        final ByteBuffer sessionId = state.sessionIdBytes;
+        final String command = state.traceType.toString();
+        final int ttl = state.ttl;
 
         StageManager.getStage(Stage.TRACING).execute(new Runnable()
         {
             public void run()
             {
-                
mutateWithCatch(TraceKeyspace.toStartSessionMutation(sessionId, parameters, 
request, startedAt));
+                
mutateWithCatch(TraceKeyspace.toStartSessionMutation(sessionId, parameters, 
request, startedAt, command, ttl));
             }
         });
+
+        return state;
     }
 
     /**
@@ -163,7 +226,7 @@ public class Tracing
      */
     public TraceState initializeFromMessage(final MessageIn<?> message)
     {
-        final byte[] sessionBytes = 
message.parameters.get(Tracing.TRACE_HEADER);
+        final byte[] sessionBytes = message.parameters.get(TRACE_HEADER);
 
         if (sessionBytes == null)
             return null;
@@ -174,19 +237,36 @@ public class Tracing
         if (ts != null && ts.acquireReference())
             return ts;
 
+        byte[] tmpBytes;
+        TraceType traceType = TraceType.QUERY;
+        if ((tmpBytes = message.parameters.get(TRACE_TYPE)) != null)
+            traceType = TraceType.deserialize(tmpBytes[0]);
+
         if (message.verb == MessagingService.Verb.REQUEST_RESPONSE)
         {
             // received a message for a session we've already closed out.  see 
CASSANDRA-5668
-            return new ExpiredTraceState(sessionId);
+            return new ExpiredTraceState(sessionId, traceType);
         }
         else
         {
-            ts = new TraceState(message.from, sessionId);
+            ts = new TraceState(message.from, sessionId, traceType);
             sessions.put(sessionId, ts);
             return ts;
         }
     }
 
+
+    // repair just gets a varargs method since it's so heavyweight anyway
+    public static void traceRepair(String format, Object... args)
+    {
+        final TraceState state = instance.get();
+        if (state == null) // inline isTracing to avoid implicit two calls to 
state.get()
+            return;
+
+        state.trace(format, args);
+    }
+
+    // normal traces get zero-, one-, and two-argument overloads so common 
case doesn't need to create varargs array
     public static void trace(String message)
     {
         final TraceState state = instance.get();

Reply via email to