This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push: new c3327855e4 Add a virtual table that exposes currently running queries c3327855e4 is described below commit c3327855e4bf98f8631c959b82bd8470726034a8 Author: Caleb Rackliffe <calebrackli...@gmail.com> AuthorDate: Mon Jun 27 18:40:40 2022 -0500 Add a virtual table that exposes currently running queries patch by Chris Lohfink; reviewed by Caleb Rackliffe and Benedict Elliott Smith for CASSANDRA-15241 Co-authored-by: Chris Lohfink <clohf...@apple.com> Co-authored-by: Caleb Rackliffe <calebrackli...@gmail.com> Co-authored-by: Benedict Elliott Smith <bened...@apache.org> --- CHANGES.txt | 1 + .../cassandra/concurrent/DebuggableTask.java | 85 +++++++++++++++ .../cassandra/concurrent/ExecutionFailure.java | 61 ++++++++++- .../apache/cassandra/concurrent/FutureTask.java | 21 +++- .../org/apache/cassandra/concurrent/SEPWorker.java | 52 +++++++++- .../cassandra/concurrent/SharedExecutorPool.java | 26 ++++- .../apache/cassandra/concurrent/TaskFactory.java | 4 + .../apache/cassandra/db/virtual/QueriesTable.java | 94 +++++++++++++++++ .../cassandra/db/virtual/SystemViewsKeyspace.java | 1 + .../org/apache/cassandra/service/StorageProxy.java | 114 ++++++++++++++++----- .../org/apache/cassandra/transport/Dispatcher.java | 68 +++++++++--- .../transport/InitialConnectionHandler.java | 5 +- .../org/apache/cassandra/transport/Message.java | 3 +- .../cassandra/transport/messages/QueryMessage.java | 3 +- .../distributed/test/QueriesTableTest.java | 89 ++++++++++++++++ 15 files changed, 581 insertions(+), 46 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 45c80354fa..a4fa4ce525 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1.3 + * Add a virtual table that exposes currently running queries (CASSANDRA-15241) Merged from 4.0: * Upgrade snappy to 1.1.10.1 (CASSANDRA-18608) * Fix assertion error when describing mv as table (CASSANDRA-18596) diff --git 
a/src/java/org/apache/cassandra/concurrent/DebuggableTask.java b/src/java/org/apache/cassandra/concurrent/DebuggableTask.java new file mode 100644 index 0000000000..ac04eb4c34 --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/DebuggableTask.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.concurrent; + +import org.apache.cassandra.utils.Shared; + +import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES; +import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; + +/** + * Interface to include on a Runnable or Callable submitted to the {@link SharedExecutorPool} to provide more + * detailed diagnostics. + */ +@Shared(scope = SIMULATION, inner = INTERFACES) +public interface DebuggableTask +{ + public long creationTimeNanos(); + + public long startTimeNanos(); + + public String description(); + + interface RunnableDebuggableTask extends Runnable, DebuggableTask {} + + /** + * Wraps a {@link DebuggableTask} to include the name of the thread running it. 
+ */ + public static class RunningDebuggableTask implements DebuggableTask + { + private final DebuggableTask task; + private final String threadId; + + public RunningDebuggableTask(String threadId, DebuggableTask task) + { + this.task = task; + this.threadId = threadId; + } + + public String threadId() + { + return threadId; + } + + public boolean hasTask() + { + return task != null; + } + + @Override + public long creationTimeNanos() + { + assert hasTask(); + return task.creationTimeNanos(); + } + + @Override + public long startTimeNanos() + { + assert hasTask(); + return task.startTimeNanos(); + } + + @Override + public String description() + { + assert hasTask(); + return task.description(); + } + } +} diff --git a/src/java/org/apache/cassandra/concurrent/ExecutionFailure.java b/src/java/org/apache/cassandra/concurrent/ExecutionFailure.java index 7fa7dcbd54..27ab885e23 100644 --- a/src/java/org/apache/cassandra/concurrent/ExecutionFailure.java +++ b/src/java/org/apache/cassandra/concurrent/ExecutionFailure.java @@ -21,6 +21,7 @@ package org.apache.cassandra.concurrent; import java.util.concurrent.Callable; import java.util.concurrent.Future; +import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,6 +106,14 @@ public class ExecutionFailure return enforceOptions(withResources, wrap, false); } + /** + * @see #suppressing(WithResources, Runnable) + */ + static RunnableDebuggableTask suppressingDebuggable(WithResources withResources, RunnableDebuggableTask debuggable) + { + return enforceOptionsDebuggable(withResources, debuggable, false); + } + /** * Encapsulate the execution, propagating or suppressing any exceptions as requested. 
* @@ -119,7 +128,7 @@ public class ExecutionFailure @Override public void run() { - try (Closeable close = withResources.get()) + try (@SuppressWarnings("unused") Closeable close = withResources.get()) { wrap.run(); } @@ -139,6 +148,54 @@ public class ExecutionFailure }; } + /** + * @see #enforceOptions(WithResources, Runnable, boolean) + */ + private static RunnableDebuggableTask enforceOptionsDebuggable(WithResources withResources, RunnableDebuggableTask debuggable, boolean propagate) + { + return new RunnableDebuggableTask() + { + @Override + public void run() + { + try (@SuppressWarnings("unused") Closeable close = withResources.get()) + { + debuggable.run(); + } + catch (Throwable t) + { + handle(t); + if (propagate) + throw t; + } + } + + @Override + public String toString() + { + return debuggable.toString(); + } + + @Override + public long creationTimeNanos() + { + return debuggable.creationTimeNanos(); + } + + @Override + public long startTimeNanos() + { + return debuggable.startTimeNanos(); + } + + @Override + public String description() + { + return debuggable.description(); + } + }; + } + /** * See {@link #enforceOptions(WithResources, Callable)} */ @@ -158,7 +215,7 @@ public class ExecutionFailure @Override public V call() throws Exception { - try (Closeable close = withResources.get()) + try (@SuppressWarnings("unused") Closeable close = withResources.get()) { return wrap.call(); } diff --git a/src/java/org/apache/cassandra/concurrent/FutureTask.java b/src/java/org/apache/cassandra/concurrent/FutureTask.java index 2348ff6bf8..763884a2da 100644 --- a/src/java/org/apache/cassandra/concurrent/FutureTask.java +++ b/src/java/org/apache/cassandra/concurrent/FutureTask.java @@ -20,9 +20,10 @@ package org.apache.cassandra.concurrent; import java.util.concurrent.Callable; -import org.apache.cassandra.utils.concurrent.RunnableFuture; +import javax.annotation.Nullable; import org.apache.cassandra.utils.concurrent.AsyncFuture; +import 
org.apache.cassandra.utils.concurrent.RunnableFuture; /** * A FutureTask that utilises Cassandra's {@link AsyncFuture}, making it compatible with {@link ExecutorPlus}. @@ -31,15 +32,28 @@ import org.apache.cassandra.utils.concurrent.AsyncFuture; public class FutureTask<V> extends AsyncFuture<V> implements RunnableFuture<V> { private Callable<? extends V> call; + private volatile DebuggableTask debuggable; public FutureTask(Callable<? extends V> call) { - this.call = call; + this(call, call instanceof DebuggableTask ? (DebuggableTask) call : null); } public FutureTask(Runnable run) { - this.call = callable(run); + this(callable(run), run instanceof DebuggableTask ? (DebuggableTask) run : null); + } + + private FutureTask(Callable<? extends V> call, DebuggableTask debuggable) + { + this.call = call; + this.debuggable = debuggable; + } + + @Nullable + DebuggableTask debuggableTask() + { + return debuggable; } V call() throws Exception @@ -63,6 +77,7 @@ public class FutureTask<V> extends AsyncFuture<V> implements RunnableFuture<V> finally { call = null; + debuggable = null; } } diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java index c7b9abf719..fe16c950df 100644 --- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java +++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java @@ -48,6 +48,8 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl long prevStopCheck = 0; long soleSpinnerSpinTime = 0; + private final AtomicReference<Runnable> currentTask = new AtomicReference<>(); + SEPWorker(ThreadGroup threadGroup, Long workerId, Work initialState, SharedExecutorPool pool) { this.pool = pool; @@ -58,9 +60,27 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl thread.start(); } + /** + * @return the current {@link DebuggableTask}, if one exists + */ + public DebuggableTask currentDebuggableTask() + { + // can change after null 
check so go off local reference + Runnable task = currentTask.get(); + + // Local read and mutation Runnables are themselves debuggable + if (task instanceof DebuggableTask) + return (DebuggableTask) task; + + if (task instanceof FutureTask) + return ((FutureTask<?>) task).debuggableTask(); + + return null; + } + public void run() { - /** + /* * we maintain two important invariants: * 1) after exiting spinning phase, we ensure at least one more task on _each_ queue will be processed * promptly after we begin, assuming any are outstanding on any pools. this is to permit producers to @@ -101,8 +121,10 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl if (assigned == null) continue; if (SET_THREAD_NAME) - Thread.currentThread().setName(assigned.name + "-" + workerId); + Thread.currentThread().setName(assigned.name + '-' + workerId); + task = assigned.tasks.poll(); + currentTask.lazySet(task); // if we do have tasks assigned, nobody will change our state so we can simply set it to WORKING // (which is also a state that will never be interrupted externally) @@ -128,9 +150,12 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl break; task = assigned.tasks.poll(); + currentTask.lazySet(task); } // return our work permit, and maybe signal shutdown + currentTask.lazySet(null); + if (status != RETURNED_WORK_PERMIT) assigned.returnWorkPermit(); @@ -173,6 +198,11 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl logger.error("Unexpected exception killed worker", t); } } + finally + { + currentTask.lazySet(null); + pool.workerEnded(this); + } } // try to assign this worker the provided work @@ -420,4 +450,22 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl return assigned != null; } } + + @Override + public String toString() + { + return thread.getName(); + } + + @Override + public int hashCode() + { + return workerId.intValue(); + } + + 
@Override + public boolean equals(Object obj) + { + return obj == this; + } } diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java index f74854f9cb..0631ec61da 100644 --- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java +++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java @@ -17,8 +17,11 @@ */ package org.apache.cassandra.concurrent; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; @@ -26,6 +29,9 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; +import java.util.stream.Collectors; + +import org.apache.cassandra.concurrent.DebuggableTask.RunningDebuggableTask; import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; import static org.apache.cassandra.concurrent.SEPWorker.Work; @@ -77,6 +83,8 @@ public class SharedExecutorPool final ConcurrentSkipListMap<Long, SEPWorker> spinning = new ConcurrentSkipListMap<>(); // the collection of threads that have been asked to stop/deschedule - new workers are scheduled from here last final ConcurrentSkipListMap<Long, SEPWorker> descheduled = new ConcurrentSkipListMap<>(); + // All SEPWorkers that are currently running + private final Set<SEPWorker> allWorkers = Collections.newSetFromMap(new ConcurrentHashMap<>()); volatile boolean shuttingDown = false; @@ -102,7 +110,23 @@ public class SharedExecutorPool return; if (!work.isStop()) - new SEPWorker(threadGroup, workerId.incrementAndGet(), work, this); + { + SEPWorker worker = new SEPWorker(threadGroup, workerId.incrementAndGet(), work, this); + 
allWorkers.add(worker); + } + } + + void workerEnded(SEPWorker worker) + { + allWorkers.remove(worker); + } + + public List<RunningDebuggableTask> runningTasks() + { + return allWorkers.stream() + .map(worker -> new RunningDebuggableTask(worker.toString(), worker.currentDebuggableTask())) + .filter(RunningDebuggableTask::hasTask) + .collect(Collectors.toList()); } void maybeStartSpinningWorker() diff --git a/src/java/org/apache/cassandra/concurrent/TaskFactory.java b/src/java/org/apache/cassandra/concurrent/TaskFactory.java index 56087d950b..faeabe6c4c 100644 --- a/src/java/org/apache/cassandra/concurrent/TaskFactory.java +++ b/src/java/org/apache/cassandra/concurrent/TaskFactory.java @@ -20,6 +20,7 @@ package org.apache.cassandra.concurrent; import java.util.concurrent.Callable; +import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask; import org.apache.cassandra.utils.Shared; import org.apache.cassandra.utils.WithResources; import org.apache.cassandra.utils.concurrent.RunnableFuture; @@ -127,6 +128,9 @@ public interface TaskFactory @Override public Runnable toExecute(Runnable runnable) { + if (runnable instanceof RunnableDebuggableTask) + return ExecutionFailure.suppressingDebuggable(ExecutorLocals.propagate(), (RunnableDebuggableTask) runnable); + // no reason to propagate exception when it is inaccessible to caller return ExecutionFailure.suppressing(ExecutorLocals.propagate(), runnable); } diff --git a/src/java/org/apache/cassandra/db/virtual/QueriesTable.java b/src/java/org/apache/cassandra/db/virtual/QueriesTable.java new file mode 100644 index 0000000000..aeba61c004 --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/QueriesTable.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import org.apache.cassandra.concurrent.DebuggableTask; +import org.apache.cassandra.concurrent.SharedExecutorPool; +import org.apache.cassandra.db.marshal.LongType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.schema.TableMetadata; + +import static java.lang.Long.max; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; + +/** + * Virtual table that lists currently running queries on the NTR (coordinator) and Read/Mutation (local) stages + * + * Example: + * <pre> + * cqlsh> SELECT * FROM system_views.queries; + * + * thread_id | queued_micros | running_micros | task + * ------------------------------+---------------+-----------------+-------------------------------------------------------------------------------- + * Native-Transport-Requests-7 | 72923 | 7611 | QUERY select * from system_views.queries; [pageSize = 100] + * MutationStage-2 | 18249 | 2084 | Mutation(keyspace='distributed_test_keyspace', key='000000f8', modifications... 
+ * ReadStage-2 | 72447 | 10121 | SELECT * FROM keyspace.table LIMIT 5000 + * </pre> + */ +final class QueriesTable extends AbstractVirtualTable +{ + private static final String TABLE_NAME = "queries"; + private static final String ID = "thread_id"; + private static final String QUEUED = "queued_micros"; + private static final String RUNNING = "running_micros"; + private static final String DESC = "task"; + + QueriesTable(String keyspace) + { + super(TableMetadata.builder(keyspace, TABLE_NAME) + .comment("Lists currently running queries") + .kind(TableMetadata.Kind.VIRTUAL) + .partitioner(new LocalPartitioner(UTF8Type.instance)) + // The thread name is unique since the id given to each SEPWorker is unique + .addPartitionKeyColumn(ID, UTF8Type.instance) + .addRegularColumn(QUEUED, LongType.instance) + .addRegularColumn(RUNNING, LongType.instance) + .addRegularColumn(DESC, UTF8Type.instance) + .build()); + } + + /** + * Walks the {@link SharedExecutorPool} workers for any {@link DebuggableTask} instances and populates the table. + */ + @Override + public DataSet data() + { + SimpleDataSet result = new SimpleDataSet(metadata()); + + for (DebuggableTask.RunningDebuggableTask task : SharedExecutorPool.SHARED.runningTasks()) + { + if (!task.hasTask()) continue; + + long creationTimeNanos = task.creationTimeNanos(); + long startTimeNanos = task.startTimeNanos(); + long now = approxTime.now(); + + long queuedMicros = NANOSECONDS.toMicros(max((startTimeNanos > 0 ? startTimeNanos : now) - creationTimeNanos, 0)); + long runningMicros = startTimeNanos > 0 ? 
NANOSECONDS.toMicros(now - startTimeNanos) : 0; + + result.row(task.threadId()) + .column(QUEUED, queuedMicros) + .column(RUNNING, runningMicros) + .column(DESC, task.description()); + } + + return result; + } +} diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java index f13e61ce73..59a0aba809 100644 --- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java @@ -47,6 +47,7 @@ public final class SystemViewsKeyspace extends VirtualKeyspace .add(new BatchMetricsTable(VIRTUAL_VIEWS)) .add(new StreamingVirtualTable(VIRTUAL_VIEWS)) .add(new GossipInfoTable(VIRTUAL_VIEWS)) + .add(new QueriesTable(VIRTUAL_VIEWS)) .addAll(LocalRepairTables.getAll(VIRTUAL_VIEWS)) .build()); } diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 2a0ccab793..b96120cbb2 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -44,19 +44,20 @@ import com.google.common.cache.CacheLoader; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Uninterruptibles; -import org.apache.cassandra.config.Config; -import org.apache.cassandra.service.paxos.*; +import org.apache.cassandra.service.paxos.Ballot; +import org.apache.cassandra.service.paxos.Commit; +import org.apache.cassandra.service.paxos.ContentionStrategy; import org.apache.cassandra.service.paxos.Paxos; -import org.apache.cassandra.utils.TimeUUID; -import org.apache.cassandra.utils.concurrent.CountDownLatch; - +import org.apache.cassandra.service.paxos.PaxosState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.batchlog.Batch; import org.apache.cassandra.batchlog.BatchlogManager; +import 
org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ConsistencyLevel; @@ -139,13 +140,17 @@ import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.MonotonicClock; import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.TimeUUID; +import org.apache.cassandra.utils.concurrent.CountDownLatch; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; -import static com.google.common.collect.Iterables.concat; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import static com.google.common.collect.Iterables.concat; +import static org.apache.commons.lang3.StringUtils.join; + import static org.apache.cassandra.db.ConsistencyLevel.SERIAL; -import static org.apache.cassandra.net.Message.out; import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casReadMetrics; import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casWriteMetrics; import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetrics; @@ -153,8 +158,15 @@ import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetri import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.viewWriteMetrics; import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetrics; import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetricsForLevel; +import static org.apache.cassandra.net.Message.out; import static org.apache.cassandra.net.NoPayload.noPayload; -import static org.apache.cassandra.net.Verb.*; +import static 
org.apache.cassandra.net.Verb.BATCH_STORE_REQ; +import static org.apache.cassandra.net.Verb.MUTATION_REQ; +import static org.apache.cassandra.net.Verb.PAXOS_COMMIT_REQ; +import static org.apache.cassandra.net.Verb.PAXOS_PREPARE_REQ; +import static org.apache.cassandra.net.Verb.PAXOS_PROPOSE_REQ; +import static org.apache.cassandra.net.Verb.SCHEMA_VERSION_REQ; +import static org.apache.cassandra.net.Verb.TRUNCATE_REQ; import static org.apache.cassandra.service.BatchlogResponseHandler.BatchlogCleanup; import static org.apache.cassandra.service.paxos.Ballot.Flag.GLOBAL; import static org.apache.cassandra.service.paxos.Ballot.Flag.LOCAL; @@ -165,7 +177,6 @@ import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; import static org.apache.cassandra.utils.Clock.Global.nanoTime; import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch; -import static org.apache.commons.lang3.StringUtils.join; public class StorageProxy implements StorageProxyMBean { @@ -828,6 +839,12 @@ public class StorageProxy implements StorageProxyMBean } } + @Override + public String description() + { + return "Paxos " + message.payload.toString(); + } + @Override protected Verb verb() { @@ -1263,7 +1280,7 @@ public class StorageProxy implements StorageProxyMBean logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, replica, batch.size()); if (replica.isSelf()) - performLocally(Stage.MUTATION, replica, () -> BatchlogManager.store(batch), handler); + performLocally(Stage.MUTATION, replica, () -> BatchlogManager.store(batch), handler, "Batchlog store"); else MessagingService.instance().sendWithCallback(message, replica.endpoint(), handler); } @@ -1279,7 +1296,7 @@ public class StorageProxy implements StorageProxyMBean logger.trace("Sending batchlog remove request {} to {}", uuid, target); if (target.isSelf()) - performLocally(Stage.MUTATION, target, () -> 
BatchlogManager.remove(uuid)); + performLocally(Stage.MUTATION, target, () -> BatchlogManager.remove(uuid), "Batchlog remove"); else MessagingService.instance().send(message, target.endpoint()); } @@ -1523,7 +1540,7 @@ public class StorageProxy implements StorageProxyMBean if (insertLocal) { Preconditions.checkNotNull(localReplica); - performLocally(stage, localReplica, mutation::apply, responseHandler); + performLocally(stage, localReplica, mutation::apply, responseHandler, mutation); } if (localDc != null) @@ -1590,7 +1607,7 @@ public class StorageProxy implements StorageProxyMBean logger.trace("Sending message to {}@{}", message.id(), target); } - private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable) + private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable, String description) { stage.maybeExecuteImmediately(new LocalMutationRunnable(localReplica) { @@ -1606,6 +1623,12 @@ public class StorageProxy implements StorageProxyMBean } } + @Override + public String description() + { + return description; + } + @Override protected Verb verb() { @@ -1614,7 +1637,7 @@ public class StorageProxy implements StorageProxyMBean }); } - private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable, final RequestCallback<?> handler) + private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable, final RequestCallback<?> handler, Object description) { stage.maybeExecuteImmediately(new LocalMutationRunnable(localReplica) { @@ -1633,6 +1656,14 @@ public class StorageProxy implements StorageProxyMBean } } + @Override + public String description() + { + // description is an Object and toString() called so we do not have to evaluate the Mutation.toString() + // unless explicitly checked + return description.toString(); + } + @Override protected Verb verb() { @@ -2087,7 +2118,7 @@ public class StorageProxy implements StorageProxyMBean return 
concatAndBlockOnRepair(results, repairs); } - public static class LocalReadRunnable extends DroppableRunnable + public static class LocalReadRunnable extends DroppableRunnable implements RunnableDebuggableTask { private final ReadCommand command; private final ReadCallback handler; @@ -2157,6 +2188,24 @@ public class StorageProxy implements StorageProxyMBean } } } + + @Override + public long creationTimeNanos() + { + return approxCreationTimeNanos; + } + + @Override + public long startTimeNanos() + { + return approxStartTimeNanos; + } + + @Override + public String description() + { + return command.toCQLString(); + } } public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, @@ -2467,7 +2516,9 @@ public class StorageProxy implements StorageProxyMBean */ private static abstract class DroppableRunnable implements Runnable { - final long approxCreationTimeNanos; + protected final long approxCreationTimeNanos; + protected volatile long approxStartTimeNanos; + final Verb verb; public DroppableRunnable(Verb verb) @@ -2478,11 +2529,11 @@ public class StorageProxy implements StorageProxyMBean public final void run() { - long approxCurrentTimeNanos = MonotonicClock.Global.approxTime.now(); + approxStartTimeNanos = MonotonicClock.Global.approxTime.now(); long expirationTimeNanos = verb.expiresAtNanos(approxCreationTimeNanos); - if (approxCurrentTimeNanos > expirationTimeNanos) + if (approxStartTimeNanos > expirationTimeNanos) { - long timeTakenNanos = approxCurrentTimeNanos - approxCreationTimeNanos; + long timeTakenNanos = approxStartTimeNanos - approxCreationTimeNanos; MessagingService.instance().metrics.recordSelfDroppedMessage(verb, timeTakenNanos, NANOSECONDS); return; } @@ -2503,9 +2554,10 @@ public class StorageProxy implements StorageProxyMBean * Like DroppableRunnable, but if it aborts, it will rerun (on the mutation stage) after * marking itself as a hint in progress so that the hint backpressure mechanism can function. 
*/ - private static abstract class LocalMutationRunnable implements Runnable + private static abstract class LocalMutationRunnable implements RunnableDebuggableTask { private final long approxCreationTimeNanos = MonotonicClock.Global.approxTime.now(); + private volatile long approxStartTimeNanos; private final Replica localReplica; @@ -2517,11 +2569,12 @@ public class StorageProxy implements StorageProxyMBean public final void run() { final Verb verb = verb(); - long nowNanos = MonotonicClock.Global.approxTime.now(); + approxStartTimeNanos = MonotonicClock.Global.approxTime.now(); long expirationTimeNanos = verb.expiresAtNanos(approxCreationTimeNanos); - if (nowNanos > expirationTimeNanos) + + if (approxStartTimeNanos > expirationTimeNanos) { - long timeTakenNanos = nowNanos - approxCreationTimeNanos; + long timeTakenNanos = approxStartTimeNanos - approxCreationTimeNanos; MessagingService.instance().metrics.recordSelfDroppedMessage(Verb.MUTATION_REQ, timeTakenNanos, NANOSECONDS); HintRunnable runnable = new HintRunnable(EndpointsForToken.of(localReplica.range().right, localReplica)) @@ -2545,6 +2598,21 @@ public class StorageProxy implements StorageProxyMBean } } + @Override + public long creationTimeNanos() + { + return approxCreationTimeNanos; + } + + @Override + public long startTimeNanos() + { + return approxStartTimeNanos; + } + + @Override + abstract public String description(); + abstract protected Verb verb(); abstract protected void runMayThrow() throws Exception; } diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java b/src/java/org/apache/cassandra/transport/Dispatcher.java index da79c3d2c4..8f8a607c77 100644 --- a/src/java/org/apache/cassandra/transport/Dispatcher.java +++ b/src/java/org/apache/cassandra/transport/Dispatcher.java @@ -24,15 +24,16 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import com.google.common.base.Predicate; -import org.apache.cassandra.metrics.ClientMetrics; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.util.AttributeKey; +import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask; import org.apache.cassandra.concurrent.LocalAwareExecutorPlus; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.metrics.ClientMetrics; import org.apache.cassandra.net.FrameEncoder; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.QueryState; @@ -42,10 +43,10 @@ import org.apache.cassandra.transport.Flusher.FlushItem; import org.apache.cassandra.transport.messages.ErrorMessage; import org.apache.cassandra.transport.messages.EventMessage; import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.MonotonicClock; import org.apache.cassandra.utils.NoSpamLogger; import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED; -import static org.apache.cassandra.utils.Clock.Global.nanoTime; public class Dispatcher { @@ -79,17 +80,60 @@ public class Dispatcher public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure) { - requestExecutor.submit(() -> processRequest(channel, request, forFlusher, backpressure)); + requestExecutor.submit(new RequestProcessor(channel, request, forFlusher, backpressure)); ClientMetrics.instance.markRequestDispatched(); } + public class RequestProcessor implements RunnableDebuggableTask + { + private final Channel channel; + private final Message.Request request; + private final FlushItemConverter forFlusher; + private final Overload backpressure; + + private final long approxCreationTimeNanos = MonotonicClock.Global.approxTime.now(); + private volatile long approxStartTimeNanos; + + public RequestProcessor(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure) + { + this.channel = channel; + this.request = request; + 
this.forFlusher = forFlusher; + this.backpressure = backpressure; + } + + @Override + public void run() + { + approxStartTimeNanos = MonotonicClock.Global.approxTime.now(); + processRequest(channel, request, forFlusher, backpressure, approxStartTimeNanos); + } + + @Override + public long creationTimeNanos() + { + return approxCreationTimeNanos; + } + + @Override + public long startTimeNanos() + { + return approxStartTimeNanos; + } + + @Override + public String description() + { + return request.toString(); + } + } + /** * Note: this method may be executed on the netty event loop, during initial protocol negotiation; the caller is * responsible for cleaning up any global or thread-local state. (ex. tracing, client warnings, etc.). */ - private static Message.Response processRequest(ServerConnection connection, Message.Request request, Overload backpressure) + private static Message.Response processRequest(ServerConnection connection, Message.Request request, Overload backpressure, long startTimeNanos) { - long queryStartNanoTime = nanoTime(); if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4)) ClientWarn.instance.captureWarnings(); @@ -119,7 +163,7 @@ public class Dispatcher Message.logger.trace("Received: {}, v={}", request, connection.getVersion()); connection.requests.inc(); - Message.Response response = request.execute(qstate, queryStartNanoTime); + Message.Response response = request.execute(qstate, startTimeNanos); if (request.isTrackable()) CoordinatorWarnings.done(); @@ -130,15 +174,15 @@ public class Dispatcher connection.applyStateTransition(request.type, response.type); return response; } - + /** * Note: this method may be executed on the netty event loop. 
*/ - static Message.Response processRequest(Channel channel, Message.Request request, Overload backpressure) + static Message.Response processRequest(Channel channel, Message.Request request, Overload backpressure, long approxStartTimeNanos) { try { - return processRequest((ServerConnection) request.connection(), request, backpressure); + return processRequest((ServerConnection) request.connection(), request, backpressure, approxStartTimeNanos); } catch (Throwable t) { @@ -163,9 +207,9 @@ public class Dispatcher /** * Note: this method is not expected to execute on the netty event loop. */ - void processRequest(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure) + void processRequest(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure, long approxStartTimeNanos) { - Message.Response response = processRequest(channel, request, backpressure); + Message.Response response = processRequest(channel, request, backpressure, approxStartTimeNanos); FlushItem<?> toFlush = forFlusher.toFlushItem(channel, request, response); Message.logger.trace("Responding: {}, v={}", response, request.connection().getVersion()); flush(toFlush); @@ -201,7 +245,7 @@ public class Dispatcher * for delivering events to registered clients is dependent on protocol version and the configuration * of the pipeline. For v5 and newer connections, the event message is encoded into an Envelope, * wrapped in a FlushItem and then delivered via the pipeline's flusher, in a similar way to - * a Response returned from {@link #processRequest(Channel, Message.Request, FlushItemConverter, Overload)}. + * a Response returned from {@link #processRequest(Channel, Message.Request, FlushItemConverter, Overload, long)}. * It's worth noting that events are not generally fired as a direct response to a client request, * so this flush item has a null request attribute. 
The dispatcher itself is created when the * pipeline is first configured during protocol negotiation and is attached to the channel for diff --git a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java index 75cb72e8b5..e4cff99acb 100644 --- a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java +++ b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import org.apache.cassandra.transport.ClientResourceLimits.Overload; +import org.apache.cassandra.utils.MonotonicClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,7 +149,9 @@ public class InitialConnectionHandler extends ByteToMessageDecoder promise = new VoidChannelPromise(ctx.channel(), false); } - final Message.Response response = Dispatcher.processRequest(ctx.channel(), startup, Overload.NONE); + long approxStartTimeNanos = MonotonicClock.Global.approxTime.now(); + final Message.Response response = Dispatcher.processRequest(ctx.channel(), startup, Overload.NONE, approxStartTimeNanos); + outbound = response.encode(inbound.header.version); ctx.writeAndFlush(outbound, promise); logger.trace("Configured pipeline: {}", ctx.pipeline()); diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index 75c997e38c..2c91a76c3b 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -193,7 +193,8 @@ public abstract class Message this.customPayload = customPayload; } - public String debugString() + @Override + public String toString() { return String.format("(%s:%s:%s)", type, streamId, connection == null ? 
"null" : connection.getVersion().asInt()); } diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java index 9a296e442b..c295216d2c 100644 --- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java @@ -148,6 +148,7 @@ public class QueryMessage extends Message.Request @Override public String toString() { - return String.format("QUERY %s [pageSize = %d]", query, options.getPageSize()); + return String.format("QUERY %s [pageSize = %d] at consistency %s", + query, options.getPageSize(), options.getConsistency()); } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java new file mode 100644 index 0000000000..09e56e0b61 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Row; +import org.apache.cassandra.distributed.api.SimpleQueryResult; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertTrue; + +public class QueriesTableTest extends TestBaseImpl +{ + public static final int ITERATIONS = 256; + + @Test + public void shouldExposeReadsAndWrites() throws Throwable + { + try (Cluster cluster = init(Cluster.build(1).start())) + { + ExecutorService executor = Executors.newFixedThreadPool(16); + + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k int primary key, v int)"); + + AtomicInteger reads = new AtomicInteger(0); + AtomicInteger writes = new AtomicInteger(0); + AtomicInteger paxos = new AtomicInteger(0); + + for (int i = 0; i < ITERATIONS; i++) + { + int k = i; + executor.execute(() -> cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (k, v) VALUES (" + k + ", 0)", ConsistencyLevel.ALL)); + executor.execute(() -> cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 10 WHERE k = " + (k - 1) + " IF v = 0", ConsistencyLevel.ALL)); + executor.execute(() -> cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE k = " + (k - 1), ConsistencyLevel.ALL)); + + executor.execute(() -> + { + SimpleQueryResult result = cluster.get(1).executeInternalWithResult("SELECT * FROM system_views.queries"); + + while (result.hasNext()) + { + Row row = result.next(); + String threadId = row.get("thread_id").toString(); + String task = row.get("task").toString(); + + if (threadId.contains("Read") && task.contains("SELECT")) + 
reads.incrementAndGet(); + else if (threadId.contains("Mutation") && task.contains("Mutation")) + writes.incrementAndGet(); + else if (threadId.contains("Mutation") && task.contains("Paxos")) + paxos.incrementAndGet(); + } + }); + } + + executor.shutdown(); + assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); + + // We should see at least one read, write, and conditional update in the "queries" table. + assertThat(reads.get()).isGreaterThan(0).isLessThanOrEqualTo(ITERATIONS); + assertThat(writes.get()).isGreaterThan(0).isLessThanOrEqualTo(ITERATIONS); + assertThat(paxos.get()).isGreaterThan(0).isLessThanOrEqualTo(ITERATIONS); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org