Make sure client gets tombstone overwhelmed warning patch by Carl Yeksigian; reviewed by Josh McKenzie for CASSANDRA-9465
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dbf6e62c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dbf6e62c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dbf6e62c Branch: refs/heads/trunk Commit: dbf6e62c382d62f9c1727ecf5afb90d131a81775 Parents: 582bdba Author: Carl Yeksigian <c...@apache.org> Authored: Wed Jan 13 13:22:36 2016 -0500 Committer: Carl Yeksigian <c...@apache.org> Committed: Wed Jan 13 14:35:59 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../AbstractLocalAwareExecutorService.java | 229 +++++++++++++++++++ .../AbstractTracingAwareExecutorService.java | 229 ------------------- .../DebuggableThreadPoolExecutor.java | 48 ++-- .../cassandra/concurrent/ExecutorLocal.java | 44 ++++ .../cassandra/concurrent/ExecutorLocals.java | 84 +++++++ .../concurrent/LocalAwareExecutorService.java | 34 +++ .../cassandra/concurrent/SEPExecutor.java | 3 +- .../concurrent/SharedExecutorPool.java | 2 +- .../cassandra/concurrent/StageManager.java | 12 +- .../concurrent/TracingAwareExecutorService.java | 36 --- .../cql3/statements/BatchStatement.java | 11 +- .../cql3/statements/SelectStatement.java | 4 +- .../cassandra/db/filter/SliceQueryFilter.java | 2 +- .../apache/cassandra/net/MessagingService.java | 8 +- .../apache/cassandra/service/ClientWarn.java | 62 +++-- .../apache/cassandra/service/StorageProxy.java | 2 +- .../org/apache/cassandra/tracing/Tracing.java | 3 +- .../org/apache/cassandra/transport/Message.java | 6 +- .../transport/RequestThreadPoolExecutor.java | 4 +- .../cassandra/service/ClientWarningsTest.java | 43 ++++ 21 files changed, 529 insertions(+), 338 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 11f2529..6530956 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.5 + * Make sure client gets tombstone overwhelmed warning (CASSANDRA-9465) * Fix error streaming section more than 2GB (CASSANDRA-10961) * (cqlsh) Also apply --connect-timeout to control connection timeout (CASSANDRA-10959) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java new file mode 100644 index 0000000..088b43e --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.concurrent; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.concurrent.SimpleCondition; +import org.apache.cassandra.utils.JVMStabilityInspector; + +import static org.apache.cassandra.tracing.Tracing.isTracing; + +public abstract class AbstractLocalAwareExecutorService implements LocalAwareExecutorService +{ + private static final Logger logger = LoggerFactory.getLogger(AbstractLocalAwareExecutorService.class); + + protected abstract void addTask(FutureTask<?> futureTask); + protected abstract void onCompletion(); + + /** Task Submission / Creation / Objects **/ + + public <T> FutureTask<T> submit(Callable<T> task) + { + return submit(newTaskFor(task)); + } + + public FutureTask<?> submit(Runnable task) + { + return submit(newTaskFor(task, null)); + } + + public <T> FutureTask<T> submit(Runnable task, T result) + { + return submit(newTaskFor(task, result)); + } + + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + { + throw new UnsupportedOperationException(); + } + + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException + { + throw new UnsupportedOperationException(); + } + + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException + { + throw new UnsupportedOperationException(); + } + + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException + { + throw new UnsupportedOperationException(); + } + + protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result) + { + return newTaskFor(runnable, result, ExecutorLocals.create()); + } + + protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result, ExecutorLocals locals) + { + if (locals != null) + { + if (runnable instanceof LocalSessionFutureTask) + return (LocalSessionFutureTask<T>) runnable; + return new LocalSessionFutureTask<T>(runnable, result, locals); + } + if (runnable instanceof FutureTask) + return (FutureTask<T>) runnable; + return new FutureTask<>(runnable, result); + } + + protected <T> FutureTask<T> newTaskFor(Callable<T> callable) + { + if (isTracing()) + { + if (callable instanceof LocalSessionFutureTask) + return (LocalSessionFutureTask<T>) callable; + return new LocalSessionFutureTask<T>(callable, ExecutorLocals.create()); + } + if (callable instanceof FutureTask) + return (FutureTask<T>) callable; + return new FutureTask<>(callable); + } + + private class LocalSessionFutureTask<T> extends FutureTask<T> + { + private final ExecutorLocals locals; + + public LocalSessionFutureTask(Callable<T> callable, ExecutorLocals locals) + { + super(callable); + this.locals = locals; + } + + public LocalSessionFutureTask(Runnable runnable, T result, ExecutorLocals locals) + { + super(runnable, result); + this.locals = locals; + } + + public void run() + { + ExecutorLocals old = ExecutorLocals.create(); + ExecutorLocals.set(locals); + try + { + super.run(); + } + finally + { + ExecutorLocals.set(old); + } + } + } + + class FutureTask<T> extends SimpleCondition implements Future<T>, Runnable + { + private boolean failure; + private Object result = this; + private final Callable<T> callable; + + public FutureTask(Callable<T> callable) + { + this.callable = callable; + } + public FutureTask(Runnable runnable, T result) + { + this(Executors.callable(runnable, result)); + } + + public void run() + { + try + { + result = callable.call(); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + logger.warn("Uncaught exception on thread {}: {}", Thread.currentThread(), t); + result = t; + failure = true; + } + finally + { + signalAll(); + onCompletion(); + } + } + + public boolean cancel(boolean mayInterruptIfRunning) + { + return false; + } + + public boolean isCancelled() + { + return false; + } + + public boolean isDone() + { + return isSignaled(); + } + + public T get() throws InterruptedException, ExecutionException + { + await(); + Object result = this.result; + if (failure) + throw new ExecutionException((Throwable) result); + return (T) result; + } + + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException + { + await(timeout, unit); + Object result = this.result; + if (failure) + throw new ExecutionException((Throwable) result); + return (T) result; + } + } + + private <T> FutureTask<T> submit(FutureTask<T> task) + { + addTask(task); + return task; + } + + public void execute(Runnable command) + { + addTask(newTaskFor(command, ExecutorLocals.create())); + } + + public void execute(Runnable command, ExecutorLocals locals) + { + addTask(newTaskFor(command, null, locals)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java deleted file mode 100644 index fb753b0..0000000 --- a/src/java/org/apache/cassandra/concurrent/AbstractTracingAwareExecutorService.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.concurrent; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.tracing.TraceState; -import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.concurrent.SimpleCondition; -import org.apache.cassandra.utils.JVMStabilityInspector; - -import static org.apache.cassandra.tracing.Tracing.isTracing; - -public abstract class AbstractTracingAwareExecutorService implements TracingAwareExecutorService -{ - private static final Logger logger = LoggerFactory.getLogger(AbstractTracingAwareExecutorService.class); - - protected abstract void addTask(FutureTask<?> futureTask); - protected abstract void onCompletion(); - - /** Task Submission / Creation / Objects **/ - - public <T> FutureTask<T> submit(Callable<T> task) - { - return submit(newTaskFor(task)); - } - - public FutureTask<?> submit(Runnable task) - { - return submit(newTaskFor(task, null)); - } - - public <T> FutureTask<T> submit(Runnable task, T result) - { - return submit(newTaskFor(task, result)); - } - - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) - { - throw new UnsupportedOperationException(); - } - - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException - { - throw new UnsupportedOperationException(); - } - - public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException - { - throw new UnsupportedOperationException(); - } - - public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException - { - throw new UnsupportedOperationException(); - } - - protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result) - { - return newTaskFor(runnable, result, Tracing.instance.get()); - } - - protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result, TraceState traceState) - { - if (traceState != null) - { - if (runnable instanceof TraceSessionFutureTask) - return (TraceSessionFutureTask<T>) runnable; - return new TraceSessionFutureTask<T>(runnable, result, traceState); - } - if (runnable instanceof FutureTask) - return (FutureTask<T>) runnable; - return new FutureTask<>(runnable, result); - } - - protected <T> FutureTask<T> newTaskFor(Callable<T> callable) - { - if (isTracing()) - { - if (callable instanceof TraceSessionFutureTask) - return (TraceSessionFutureTask<T>) callable; - return new TraceSessionFutureTask<T>(callable, Tracing.instance.get()); - } - if (callable instanceof FutureTask) - return (FutureTask<T>) callable; - return new FutureTask<>(callable); - } - - private class TraceSessionFutureTask<T> extends FutureTask<T> - { - private final TraceState state; - - public TraceSessionFutureTask(Callable<T> callable, TraceState state) - { - super(callable); - this.state = state; - } - - public TraceSessionFutureTask(Runnable runnable, T result, TraceState state) - { - super(runnable, result); - this.state = state; - } - - public void run() - { - TraceState oldState = Tracing.instance.get(); - Tracing.instance.set(state); - try - { - super.run(); - } - finally - { - Tracing.instance.set(oldState); - } - } - } - - class FutureTask<T> extends SimpleCondition implements Future<T>, Runnable - { - private boolean failure; - private Object result = this; - private final Callable<T> callable; - - public FutureTask(Callable<T> callable) - { - this.callable = callable; - } - public FutureTask(Runnable runnable, T result) - { - this(Executors.callable(runnable, result)); - } - - public void run() - { - try - { - result = callable.call(); - } - catch (Throwable t) - { - JVMStabilityInspector.inspectThrowable(t); - logger.warn("Uncaught exception on thread {}: {}", Thread.currentThread(), t); - result = t; - failure = true; - } - finally - { - signalAll(); - onCompletion(); - } - } - - public boolean cancel(boolean mayInterruptIfRunning) - { - return false; - } - - public boolean isCancelled() - { - return false; - } - - public boolean isDone() - { - return isSignaled(); - } - - public T get() throws InterruptedException, ExecutionException - { - await(); - Object result = this.result; - if (failure) - throw new ExecutionException((Throwable) result); - return (T) result; - } - - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException - { - await(timeout, unit); - Object result = this.result; - if (failure) - throw new ExecutionException((Throwable) result); - return (T) result; - } - } - - private <T> FutureTask<T> submit(FutureTask<T> task) - { - addTask(task); - return task; - } - - public void execute(Runnable command) - { - addTask(newTaskFor(command, null)); - } - - public void execute(Runnable command, TraceState state) - { - addTask(newTaskFor(command, null, state)); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java index a6d0049..1fb0690 100644 --- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java @@ -44,7 +44,7 @@ import static org.apache.cassandra.tracing.Tracing.isTracing; * threads and the queue is full, we want the enqueuer to block. But to allow the number of threads to drop if a * stage is less busy, core thread timeout is enabled. */ -public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService +public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService { protected static final Logger logger = LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class); public static final RejectedExecutionHandler blockingExecutionHandler = new RejectedExecutionHandler() @@ -146,11 +146,11 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements protected void onFinalAccept(Runnable task) {} protected void onFinalRejection(Runnable task) {} - public void execute(Runnable command, TraceState state) + public void execute(Runnable command, ExecutorLocals locals) { - super.execute(state == null || command instanceof TraceSessionWrapper + super.execute(locals == null || command instanceof LocalSessionWrapper ? command - : new TraceSessionWrapper<Object>(command, state)); + : new LocalSessionWrapper<Object>(command, locals)); } public void maybeExecuteImmediately(Runnable command) @@ -162,17 +162,17 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements @Override public void execute(Runnable command) { - super.execute(isTracing() && !(command instanceof TraceSessionWrapper) - ? new TraceSessionWrapper<Object>(Executors.callable(command, null)) + super.execute(isTracing() && !(command instanceof LocalSessionWrapper) + ? new LocalSessionWrapper<Object>(Executors.callable(command, null)) : command); } @Override protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T result) { - if (isTracing() && !(runnable instanceof TraceSessionWrapper)) + if (isTracing() && !(runnable instanceof LocalSessionWrapper)) { - return new TraceSessionWrapper<T>(Executors.callable(runnable, result)); + return new LocalSessionWrapper<T>(Executors.callable(runnable, result)); } return super.newTaskFor(runnable, result); } @@ -180,9 +180,9 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements @Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { - if (isTracing() && !(callable instanceof TraceSessionWrapper)) + if (isTracing() && !(callable instanceof LocalSessionWrapper)) { - return new TraceSessionWrapper<T>(callable); + return new LocalSessionWrapper<T>(callable); } return super.newTaskFor(callable); } @@ -198,9 +198,9 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements protected static void maybeResetTraceSessionWrapper(Runnable r) { - if (r instanceof TraceSessionWrapper) + if (r instanceof LocalSessionWrapper) { - TraceSessionWrapper tsw = (TraceSessionWrapper) r; + LocalSessionWrapper tsw = (LocalSessionWrapper) r; // we have to reset trace state as its presence is what denotes the current thread is tracing // and if left this thread might start tracing unrelated tasks tsw.reset(); @@ -210,8 +210,8 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements @Override protected void beforeExecute(Thread t, Runnable r) { - if (r instanceof TraceSessionWrapper) - ((TraceSessionWrapper) r).setupContext(); + if (r instanceof LocalSessionWrapper) + ((LocalSessionWrapper) r).setupContext(); super.beforeExecute(t, r); } @@ -278,35 +278,35 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements } /** - * Used to wrap a Runnable or Callable passed to submit or execute so we can clone the TraceSessionContext and move - * it into the worker thread. + * Used to wrap a Runnable or Callable passed to submit or execute so we can clone the ExecutorLocals and move + * them into the worker thread. * * @param <T> */ - private static class TraceSessionWrapper<T> extends FutureTask<T> + private static class LocalSessionWrapper<T> extends FutureTask<T> { - private final TraceState state; + private final ExecutorLocals locals; - public TraceSessionWrapper(Callable<T> callable) + public LocalSessionWrapper(Callable<T> callable) { super(callable); - state = Tracing.instance.get(); + locals = ExecutorLocals.create(); } - public TraceSessionWrapper(Runnable command, TraceState state) + public LocalSessionWrapper(Runnable command, ExecutorLocals locals) { super(command, null); - this.state = state; + this.locals = locals; } private void setupContext() { - Tracing.instance.set(state); + ExecutorLocals.set(locals); } private void reset() { - Tracing.instance.set(null); + ExecutorLocals.set(null); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/ExecutorLocal.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorLocal.java b/src/java/org/apache/cassandra/concurrent/ExecutorLocal.java new file mode 100644 index 0000000..47826f3 --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/ExecutorLocal.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.concurrent; + +import org.apache.cassandra.service.ClientWarn; +import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.tracing.Tracing; + +public interface ExecutorLocal<T> +{ + ExecutorLocal[] all = { Tracing.instance, ClientWarn.instance }; + + /** + * This is called when scheduling the task, and also before calling {@link ExecutorLocal#set(T)} when running on a + * executor thread. + * + * @return The thread-local value that we want to copy across executor boundaries; may be null if not set. + */ + T get(); + + /** + * Before a task has been run, this will be called with the value from the thread that scheduled the task, and after + * the task is finished, the value that was previously retrieved from this thread is restored. + * + * @param value Value to use for the executor local state; may be null. + */ + void set(T value); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java b/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java new file mode 100644 index 0000000..8e6d6ea --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.concurrent; + +import java.util.Arrays; + +import org.apache.cassandra.service.ClientWarn; +import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.tracing.Tracing; + +/* + * This class only knows about Tracing and ClientWarn, so if any different executor locals are added, it must be + * updated. + * + * We don't enumerate the ExecutorLocal.all array each time because it would be much slower. + */ +public class ExecutorLocals +{ + private static final ExecutorLocal<TraceState> tracing = Tracing.instance; + private static final ExecutorLocal<ClientWarn.State> clientWarn = ClientWarn.instance; + + public final TraceState traceState; + public final ClientWarn.State clientWarnState; + + private ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState) + { + this.traceState = traceState; + this.clientWarnState = clientWarnState; + } + + static + { + assert Arrays.equals(ExecutorLocal.all, new ExecutorLocal[]{ tracing, clientWarn }) + : "ExecutorLocals has not been updated to reflect new ExecutorLocal.all"; + } + + /** + * This creates a new ExecutorLocals object based on what is already set. + * + * @return an ExecutorLocals object which has the trace state and client warn state captured if either has been set, + * or null if both are unset. The null result short-circuits logic in + * {@link AbstractLocalAwareExecutorService#newTaskFor(Runnable, Object, ExecutorLocals)}, preventing + * unnecessarily calling {@link ExecutorLocals#set(ExecutorLocals)}. + */ + public static ExecutorLocals create() + { + TraceState traceState = tracing.get(); + ClientWarn.State clientWarnState = clientWarn.get(); + if (traceState == null && clientWarnState == null) + return null; + else + return new ExecutorLocals(traceState, clientWarnState); + } + + public static ExecutorLocals create(TraceState traceState) + { + ClientWarn.State clientWarnState = clientWarn.get(); + return new ExecutorLocals(traceState, clientWarnState); + } + + public static void set(ExecutorLocals locals) + { + TraceState traceState = locals == null ? null : locals.traceState; + ClientWarn.State clientWarnState = locals == null ? null : locals.clientWarnState; + tracing.set(traceState); + clientWarn.set(clientWarnState); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/LocalAwareExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/LocalAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/LocalAwareExecutorService.java new file mode 100644 index 0000000..5577d59 --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/LocalAwareExecutorService.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.cassandra.concurrent; + +import java.util.concurrent.ExecutorService; + +public interface LocalAwareExecutorService extends ExecutorService +{ + // we need a way to inject a TraceState directly into the Executor context without going through + // the global Tracing sessions; see CASSANDRA-5668 + public void execute(Runnable command, ExecutorLocals locals); + + // permits executing in the context of the submitting thread + public void maybeExecuteImmediately(Runnable command); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/SEPExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java index d9a0fa8..8b12b82 100644 --- a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.cassandra.metrics.SEPMetrics; @@ -30,7 +29,7 @@ import org.apache.cassandra.utils.concurrent.WaitQueue; import static org.apache.cassandra.concurrent.SEPWorker.Work; -public class SEPExecutor extends AbstractTracingAwareExecutorService +public class SEPExecutor extends AbstractLocalAwareExecutorService { private final SharedExecutorPool pool; http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java index 8c18c44..dfd7011 100644 --- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java +++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java @@ -103,7 +103,7 @@ public class SharedExecutorPool schedule(Work.SPINNING); } - public TracingAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTasks, String jmxPath, String name) + public LocalAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTasks, String jmxPath, String name) { SEPExecutor executor = new SEPExecutor(this, maxConcurrency, maxQueuedTasks, jmxPath, name); executors.add(executor); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/StageManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java index 4f03fd5..114795e 100644 --- a/src/java/org/apache/cassandra/concurrent/StageManager.java +++ b/src/java/org/apache/cassandra/concurrent/StageManager.java @@ -39,7 +39,7 @@ public class StageManager { private static final Logger logger = LoggerFactory.getLogger(StageManager.class); - private static final EnumMap<Stage, TracingAwareExecutorService> stages = new EnumMap<Stage, TracingAwareExecutorService>(Stage.class); + private static final EnumMap<Stage, LocalAwareExecutorService> stages = new EnumMap<Stage, LocalAwareExecutorService>(Stage.class); public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle @@ -87,7 +87,7 @@ public class StageManager stage.getJmxType()); } - private static TracingAwareExecutorService multiThreadedLowSignalStage(Stage stage, int numThreads) + private static LocalAwareExecutorService multiThreadedLowSignalStage(Stage stage, int numThreads) { return SharedExecutorPool.SHARED.newExecutor(numThreads, Integer.MAX_VALUE, stage.getJmxType(), stage.getJmxName()); } @@ -96,7 +96,7 @@ public class StageManager * Retrieve a stage from the StageManager * @param stage name of the stage to be retrieved. */ - public static TracingAwareExecutorService getStage(Stage stage) + public static LocalAwareExecutorService getStage(Stage stage) { return stages.get(stage); } @@ -116,16 +116,16 @@ public class StageManager * A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the * tracing stage. See CASSANDRA-1123 for background. */ - private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService + private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService { public ExecuteOnlyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } - public void execute(Runnable command, TraceState state) + public void execute(Runnable command, ExecutorLocals locals) { - assert state == null; + assert locals == null; super.execute(command); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java deleted file mode 100644 index f580fea..0000000 --- a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.cassandra.concurrent; - -import java.util.concurrent.ExecutorService; - -import org.apache.cassandra.tracing.TraceState; - -public interface TracingAwareExecutorService extends ExecutorService -{ - // we need a way to inject a TraceState directly into the Executor context without going through - // the global Tracing sessions; see CASSANDRA-5668 - public void execute(Runnable command, TraceState state); - - // permits executing in the context of the submitting thread - public void maybeExecuteImmediately(Runnable command); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 43a80bb..a289ad1 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -273,7 +273,7 @@ public class BatchStatement implements CQLStatement { logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold, ""); } - ClientWarn.warn(MessageFormatter.arrayFormat(format, new Object[] {ksCfPairs, size, warnThreshold, size - warnThreshold, ""}).getMessage()); + ClientWarn.instance.warn(MessageFormatter.arrayFormat(format, new Object[]{ ksCfPairs, size, warnThreshold, size - warnThreshold, "" }).getMessage()); } } @@ -305,8 +305,13 @@ public class BatchStatement implements CQLStatement keySet.size(), keySet.size() == 1 ? "" : "s", ksCfPairs.size() == 1 ? "" : "s", ksCfPairs); - ClientWarn.warn(MessageFormatter.arrayFormat(unloggedBatchWarning, new Object[]{keySet.size(), keySet.size() == 1 ? "" : "s", - ksCfPairs.size() == 1 ? "" : "s", ksCfPairs}).getMessage()); + ClientWarn.instance.warn(MessageFormatter.arrayFormat(unloggedBatchWarning, + new Object[]{ + keySet.size(), + keySet.size() == 1 ? "" : "s", + ksCfPairs.size() == 1 ? "" : "s", + ksCfPairs + }).getMessage()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 5f142ce..5cfa94b 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -251,12 +251,12 @@ public class SelectStatement implements CQLStatement if (!restrictions.hasPartitionKeyRestrictions()) { logger.warn("Aggregation query used without partition key"); - ClientWarn.warn("Aggregation query used without partition key"); + ClientWarn.instance.warn("Aggregation query used without partition key"); } else if (restrictions.keyIsInRelation()) { logger.warn("Aggregation query used on multiple partition keys (IN restriction)"); - ClientWarn.warn("Aggregation query used on multiple partition keys (IN restriction)"); + ClientWarn.instance.warn("Aggregation query used on multiple partition keys (IN restriction)"); } Selection.ResultSetBuilder result = selection.resultSetBuilder(now, parameters.isJson); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java index e1a68e7..d2f0bf4 100644 --- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java @@ -303,7 +303,7 @@ public class SliceQueryFilter implements IDiskAtomFilter container.metadata().getKeyValidator().getString(key.getKey()), count, getSlicesInfo(container)); - ClientWarn.warn(msg); + ClientWarn.instance.warn(msg); logger.warn(msg); } Tracing.trace("Read {} live and {} tombstone cells{}", http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 61e58c2..459923b 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -43,10 +43,12 @@ import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ExecutorLocal; +import org.apache.cassandra.concurrent.ExecutorLocals; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.concurrent.TracingAwareExecutorService; +import org.apache.cassandra.concurrent.LocalAwareExecutorService; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; import org.apache.cassandra.db.*; @@ -801,10 +803,10 @@ public final class MessagingService implements MessagingServiceMBean return; Runnable runnable = new MessageDeliveryTask(message, id, timestamp, isCrossNodeTimestamp); - TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType()); + LocalAwareExecutorService stage = StageManager.getStage(message.getMessageType()); assert stage != null : "No stage for message type " + message.verb; - stage.execute(runnable, state); + stage.execute(runnable, ExecutorLocals.create(state)); } public void setCallbackForTests(int messageId, CallbackInfo callback) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/service/ClientWarn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ClientWarn.java b/src/java/org/apache/cassandra/service/ClientWarn.java index 2ed0a6c..ddad197 100644 --- a/src/java/org/apache/cassandra/service/ClientWarn.java +++ b/src/java/org/apache/cassandra/service/ClientWarn.java @@ -20,54 +20,68 @@ package org.apache.cassandra.service; import java.util.ArrayList; import java.util.List; +import org.apache.cassandra.concurrent.ExecutorLocal; import org.apache.cassandra.utils.FBUtilities; -public class ClientWarn +public class ClientWarn implements ExecutorLocal<ClientWarn.State> { private static final String TRUNCATED = " [truncated]"; - private static final ThreadLocal<ClientWarn> warnLocal = new ThreadLocal<>(); - - private final List<String> warnings = new ArrayList<>(); + private static final ThreadLocal<ClientWarn.State> warnLocal = new ThreadLocal<>(); + public static ClientWarn instance = new ClientWarn(); private ClientWarn() { } - public static void warn(String text) - { - ClientWarn warner = warnLocal.get(); - if (warner != null) - warner.add(text); + public State get() { + return warnLocal.get(); } - private void add(String warning) - { - if (warnings.size() < FBUtilities.MAX_UNSIGNED_SHORT) - warnings.add(maybeTruncate(warning)); + public void set(State value) { + warnLocal.set(value); } - private static String maybeTruncate(String warning) + public void warn(String text) { - return warning.length() > FBUtilities.MAX_UNSIGNED_SHORT - ? warning.substring(0, FBUtilities.MAX_UNSIGNED_SHORT - TRUNCATED.length()) + TRUNCATED - : warning; + State state = warnLocal.get(); + if (state != null) + state.add(text); } - public static void captureWarnings() + public void captureWarnings() { - warnLocal.set(new ClientWarn()); + warnLocal.set(new State()); } - public static List<String> getWarnings() + public List<String> getWarnings() { - ClientWarn warner = warnLocal.get(); - if (warner == null || warner.warnings.isEmpty()) + State state = warnLocal.get(); + if (state == null || state.warnings.isEmpty()) return null; - return warner.warnings; + return state.warnings; } - public static void resetWarnings() + public void resetWarnings() { warnLocal.remove(); } + + public static class State + { + private final List<String> warnings = new ArrayList<>(); + + private void add(String warning) + { + if (warnings.size() < FBUtilities.MAX_UNSIGNED_SHORT) + warnings.add(maybeTruncate(warning)); + } + + private static String maybeTruncate(String warning) + { + return warning.length() > FBUtilities.MAX_UNSIGNED_SHORT + ? warning.substring(0, FBUtilities.MAX_UNSIGNED_SHORT - TRUNCATED.length()) + TRUNCATED + : warning; + } + + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 88253e3..841e980 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1842,7 +1842,7 @@ public class StorageProxy implements StorageProxyMBean if (filteredEndpoints.size() == 1 && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())) { - StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler), Tracing.instance.get()); + StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler)); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/tracing/Tracing.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java index ccc2637..bf9cee7 100644 --- a/src/java/org/apache/cassandra/tracing/Tracing.java +++ b/src/java/org/apache/cassandra/tracing/Tracing.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ExecutorLocal; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.net.MessageIn; @@ -41,7 +42,7 @@ import org.apache.cassandra.utils.UUIDGen; * A trace session context. Able to track and store trace sessions. A session is usually a user initiated query, and may * have multiple local and remote events before it is completed. All events and sessions are stored at keyspace. */ -public class Tracing +public class Tracing implements ExecutorLocal<TraceState> { public static final String TRACE_HEADER = "TraceSession"; public static final String TRACE_TYPE = "TraceType"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/transport/Message.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index ab794df..01a0794 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -499,14 +499,14 @@ public abstract class Message assert request.connection() instanceof ServerConnection; connection = (ServerConnection)request.connection(); if (connection.getVersion() >= Server.VERSION_4) - ClientWarn.captureWarnings(); + ClientWarn.instance.captureWarnings(); QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId()); logger.trace("Received: {}, v={}", request, connection.getVersion()); response = request.execute(qstate); response.setStreamId(request.getStreamId()); - response.setWarnings(ClientWarn.getWarnings()); + response.setWarnings(ClientWarn.instance.getWarnings()); response.attach(connection); connection.applyStateTransition(request.type, response.type); } @@ -519,7 +519,7 @@ public abstract class Message } finally { - ClientWarn.resetWarnings(); + ClientWarn.instance.resetWarnings(); } logger.trace("Responding: {}, v={}", response, connection.getVersion()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java index 4ecd6a7..289f3e3 100644 --- a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit; import io.netty.util.concurrent.AbstractEventExecutor; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; -import org.apache.cassandra.concurrent.TracingAwareExecutorService; +import org.apache.cassandra.concurrent.LocalAwareExecutorService; import org.apache.cassandra.config.DatabaseDescriptor; import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED; @@ -32,7 +32,7 @@ public class RequestThreadPoolExecutor extends AbstractEventExecutor { private final static int MAX_QUEUED_REQUESTS = 128; private final static String THREAD_FACTORY_ID = "Native-Transport-Requests"; - private final TracingAwareExecutorService wrapped = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(), + private final LocalAwareExecutorService wrapped = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(), MAX_QUEUED_REQUESTS, "transport", THREAD_FACTORY_ID); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbf6e62c/test/unit/org/apache/cassandra/service/ClientWarningsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java index cfd5f7a..d22a8f6 100644 --- a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java +++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java @@ -24,6 +24,8 @@ import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.ByteOrderedPartitioner; import org.apache.cassandra.transport.Message; import org.apache.cassandra.transport.Server; @@ -79,6 +81,47 @@ public class ClientWarningsTest extends CQLTester } @Test + public void testTombstoneWarning() throws Exception + { + final int iterations = 10000; + createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_4)) + { + client.connect(false); + + for (int i = 0; i < iterations; i++) + { + QueryMessage query = new QueryMessage(String.format("INSERT INTO %s.%s (pk, ck, v) VALUES (1, %s, 1)", + KEYSPACE, + currentTable(), + i), QueryOptions.DEFAULT); + client.execute(query); + } + ColumnFamilyStore store = Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable()); + store.forceBlockingFlush(); + + for (int i = 0; i < iterations; i++) + { + QueryMessage query = new QueryMessage(String.format("DELETE v FROM %s.%s WHERE pk = 1 AND ck = %s", + KEYSPACE, + currentTable(), + i), QueryOptions.DEFAULT); + client.execute(query); + } + store.forceBlockingFlush(); + + { + QueryMessage query = new QueryMessage(String.format("SELECT * FROM %s.%s WHERE pk = 1", + KEYSPACE, + currentTable()), QueryOptions.DEFAULT); + Message.Response resp = client.execute(query); + assertEquals(1, resp.getWarnings().size()); + } + } + } + + @Test public void testLargeBatchWithProtoV2() throws Exception { createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");