Wait for tracing events before returning response and query at same consistency level client side

patch by Stefania Alborghetti; reviewed by Paulo Motta for CASSANDRA-11465


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7bd65a12
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7bd65a12
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7bd65a12

Branch: refs/heads/cassandra-3.0
Commit: 7bd65a129c63091d6885f92afe77a41c4fc46a6f
Parents: 04ef62c
Author: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Authored: Thu Jul 14 14:15:39 2016 +0800
Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Committed: Fri Jul 29 15:28:03 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 bin/cqlsh.py                                    |  2 +-
 .../cassandra/concurrent/StageManager.java      | 18 ++++++++-
 .../apache/cassandra/tracing/TraceState.java    | 41 +++++++++++++++++++-
 4 files changed, 58 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7bd65a12/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ebebbdc..9a16ee3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
  * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
  * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
  * Synchronize ThriftServer::stop() (CASSANDRA-12105)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7bd65a12/bin/cqlsh.py
----------------------------------------------------------------------
diff --git a/bin/cqlsh.py b/bin/cqlsh.py
index a3eacdd..b631450 100644
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@ -1211,7 +1211,7 @@ class Shell(cmd.Cmd):
 
             if self.tracing_enabled:
                 try:
-                    for trace in future.get_all_query_traces(self.max_trace_wait):
+                    for trace in future.get_all_query_traces(max_wait_per=self.max_trace_wait, query_cl=self.consistency_level):
                         print_trace(self, trace)
                 except TraceUnavailable:
                     msg = "Statement trace did not complete within %d seconds; trace data may be incomplete." % (self.session.max_trace_wait,)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7bd65a12/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 114795e..343648c 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -24,7 +24,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.config.DatabaseDescriptor.*;
@@ -112,9 +111,19 @@ public class StageManager
         }
     }
 
+    public final static Runnable NO_OP_TASK = new Runnable()
+    {
+        public void run()
+        {
+
+        }
+    };
+
     /**
      * A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the
-     * tracing stage.  See CASSANDRA-1123 for background.
+     * tracing stage.  See CASSANDRA-1123 for background. We allow submitting NO_OP tasks, to allow
+     * a final wait on pending trace events since typically the tracing executor is single-threaded, see
+     * CASSANDRA-11465.
      */
     private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService
     {
@@ -137,6 +146,11 @@ public class StageManager
         @Override
         public Future<?> submit(Runnable task)
         {
+            if (task.equals(NO_OP_TASK))
+            {
+                assert getMaximumPoolSize() == 1 : "Cannot wait for pending tasks if running more than 1 thread";
+                return super.submit(task);
+            }
             throw new UnsupportedOperationException();
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7bd65a12/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index e882e67..03e510f 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Stopwatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.slf4j.helpers.MessageFormatter;
 
 import org.apache.cassandra.concurrent.Stage;
@@ -36,6 +38,7 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.exceptions.OverloadedException;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.utils.progress.ProgressEvent;
 import org.apache.cassandra.utils.progress.ProgressEventNotifier;
@@ -47,6 +50,10 @@ import org.apache.cassandra.utils.progress.ProgressListener;
  */
 public class TraceState implements ProgressEventNotifier
 {
+    private static final Logger logger = LoggerFactory.getLogger(TraceState.class);
+    private static final int WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS =
+    Integer.valueOf(System.getProperty("cassandra.wait_for_tracing_events_timeout_secs", "1"));
+
     public final UUID sessionId;
     public final InetAddress coordinator;
     public final Stopwatch watch;
@@ -119,6 +126,8 @@ public class TraceState implements ProgressEventNotifier
 
     public synchronized void stop()
     {
+        waitForPendingEvents();
+
         status = Status.STOPPED;
         notifyAll();
     }
@@ -181,6 +190,8 @@ public class TraceState implements ProgressEventNotifier
         final int elapsed = elapsed();
 
         executeMutation(TraceKeyspace.makeEventMutation(sessionIdBytes, message, elapsed, threadName, ttl));
+        if (logger.isTraceEnabled())
+            logger.trace("Adding <{}> to trace events", message);
 
         for (ProgressListener listener : listeners)
         {
@@ -194,7 +205,7 @@ public class TraceState implements ProgressEventNotifier
         {
             protected void runMayThrow() throws Exception
             {
-            mutateWithCatch(mutation);
+                mutateWithCatch(mutation);
             }
         });
     }
@@ -228,6 +239,33 @@ public class TraceState implements ProgressEventNotifier
         }
     }
 
+    /**
+     * Post a no-op event to the TRACING stage, so that we can be sure that any previous mutations
+     * have at least been applied to one replica. This works because the tracking executor only
+     * has one thread in its pool, see {@link StageManager#tracingExecutor()}.
+     */
+    protected void waitForPendingEvents()
+    {
+        if (WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS <= 0)
+            return;
+
+        try
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Waiting for up to {} seconds for trace events to complete",
+                             +WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS);
+
+            StageManager.getStage(Stage.TRACING).submit(StageManager.NO_OP_TASK)
+                        .get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, TimeUnit.SECONDS);
+        }
+        catch (Throwable t)
+        {
+            JVMStabilityInspector.inspectThrowable(t);
+            logger.debug("Failed to wait for tracing events to complete: {}", t);
+        }
+    }
+
+
     public boolean acquireReference()
     {
         while (true)
@@ -242,6 +280,7 @@ public class TraceState implements ProgressEventNotifier
 
     public int releaseReference()
     {
+        waitForPendingEvents();
         return references.decrementAndGet();
     }
 }

Reply via email to