Wait for tracing events before returning response and query at same consistency level client side
patch by Stefania Alborghetti; reviewed by Paulo Motta for CASSANDRA-11465 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7bd65a12 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7bd65a12 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7bd65a12 Branch: refs/heads/cassandra-3.0 Commit: 7bd65a129c63091d6885f92afe77a41c4fc46a6f Parents: 04ef62c Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Thu Jul 14 14:15:39 2016 +0800 Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com> Committed: Fri Jul 29 15:28:03 2016 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + bin/cqlsh.py | 2 +- .../cassandra/concurrent/StageManager.java | 18 ++++++++- .../apache/cassandra/tracing/TraceState.java | 41 +++++++++++++++++++- 4 files changed, 58 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7bd65a12/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ebebbdc..9a16ee3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.8 + * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465) * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979) * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214) * Synchronize ThriftServer::stop() (CASSANDRA-12105) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7bd65a12/bin/cqlsh.py ---------------------------------------------------------------------- diff --git a/bin/cqlsh.py b/bin/cqlsh.py index a3eacdd..b631450 100644 --- a/bin/cqlsh.py +++ b/bin/cqlsh.py @@ -1211,7 +1211,7 @@ class Shell(cmd.Cmd): if self.tracing_enabled: try: - for trace in future.get_all_query_traces(self.max_trace_wait): 
+ for trace in future.get_all_query_traces(max_wait_per=self.max_trace_wait, query_cl=self.consistency_level): print_trace(self, trace) except TraceUnavailable: msg = "Statement trace did not complete within %d seconds; trace data may be incomplete." % (self.session.max_trace_wait,) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7bd65a12/src/java/org/apache/cassandra/concurrent/StageManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java index 114795e..343648c 100644 --- a/src/java/org/apache/cassandra/concurrent/StageManager.java +++ b/src/java/org/apache/cassandra/concurrent/StageManager.java @@ -24,7 +24,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.config.DatabaseDescriptor.*; @@ -112,9 +111,19 @@ public class StageManager } } + public final static Runnable NO_OP_TASK = new Runnable() + { + public void run() + { + + } + }; + /** * A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the - * tracing stage. See CASSANDRA-1123 for background. + * tracing stage. See CASSANDRA-1123 for background. We allow submitting NO_OP tasks, to allow + * a final wait on pending trace events since typically the tracing executor is single-threaded, see + * CASSANDRA-11465. 
*/ private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService { @@ -137,6 +146,11 @@ public class StageManager @Override public Future<?> submit(Runnable task) { + if (task.equals(NO_OP_TASK)) + { + assert getMaximumPoolSize() == 1 : "Cannot wait for pending tasks if running more than 1 thread"; + return super.submit(task); + } throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7bd65a12/src/java/org/apache/cassandra/tracing/TraceState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java index e882e67..03e510f 100644 --- a/src/java/org/apache/cassandra/tracing/TraceState.java +++ b/src/java/org/apache/cassandra/tracing/TraceState.java @@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Stopwatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.slf4j.helpers.MessageFormatter; import org.apache.cassandra.concurrent.Stage; @@ -36,6 +38,7 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.exceptions.OverloadedException; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.WrappedRunnable; import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.cassandra.utils.progress.ProgressEventNotifier; @@ -47,6 +50,10 @@ import org.apache.cassandra.utils.progress.ProgressListener; */ public class TraceState implements ProgressEventNotifier { + private static final Logger logger = LoggerFactory.getLogger(TraceState.class); + private static final int WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS = + 
Integer.valueOf(System.getProperty("cassandra.wait_for_tracing_events_timeout_secs", "1")); + public final UUID sessionId; public final InetAddress coordinator; public final Stopwatch watch; @@ -119,6 +126,8 @@ public class TraceState implements ProgressEventNotifier public synchronized void stop() { + waitForPendingEvents(); + status = Status.STOPPED; notifyAll(); } @@ -181,6 +190,8 @@ public class TraceState implements ProgressEventNotifier final int elapsed = elapsed(); executeMutation(TraceKeyspace.makeEventMutation(sessionIdBytes, message, elapsed, threadName, ttl)); + if (logger.isTraceEnabled()) + logger.trace("Adding <{}> to trace events", message); for (ProgressListener listener : listeners) { @@ -194,7 +205,7 @@ public class TraceState implements ProgressEventNotifier { protected void runMayThrow() throws Exception { - mutateWithCatch(mutation); + mutateWithCatch(mutation); } }); } @@ -228,6 +239,33 @@ public class TraceState implements ProgressEventNotifier } } + /** + * Post a no-op event to the TRACING stage, so that we can be sure that any previous mutations + * have at least been applied to one replica. This works because the tracing executor only + * has one thread in its pool, see {@link StageManager#tracingExecutor()}.
+ */ + protected void waitForPendingEvents() + { + if (WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS <= 0) + return; + + try + { + if (logger.isTraceEnabled()) + logger.trace("Waiting for up to {} seconds for trace events to complete", + +WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS); + + StageManager.getStage(Stage.TRACING).submit(StageManager.NO_OP_TASK) + .get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, TimeUnit.SECONDS); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + logger.debug("Failed to wait for tracing events to complete: {}", t); + } + } + + public boolean acquireReference() { while (true) @@ -242,6 +280,7 @@ public class TraceState implements ProgressEventNotifier public int releaseReference() { + waitForPendingEvents(); return references.decrementAndGet(); } }