Repository: cassandra
Updated Branches:
  refs/heads/trunk c94a9f236 -> bf25e668f


Add support for custom tracing implementations

Patch by Mick Semb Wever; reviewed by tjake for CASSANDRA-10392


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf25e668
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf25e668
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf25e668

Branch: refs/heads/trunk
Commit: bf25e668f416b1c279986d1b23fee2a0192d8022
Parents: c94a9f2
Author: Mick Semb Wever <m...@apache.org>
Authored: Wed Feb 24 10:26:44 2016 -0500
Committer: T Jake Luciani <j...@apache.org>
Committed: Wed Feb 24 13:28:18 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/net/MessageOut.java    |   7 +-
 .../cassandra/net/OutboundTcpConnection.java    |   2 +-
 .../apache/cassandra/service/QueryState.java    |  12 +-
 .../cassandra/tracing/ExpiredTraceState.java    |  24 ++-
 .../apache/cassandra/tracing/TraceState.java    |  61 +------
 .../cassandra/tracing/TraceStateImpl.java       |  74 ++++++++
 .../org/apache/cassandra/tracing/Tracing.java   | 105 ++++++-----
 .../apache/cassandra/tracing/TracingImpl.java   |  89 ++++++++++
 .../transport/messages/ExecuteMessage.java      |   2 +-
 .../apache/cassandra/tracing/TracingTest.java   | 173 +++++++++++++++++++
 11 files changed, 437 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf25e668/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 50a298e..bc21818 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.4
+ * Allow custom tracing implementations (CASSANDRA-10392)
  * Extract LoaderOptions to be able to be used from outside (CASSANDRA-10637)
  * fix OnDiskIndexTest to properly treat empty ranges (CASSANDRA-11205)
  * fix TrackerTest to handle new notifications (CASSANDRA-11178)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf25e668/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index a524e7a..bc5c41b 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -33,10 +33,6 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
-
-import static org.apache.cassandra.tracing.Tracing.TRACE_HEADER;
-import static org.apache.cassandra.tracing.Tracing.TRACE_TYPE;
 import static org.apache.cassandra.tracing.Tracing.isTracing;
 
 public class MessageOut<T>
@@ -61,8 +57,7 @@ public class MessageOut<T>
              payload,
              serializer,
              isTracing()
-                 ? ImmutableMap.of(TRACE_HEADER, UUIDGen.decompose(Tracing.instance.getSessionId()),
-                                   TRACE_TYPE, new byte[] { Tracing.TraceType.serialize(Tracing.instance.getTraceType()) })
+                 ? Tracing.instance.getTraceHeaders()
                  : Collections.<String, byte[]>emptyMap());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf25e668/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 7b6e26e..8b1ecf3 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -284,7 +284,7 @@ public class OutboundTcpConnection extends Thread
                 {
                     byte[] traceTypeBytes = qm.message.parameters.get(Tracing.TRACE_TYPE);
                     Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
-                    TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL());
+                    Tracing.instance.trace(ByteBuffer.wrap(sessionBytes), message, traceType.getTTL());
                 }
                 else
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf25e668/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java
index ddbc959..c70c692 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -18,6 +18,9 @@
 package org.apache.cassandra.service;
 
 import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -76,14 +79,19 @@ public class QueryState
 
     public void createTracingSession()
     {
+        createTracingSession(Collections.EMPTY_MAP);
+    }
+
+    public void createTracingSession(Map<String,ByteBuffer> customPayload)
+    {
         UUID session = this.preparedTracingSession;
         if (session == null)
         {
-            Tracing.instance.newSession();
+            Tracing.instance.newSession(customPayload);
         }
         else
         {
-            Tracing.instance.newSession(session);
+            Tracing.instance.newSession(session, customPayload);
             this.preparedTracingSession = null;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf25e668/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
index 5cc3c21..fbe2c33 100644
--- a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
+++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
@@ -1,5 +1,5 @@
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,33 +7,39 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 package org.apache.cassandra.tracing;
 
-import java.util.UUID;
-
 import org.apache.cassandra.utils.FBUtilities;
 
-public class ExpiredTraceState extends TraceState
+class ExpiredTraceState extends TraceState
 {
-    public ExpiredTraceState(UUID sessionId, Tracing.TraceType traceType)
+    private final TraceState delegate;
+
+    ExpiredTraceState(TraceState delegate)
     {
-        super(FBUtilities.getBroadcastAddress(), sessionId, traceType);
+        super(FBUtilities.getBroadcastAddress(), delegate.sessionId, delegate.traceType);
+        this.delegate = delegate;
     }
 
     public int elapsed()
     {
         return -1;
     }
+
+    protected void traceImpl(String message)
+    {
+        delegate.traceImpl(message);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf25e668/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index e882e67..5365d09 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.tracing;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -29,14 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import com.google.common.base.Stopwatch;
 import org.slf4j.helpers.MessageFormatter;
 
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.exceptions.OverloadedException;
-import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.utils.progress.ProgressEvent;
 import org.apache.cassandra.utils.progress.ProgressEventNotifier;
 import org.apache.cassandra.utils.progress.ProgressListener;
@@ -45,7 +37,7 @@ import org.apache.cassandra.utils.progress.ProgressListener;
  * ThreadLocal state for a tracing session. The presence of an instance of this class as a ThreadLocal denotes that an
  * operation is being traced.
  */
-public class TraceState implements ProgressEventNotifier
+public abstract class TraceState implements ProgressEventNotifier
 {
     public final UUID sessionId;
     public final InetAddress coordinator;
@@ -71,7 +63,7 @@ public class TraceState implements ProgressEventNotifier
     // See CASSANDRA-7626 for more details.
     private final AtomicInteger references = new AtomicInteger(1);
 
-    public TraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType)
+    protected TraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType)
     {
         assert coordinator != null;
         assert sessionId != null;
@@ -83,7 +75,7 @@ public class TraceState implements ProgressEventNotifier
         this.ttl = traceType.getTTL();
         watch = Stopwatch.createStarted();
         this.status = Status.IDLE;
-}
+    }
 
     /**
      * Activate notification with provided {@code tag} name.
@@ -151,7 +143,7 @@ public class TraceState implements ProgressEventNotifier
         return status;
     }
 
-    private synchronized void notifyActivity()
+    protected synchronized void notifyActivity()
     {
         status = Status.ACTIVE;
         notifyAll();
@@ -177,10 +169,7 @@ public class TraceState implements ProgressEventNotifier
         if (notify)
             notifyActivity();
 
-        final String threadName = Thread.currentThread().getName();
-        final int elapsed = elapsed();
-
-        executeMutation(TraceKeyspace.makeEventMutation(sessionIdBytes, message, elapsed, threadName, ttl));
+        traceImpl(message);
 
         for (ProgressListener listener : listeners)
         {
@@ -188,45 +177,7 @@ public class TraceState implements ProgressEventNotifier
         }
     }
 
-    static void executeMutation(final Mutation mutation)
-    {
-        StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
-        {
-            protected void runMayThrow() throws Exception
-            {
-            mutateWithCatch(mutation);
-            }
-        });
-    }
-
-    /**
-     * Called from {@link org.apache.cassandra.net.OutboundTcpConnection} for non-local traces (traces
-     * that are not initiated by local node == coordinator).
-     */
-    public static void mutateWithTracing(final ByteBuffer sessionId, final String message, final int elapsed, final int ttl)
-    {
-        final String threadName = Thread.currentThread().getName();
-
-        StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
-        {
-            public void runMayThrow()
-            {
-                mutateWithCatch(TraceKeyspace.makeEventMutation(sessionId, message, elapsed, threadName, ttl));
-            }
-        });
-    }
-
-    static void mutateWithCatch(Mutation mutation)
-    {
-        try
-        {
-            StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY);
-        }
-        catch (OverloadedException e)
-        {
-            Tracing.logger.warn("Too many nodes are overloaded to save trace events");
-        }
-    }
+    protected abstract void traceImpl(String message);
 
     public boolean acquireReference()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf25e668/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
new file mode 100644
index 0000000..1bc210a
--- /dev/null
+++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tracing;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+/**
+ * ThreadLocal state for a tracing session. The presence of an instance of this class as a ThreadLocal denotes that an
+ * operation is being traced.
+ */
+public class TraceStateImpl extends TraceState
+{
+    public TraceStateImpl(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType)
+    {
+        super(coordinator, sessionId, traceType);
+    }
+
+    protected void traceImpl(String message)
+    {
+        final String threadName = Thread.currentThread().getName();
+        final int elapsed = elapsed();
+
+        executeMutation(TraceKeyspace.makeEventMutation(sessionIdBytes, message, elapsed, threadName, ttl));
+    }
+
+    static void executeMutation(final Mutation mutation)
+    {
+        StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
+        {
+            protected void runMayThrow() throws Exception
+            {
+                mutateWithCatch(mutation);
+            }
+        });
+    }
+
+    static void mutateWithCatch(Mutation mutation)
+    {
+        try
+        {
+            StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY);
+        }
+        catch (OverloadedException e)
+        {
+            Tracing.logger.warn("Too many nodes are overloaded to save trace events");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf25e668/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index bf9cee7..e69645f 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -21,11 +21,13 @@ package org.apache.cassandra.tracing;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,14 +37,15 @@ import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.UUIDGen;
 
 
 /**
  * A trace session context. Able to track and store trace sessions. A session is usually a user initiated query, and may
- * have multiple local and remote events before it is completed. All events and sessions are stored at keyspace.
+ * have multiple local and remote events before it is completed.
  */
-public class Tracing implements ExecutorLocal<TraceState>
+public abstract class Tracing implements ExecutorLocal<TraceState>
 {
     public static final String TRACE_HEADER = "TraceSession";
     public static final String TRACE_TYPE = "TraceType";
@@ -77,15 +80,34 @@ public class Tracing implements ExecutorLocal<TraceState>
         }
     }
 
-    static final Logger logger = LoggerFactory.getLogger(Tracing.class);
+    protected static final Logger logger = LoggerFactory.getLogger(Tracing.class);
 
     private final InetAddress localAddress = FBUtilities.getLocalAddress();
 
     private final ThreadLocal<TraceState> state = new ThreadLocal<>();
 
-    private final ConcurrentMap<UUID, TraceState> sessions = new ConcurrentHashMap<>();
+    protected final ConcurrentMap<UUID, TraceState> sessions = new ConcurrentHashMap<>();
 
-    public static final Tracing instance = new Tracing();
+    public static final Tracing instance;
+
+    static {
+        Tracing tracing = null;
+        String customTracingClass = System.getProperty("cassandra.custom_tracing_class");
+        if (null != customTracingClass)
+        {
+            try
+            {
+                tracing = FBUtilities.construct(customTracingClass, "Tracing");
+                logger.info("Using {} as tracing queries (as requested with -Dcassandra.custom_tracing_class)", customTracingClass);
+            }
+            catch (Exception e)
+            {
+                JVMStabilityInspector.inspectThrowable(e);
+                logger.error("Cannot use class {} for tracing ({}), ignoring by defaulting on normal tracing", customTracingClass, e.getMessage());
+            }
+        }
+        instance = null != tracing ? tracing : new TracingImpl();
+    }
 
     public UUID getSessionId()
     {
@@ -110,30 +132,33 @@ public class Tracing implements ExecutorLocal<TraceState>
      */
     public static boolean isTracing()
     {
-        return instance.state.get() != null;
+        return instance.get() != null;
     }
 
-    public UUID newSession()
+    public UUID newSession(Map<String,ByteBuffer> customPayload)
     {
         return newSession(TraceType.QUERY);
     }
 
     public UUID newSession(TraceType traceType)
     {
-        return newSession(TimeUUIDType.instance.compose(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())), traceType);
+        return newSession(
+                TimeUUIDType.instance.compose(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())),
+                traceType,
+                Collections.EMPTY_MAP);
     }
 
-    public UUID newSession(UUID sessionId)
+    public UUID newSession(UUID sessionId, Map<String,ByteBuffer> customPayload)
     {
-        return newSession(sessionId, TraceType.QUERY);
+        return newSession(sessionId, TraceType.QUERY, Collections.EMPTY_MAP);
     }
 
-    private UUID newSession(UUID sessionId, TraceType traceType)
+    protected UUID newSession(UUID sessionId, TraceType traceType, Map<String,ByteBuffer> customPayload)
     {
-        assert state.get() == null;
+        assert get() == null;
 
-        TraceState ts = new TraceState(localAddress, sessionId, traceType);
-        state.set(ts);
+        TraceState ts = newTraceState(localAddress, sessionId, traceType);
+        set(ts);
         sessions.put(sessionId, ts);
 
         return sessionId;
@@ -145,30 +170,29 @@ public class Tracing implements ExecutorLocal<TraceState>
             sessions.remove(state.sessionId);
     }
 
+
     /**
      * Stop the session and record its complete.  Called by coodinator when request is complete.
      */
     public void stopSession()
     {
-        TraceState state = this.state.get();
+        TraceState state = get();
         if (state == null) // inline isTracing to avoid implicit two calls to state.get()
         {
             logger.trace("request complete");
         }
         else
         {
-            final int elapsed = state.elapsed();
-            final ByteBuffer sessionId = state.sessionIdBytes;
-            final int ttl = state.ttl;
-
-            TraceState.executeMutation(TraceKeyspace.makeStopSessionMutation(sessionId, elapsed, ttl));
+            stopSessionImpl();
 
             state.stop();
             sessions.remove(state.sessionId);
-            this.state.set(null);
+            set(null);
         }
     }
 
+    protected abstract void stopSessionImpl();
+
     public TraceState get()
     {
         return state.get();
@@ -189,24 +213,11 @@ public class Tracing implements ExecutorLocal<TraceState>
         return begin(request, null, parameters);
     }
 
-    public TraceState begin(final String request, final InetAddress client, final Map<String, String> parameters)
-    {
-        assert isTracing();
-
-        final TraceState state = this.state.get();
-        final long startedAt = System.currentTimeMillis();
-        final ByteBuffer sessionId = state.sessionIdBytes;
-        final String command = state.traceType.toString();
-        final int ttl = state.ttl;
-
-        TraceState.executeMutation(TraceKeyspace.makeStartSessionMutation(sessionId, client, parameters, request, startedAt, command, ttl));
-
-        return state;
-    }
+    public abstract TraceState begin(String request, InetAddress client, Map<String, String> parameters);
 
     /**
      * Determines the tracing context from a message.  Does NOT set the threadlocal state.
-     * 
+     *
      * @param message The internode message
      */
     public TraceState initializeFromMessage(final MessageIn<?> message)
@@ -218,7 +229,7 @@ public class Tracing implements ExecutorLocal<TraceState>
 
         assert sessionBytes.length == 16;
         UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
-        TraceState ts = sessions.get(sessionId);
+        TraceState ts = get(sessionId);
         if (ts != null && ts.acquireReference())
             return ts;
 
@@ -230,16 +241,26 @@ public class Tracing implements ExecutorLocal<TraceState>
         if (message.verb == MessagingService.Verb.REQUEST_RESPONSE)
         {
             // received a message for a session we've already closed out.  see CASSANDRA-5668
-            return new ExpiredTraceState(sessionId, traceType);
+            return new ExpiredTraceState(newTraceState(message.from, sessionId, traceType));
         }
         else
         {
-            ts = new TraceState(message.from, sessionId, traceType);
+            ts = newTraceState(message.from, sessionId, traceType);
             sessions.put(sessionId, ts);
             return ts;
         }
     }
 
+    public Map<String, byte[]> getTraceHeaders()
+    {
+        assert isTracing();
+
+        return ImmutableMap.of(
+                TRACE_HEADER, UUIDGen.decompose(Tracing.instance.getSessionId()),
+                TRACE_TYPE, new byte[] { Tracing.TraceType.serialize(Tracing.instance.getTraceType()) });
+    }
+
+    protected abstract TraceState newTraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType);
 
     // repair just gets a varargs method since it's so heavyweight anyway
     public static void traceRepair(String format, Object... args)
@@ -287,4 +308,10 @@ public class Tracing implements ExecutorLocal<TraceState>
 
         state.trace(format, args);
     }
+
+    /**
+     * Called from {@link org.apache.cassandra.net.OutboundTcpConnection} for non-local traces (traces
+     * that are not initiated by local node == coordinator).
+     */
+    public abstract void trace(ByteBuffer sessionId, String message, int ttl);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf25e668/src/java/org/apache/cassandra/tracing/TracingImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TracingImpl.java b/src/java/org/apache/cassandra/tracing/TracingImpl.java
new file mode 100644
index 0000000..52ac183
--- /dev/null
+++ b/src/java/org/apache/cassandra/tracing/TracingImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.tracing;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * A trace session context. Able to track and store trace sessions. A session is usually a user initiated query, and may
+ * have multiple local and remote events before it is completed. All events and sessions are stored at keyspace.
+ */
+class TracingImpl extends Tracing
+{
+    private static final Logger logger = LoggerFactory.getLogger(TracingImpl.class);
+
+    public void stopSessionImpl() {
+        TraceState state = get();
+        int elapsed = state.elapsed();
+        ByteBuffer sessionId = state.sessionIdBytes;
+        int ttl = state.ttl;
+        TraceStateImpl.executeMutation(TraceKeyspace.makeStopSessionMutation(sessionId, elapsed, ttl));
+    }
+
+    public TraceState begin(final String request, final InetAddress client, final Map<String, String> parameters)
+    {
+        assert isTracing();
+
+        final TraceState state = get();
+        final long startedAt = System.currentTimeMillis();
+        final ByteBuffer sessionId = state.sessionIdBytes;
+        final String command = state.traceType.toString();
+        final int ttl = state.ttl;
+
+        TraceStateImpl.executeMutation(TraceKeyspace.makeStartSessionMutation(sessionId, client, parameters, request, startedAt, command, ttl));
+
+        return state;
+    }
+
+    @Override
+    protected TraceState newTraceState(InetAddress coordinator, UUID sessionId, TraceType traceType)
+    {
+        return new TraceStateImpl(coordinator, sessionId, traceType);
+    }
+
+    /**
+     * Called from {@link org.apache.cassandra.net.OutboundTcpConnection} for non-local traces (traces
+     * that are not initiated by local node == coordinator).
+     */
+    public void trace(final ByteBuffer sessionId, final String message, final int ttl)
+    {
+        final String threadName = Thread.currentThread().getName();
+
+        StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
+        {
+            public void runMayThrow()
+            {
+                TraceStateImpl.mutateWithCatch(TraceKeyspace.makeEventMutation(sessionId, message, -1, threadName, ttl));
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf25e668/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index e9923b4..df6acc0 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -110,7 +110,7 @@ public class ExecuteMessage extends Message.Request
 
             if (state.traceNextQuery())
             {
-                state.createTracingSession();
+                state.createTracingSession(getCustomPayload());
 
                 ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
                 if (options.getPageSize() > 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf25e668/test/unit/org/apache/cassandra/tracing/TracingTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tracing/TracingTest.java b/test/unit/org/apache/cassandra/tracing/TracingTest.java
new file mode 100644
index 0000000..1b0e507
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tracing/TracingTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tracing;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.cassandra.utils.progress.ProgressEvent;
+import org.apache.cassandra.utils.progress.ProgressListener;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Unit tests for the pluggable {@link Tracing} API.
+ *
+ * Each test drives a minimal in-memory {@link Tracing} subclass that appends every request
+ * and traced message to a plain list, and then checks what ended up in that list.
+ *
+ * Checks use JUnit assertions rather than the {@code assert} keyword so that they are
+ * enforced regardless of whether the JVM was started with {@code -ea}.
+ */
+public final class TracingTest
+{
+
+    /** Traces through the {@link TraceState} returned by {@code begin}. */
+    @Test
+    public void test()
+    {
+        List<String> traces = new ArrayList<>();
+        Tracing tracing = new TracingImpl(traces);
+        tracing.newSession(Tracing.TraceType.NONE);
+        TraceState state = tracing.begin("test-request", Collections.<String,String>emptyMap());
+        state.trace("test-1");
+        state.trace("test-2");
+        state.trace("test-3");
+        tracing.stopSession();
+
+        assertSessionClosedWithAllTraces(tracing, traces);
+    }
+
+    /** Traces through the state looked up with {@code Tracing.get()}. */
+    @Test
+    public void test_get()
+    {
+        List<String> traces = new ArrayList<>();
+        Tracing tracing = new TracingImpl(traces);
+        tracing.newSession(Tracing.TraceType.NONE);
+        tracing.begin("test-request", Collections.<String,String>emptyMap());
+        tracing.get().trace("test-1");
+        tracing.get().trace("test-2");
+        tracing.get().trace("test-3");
+        tracing.stopSession();
+
+        assertSessionClosedWithAllTraces(tracing, traces);
+    }
+
+    /** Traces through the state looked up by the session id returned from {@code newSession}. */
+    @Test
+    public void test_get_uuid()
+    {
+        List<String> traces = new ArrayList<>();
+        Tracing tracing = new TracingImpl(traces);
+        UUID uuid = tracing.newSession(Tracing.TraceType.NONE);
+        tracing.begin("test-request", Collections.<String,String>emptyMap());
+        tracing.get(uuid).trace("test-1");
+        tracing.get(uuid).trace("test-2");
+        tracing.get(uuid).trace("test-3");
+        tracing.stopSession();
+
+        assertSessionClosedWithAllTraces(tracing, traces);
+    }
+
+    /** Checks the status reported by {@code waitActivity} before a trace, after a trace and after stop. */
+    @Test
+    public void test_states()
+    {
+        List<String> traces = new ArrayList<>();
+        Tracing tracing = new TracingImpl(traces);
+        tracing.newSession(Tracing.TraceType.REPAIR);
+        tracing.begin("test-request", Collections.<String,String>emptyMap());
+        tracing.get().enableActivityNotification("test-tag");
+        assertEquals(TraceState.Status.IDLE, tracing.get().waitActivity(1));
+        tracing.get().trace("test-1");
+        assertEquals(TraceState.Status.ACTIVE, tracing.get().waitActivity(1));
+        tracing.get().stop();
+        assertEquals(TraceState.Status.STOPPED, tracing.get().waitActivity(1));
+        tracing.stopSession();
+        assertNull(tracing.get());
+    }
+
+    /** Checks that a registered {@link ProgressListener} is told about a traced message. */
+    @Test
+    public void test_progress_listener()
+    {
+        List<String> traces = new ArrayList<>();
+        Tracing tracing = new TracingImpl(traces);
+        tracing.newSession(Tracing.TraceType.REPAIR);
+        tracing.begin("test-request", Collections.<String,String>emptyMap());
+        tracing.get().enableActivityNotification("test-tag");
+
+        // Messages delivered to the listener. Recording them lets the test fail when the
+        // listener is never invoked, instead of passing without having checked anything.
+        final List<String> notified = new ArrayList<>();
+
+        tracing.get().addProgressListener(
+                new ProgressListener()
+                {
+                    @Override
+                    public void progress(String tag, ProgressEvent pe)
+                    {
+                        assertEquals("test-tag", tag);
+                        notified.add(pe.getMessage());
+                    }
+                });
+
+        tracing.get().trace("test-trace");
+
+        // NOTE(review): assumes TraceState.trace notifies listeners on the calling thread
+        // before it returns (the in-listener assertion already depends on that) - confirm.
+        assertEquals(Collections.singletonList("test-trace"), notified);
+
+        tracing.stopSession();
+        assertNull(tracing.get());
+    }
+
+    /**
+     * Shared postcondition of the basic tests: the session is gone and the sink holds the
+     * request followed by the three traced messages, in order.
+     */
+    private static void assertSessionClosedWithAllTraces(Tracing tracing, List<String> traces)
+    {
+        assertNull(tracing.get());
+        assertEquals(Arrays.asList("test-request", "test-1", "test-2", "test-3"), traces);
+    }
+
+    /**
+     * Minimal {@link Tracing} implementation that appends every request and traced message
+     * to the list supplied by the test.
+     *
+     * It shares its simple name with the production {@code TracingImpl} in this package;
+     * inside this test the nested class is the one that is resolved. It is static because
+     * it needs nothing from the enclosing test instance.
+     */
+    private static final class TracingImpl extends Tracing
+    {
+        /** Sink shared with the test; receives requests and messages in call order. */
+        private final List<String> traces;
+
+        public TracingImpl(List<String> traces)
+        {
+            this.traces = traces;
+        }
+
+        /** Nothing to release: this implementation holds no per-session resources. */
+        @Override
+        public void stopSessionImpl()
+        {}
+
+        /** Records the request text and returns the current session's state. */
+        @Override
+        public TraceState begin(String request, InetAddress client, Map<String, String> parameters)
+        {
+            traces.add(request);
+            return get();
+        }
+
+        /** Creates a state whose traced messages go straight into {@link #traces}. */
+        @Override
+        protected TraceState newTraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType)
+        {
+            return new TraceState(coordinator, sessionId, traceType)
+            {
+                @Override
+                protected void traceImpl(String message)
+                {
+                    traces.add(message);
+                }
+
+            };
+        }
+
+        /** Not exercised by these tests. */
+        @Override
+        public void trace(ByteBuffer sessionId, String message, int ttl)
+        {
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+    }
+}

Reply via email to