Merge branch 'cassandra-3.0' into cassandra-3.9
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8e775eaa Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8e775eaa Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8e775eaa Branch: refs/heads/cassandra-3.9 Commit: 8e775eaaf5ee6b355e050ea15f2e334eff680487 Parents: eebadcf 8868b74 Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Fri Jul 29 15:29:51 2016 +0800 Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com> Committed: Fri Jul 29 15:30:27 2016 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + bin/cqlsh.py | 2 +- .../cassandra/concurrent/StageManager.java | 12 +++++-- .../cassandra/tracing/ExpiredTraceState.java | 5 +++ .../apache/cassandra/tracing/TraceState.java | 5 +++ .../cassandra/tracing/TraceStateImpl.java | 38 ++++++++++++++++++++ .../apache/cassandra/tracing/TracingTest.java | 3 ++ 7 files changed, 63 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e775eaa/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index d6a9d16,bb67fc9..ca5b383 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -13,42 -10,6 +13,43 @@@ Merged from 3.0 to connect with too low of a protocol version (CASSANDRA-11464) * NullPointerExpception when reading/compacting table (CASSANDRA-11988) * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144) +Merged from 2.2: ++ * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465) + * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979) + * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214) +Merged from 2.1: + * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850) + + +3.8 + * Fix hdr logging for single operation workloads (CASSANDRA-12145) + * Fix SASI PREFIX search in CONTAINS mode with partial terms (CASSANDRA-12073) + * Increase size of flushExecutor thread pool (CASSANDRA-12071) + * Partial revert of CASSANDRA-11971, cannot recycle buffer in SP.sendMessagesToNonlocalDC (CASSANDRA-11950) + * Upgrade netty to 4.0.39 (CASSANDRA-12032, CASSANDRA-12034) + * Improve details in compaction log message (CASSANDRA-12080) + * Allow unset values in CQLSSTableWriter (CASSANDRA-11911) + * Chunk cache to request compressor-compatible buffers if pool space is exhausted (CASSANDRA-11993) + * Remove DatabaseDescriptor dependencies from SequentialWriter (CASSANDRA-11579) + * Move skip_stop_words filter before stemming (CASSANDRA-12078) + * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957) + * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002) + * When SEPWorker assigned work, set thread name to match pool (CASSANDRA-11966) + * Add cross-DC latency metrics (CASSANDRA-11596) + * Allow terms in selection clause (CASSANDRA-10783) + * Add bind variables to trace (CASSANDRA-11719) + * Switch counter shards' clock to timestamps (CASSANDRA-9811) + * Introduce HdrHistogram and response/service/wait separation to stress tool (CASSANDRA-11853) + * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes field (CASSANDRA-11718) + * Support older ant versions (CASSANDRA-11807) + * Estimate compressed on disk size when deciding if sstable size limit reached (CASSANDRA-11623) + * cassandra-stress profiles should support case sensitive schemas (CASSANDRA-11546) + * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578) + * Faster streaming (CASSANDRA-9766) + * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425) + * Add repaired percentage metric (CASSANDRA-11503) + * Add Change-Data-Capture (CASSANDRA-8844) +Merged from 3.0: * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107) * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393) * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e775eaa/bin/cqlsh.py ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e775eaa/src/java/org/apache/cassandra/concurrent/StageManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/concurrent/StageManager.java index ebc4f76,a201e78..64abf00 --- a/src/java/org/apache/cassandra/concurrent/StageManager.java +++ b/src/java/org/apache/cassandra/concurrent/StageManager.java @@@ -113,9 -112,19 +112,13 @@@ public class StageManage } } - public final static Runnable NO_OP_TASK = new Runnable() - { - public void run() - { - - } - }; ++ public final static Runnable NO_OP_TASK = () -> {}; + /** * A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the - * tracing stage. See CASSANDRA-1123 for background. + * tracing stage. See CASSANDRA-1123 for background. We allow submitting NO_OP tasks, to allow + * a final wait on pending trace events since typically the tracing executor is single-threaded, see + * CASSANDRA-11465. */ private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService { http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e775eaa/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tracing/ExpiredTraceState.java index fbe2c33,5cc3c21..bc8d5dd --- a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java +++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java @@@ -37,9 -36,4 +37,14 @@@ class ExpiredTraceState extends TraceSt { return -1; } + + protected void traceImpl(String message) + { + delegate.traceImpl(message); + } ++ ++ protected void waitForPendingEvents() ++ { ++ delegate.waitForPendingEvents(); ++ } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e775eaa/src/java/org/apache/cassandra/tracing/TraceState.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tracing/TraceState.java index 5365d09,03e510f..ec2bc9e --- a/src/java/org/apache/cassandra/tracing/TraceState.java +++ b/src/java/org/apache/cassandra/tracing/TraceState.java @@@ -177,8 -199,73 +179,10 @@@ public abstract class TraceState implem } } - static void executeMutation(final Mutation mutation) - { - StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() - { - protected void runMayThrow() throws Exception - { - mutateWithCatch(mutation); - } - }); - } - - /** - * Called from {@link org.apache.cassandra.net.OutboundTcpConnection} for non-local traces (traces - * that are not initiated by local node == coordinator). - */ - public static void mutateWithTracing(final ByteBuffer sessionId, final String message, final int elapsed, final int ttl) - { - final String threadName = Thread.currentThread().getName(); - - StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() - { - public void runMayThrow() - { - mutateWithCatch(TraceKeyspace.makeEventMutation(sessionId, message, elapsed, threadName, ttl)); - } - }); - } - - static void mutateWithCatch(Mutation mutation) - { - try - { - StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY); - } - catch (OverloadedException e) - { - Tracing.logger.warn("Too many nodes are overloaded to save trace events"); - } - } - - /** - * Post a no-op event to the TRACING stage, so that we can be sure that any previous mutations - * have at least been applied to one replica. This works because the tracking executor only - * has one thread in its pool, see {@link StageManager#tracingExecutor()}. - */ - protected void waitForPendingEvents() - { - if (WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS <= 0) - return; - - try - { - if (logger.isTraceEnabled()) - logger.trace("Waiting for up to {} seconds for trace events to complete", - +WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS); - - StageManager.getStage(Stage.TRACING).submit(StageManager.NO_OP_TASK) - .get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, TimeUnit.SECONDS); - } - catch (Throwable t) - { - JVMStabilityInspector.inspectThrowable(t); - logger.debug("Failed to wait for tracing events to complete: {}", t); - } - } + protected abstract void traceImpl(String message); ++ protected abstract void waitForPendingEvents(); + public boolean acquireReference() { while (true) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e775eaa/src/java/org/apache/cassandra/tracing/TraceStateImpl.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tracing/TraceStateImpl.java index 113ebb7,0000000..e2d3a68 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java +++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java @@@ -1,74 -1,0 +1,112 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tracing; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.UUID; ++import java.util.concurrent.ExecutionException; ++import java.util.concurrent.TimeUnit; ++ ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.exceptions.OverloadedException; +import org.apache.cassandra.service.StorageProxy; ++import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.WrappedRunnable; + +/** + * ThreadLocal state for a tracing session. The presence of an instance of this class as a ThreadLocal denotes that an + * operation is being traced. + */ +public class TraceStateImpl extends TraceState +{ ++ private static final Logger logger = LoggerFactory.getLogger(TraceStateImpl.class); ++ private static final int WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS = ++ Integer.valueOf(System.getProperty("cassandra.wait_for_tracing_events_timeout_secs", "1")); ++ + public TraceStateImpl(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType) + { + super(coordinator, sessionId, traceType); + } + + protected void traceImpl(String message) + { + final String threadName = Thread.currentThread().getName(); + final int elapsed = elapsed(); + + executeMutation(TraceKeyspace.makeEventMutation(sessionIdBytes, message, elapsed, threadName, ttl)); ++ if (logger.isTraceEnabled()) ++ logger.trace("Adding <{}> to trace events", message); ++ } ++ ++ /** ++ * Post a no-op event to the TRACING stage, so that we can be sure that any previous mutations ++ * have at least been applied to one replica. This works because the tracking executor only ++ * has one thread in its pool, see {@link StageManager#tracingExecutor()}. ++ */ ++ protected void waitForPendingEvents() ++ { ++ if (WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS <= 0) ++ return; ++ ++ try ++ { ++ if (logger.isTraceEnabled()) ++ logger.trace("Waiting for up to {} seconds for trace events to complete", ++ WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS); ++ ++ StageManager.getStage(Stage.TRACING).submit(StageManager.NO_OP_TASK) ++ .get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, TimeUnit.SECONDS); ++ } ++ catch (Throwable t) ++ { ++ JVMStabilityInspector.inspectThrowable(t); ++ logger.debug("Failed to wait for tracing events to complete: {}", t); ++ } + } + + static void executeMutation(final Mutation mutation) + { + StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() + { + protected void runMayThrow() + { + mutateWithCatch(mutation); + } + }); + } + + static void mutateWithCatch(Mutation mutation) + { + try + { + StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY); + } + catch (OverloadedException e) + { + Tracing.logger.warn("Too many nodes are overloaded to save trace events"); + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e775eaa/test/unit/org/apache/cassandra/tracing/TracingTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/tracing/TracingTest.java index 1b0e507,0000000..30521c0 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/tracing/TracingTest.java +++ b/test/unit/org/apache/cassandra/tracing/TracingTest.java @@@ -1,173 -1,0 +1,176 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tracing; + +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.junit.Test; + +import org.apache.cassandra.utils.progress.ProgressEvent; +import org.apache.cassandra.utils.progress.ProgressListener; + +public final class TracingTest +{ + + @Test + public void test() + { + List<String> traces = new ArrayList<>(); + Tracing tracing = new TracingImpl(traces); + tracing.newSession(Tracing.TraceType.NONE); + TraceState state = tracing.begin("test-request", Collections.<String,String>emptyMap()); + state.trace("test-1"); + state.trace("test-2"); + state.trace("test-3"); + tracing.stopSession(); + + assert null == tracing.get(); + assert 4 == traces.size(); + assert "test-request".equals(traces.get(0)); + assert "test-1".equals(traces.get(1)); + assert "test-2".equals(traces.get(2)); + assert "test-3".equals(traces.get(3)); + } + + @Test + public void test_get() + { + List<String> traces = new ArrayList<>(); + Tracing tracing = new TracingImpl(traces); + tracing.newSession(Tracing.TraceType.NONE); + tracing.begin("test-request", Collections.<String,String>emptyMap()); + tracing.get().trace("test-1"); + tracing.get().trace("test-2"); + tracing.get().trace("test-3"); + tracing.stopSession(); + + assert null == tracing.get(); + assert 4 == traces.size(); + assert "test-request".equals(traces.get(0)); + assert "test-1".equals(traces.get(1)); + assert "test-2".equals(traces.get(2)); + assert "test-3".equals(traces.get(3)); + } + + @Test + public void test_get_uuid() + { + List<String> traces = new ArrayList<>(); + Tracing tracing = new TracingImpl(traces); + UUID uuid = tracing.newSession(Tracing.TraceType.NONE); + tracing.begin("test-request", Collections.<String,String>emptyMap()); + tracing.get(uuid).trace("test-1"); + tracing.get(uuid).trace("test-2"); + tracing.get(uuid).trace("test-3"); + tracing.stopSession(); + + assert null == tracing.get(); + assert 4 == traces.size(); + assert "test-request".equals(traces.get(0)); + assert "test-1".equals(traces.get(1)); + assert "test-2".equals(traces.get(2)); + assert "test-3".equals(traces.get(3)); + } + + @Test + public void test_states() + { + List<String> traces = new ArrayList<>(); + Tracing tracing = new TracingImpl(traces); + tracing.newSession(Tracing.TraceType.REPAIR); + tracing.begin("test-request", Collections.<String,String>emptyMap()); + tracing.get().enableActivityNotification("test-tag"); + assert TraceState.Status.IDLE == tracing.get().waitActivity(1); + tracing.get().trace("test-1"); + assert TraceState.Status.ACTIVE == tracing.get().waitActivity(1); + tracing.get().stop(); + assert TraceState.Status.STOPPED == tracing.get().waitActivity(1); + tracing.stopSession(); + assert null == tracing.get(); + } + + @Test + public void test_progress_listener() + { + List<String> traces = new ArrayList<>(); + Tracing tracing = new TracingImpl(traces); + tracing.newSession(Tracing.TraceType.REPAIR); + tracing.begin("test-request", Collections.<String,String>emptyMap()); + tracing.get().enableActivityNotification("test-tag"); + + tracing.get().addProgressListener( + new ProgressListener() + { + public void progress(String tag, ProgressEvent pe) + { + assert "test-tag".equals(tag); + assert "test-trace".equals(pe.getMessage()); + } + }); + + tracing.get().trace("test-trace"); + tracing.stopSession(); + assert null == tracing.get(); + } + + private class TracingImpl extends Tracing + { + private final List<String> traces; + + public TracingImpl(List<String> traces) + { + this.traces = traces; + } + + public void stopSessionImpl() + {} + + public TraceState begin(String request, InetAddress ia, Map<String, String> map) + { + traces.add(request); + return get(); + } + + protected TraceState newTraceState(InetAddress ia, UUID uuid, Tracing.TraceType tt) + { + return new TraceState(ia, uuid, tt) + { + protected void traceImpl(String string) + { + traces.add(string); + } + ++ protected void waitForPendingEvents() ++ { ++ } + }; + } + + public void trace(ByteBuffer bb, String string, int i) + { + throw new UnsupportedOperationException("Not supported yet."); + } + } +}