Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/94e7ef17 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/94e7ef17 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/94e7ef17 Branch: refs/heads/trunk Commit: 94e7ef17757235cb35df70ff9a2a63e1d29d6c41 Parents: 0f995a2 dbf6e62 Author: Carl Yeksigian <c...@apache.org> Authored: Wed Jan 13 14:45:13 2016 -0500 Committer: Carl Yeksigian <c...@apache.org> Committed: Wed Jan 13 14:45:13 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../AbstractLocalAwareExecutorService.java | 230 +++++++++++++++++++ .../AbstractTracingAwareExecutorService.java | 230 ------------------- .../DebuggableThreadPoolExecutor.java | 48 ++-- .../cassandra/concurrent/ExecutorLocal.java | 44 ++++ .../cassandra/concurrent/ExecutorLocals.java | 84 +++++++ .../concurrent/LocalAwareExecutorService.java | 34 +++ .../cassandra/concurrent/SEPExecutor.java | 3 +- .../concurrent/SharedExecutorPool.java | 2 +- .../cassandra/concurrent/StageManager.java | 12 +- .../concurrent/TracingAwareExecutorService.java | 36 --- .../cassandra/cql3/functions/UDFunction.java | 2 +- .../cql3/statements/BatchStatement.java | 9 +- .../cql3/statements/CreateViewStatement.java | 2 +- .../cql3/statements/SelectStatement.java | 4 +- .../org/apache/cassandra/db/ReadCommand.java | 2 +- .../apache/cassandra/net/MessagingService.java | 7 +- .../apache/cassandra/service/ClientWarn.java | 62 +++-- .../apache/cassandra/service/StorageProxy.java | 2 +- .../org/apache/cassandra/tracing/Tracing.java | 3 +- .../org/apache/cassandra/transport/Message.java | 6 +- .../transport/RequestThreadPoolExecutor.java | 4 +- .../cql3/validation/entities/UFTest.java | 6 +- .../cassandra/service/ClientWarningsTest.java | 58 +++++ 24 files changed, 545 insertions(+), 346 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 614d5b4,6530956..a37ec99 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,20 -1,5 +1,21 @@@ -2.2.5 +3.0.3 + * Fix AssertionError when removing from list using UPDATE (CASSANDRA-10954) + * Fix UnsupportedOperationException when reading old sstable with range + tombstone (CASSANDRA-10743) + * MV should use the maximum timestamp of the primary key (CASSANDRA-10910) + * Fix potential assertion error during compaction (CASSANDRA-10944) + * Fix counting of received sstables in streaming (CASSANDRA-10949) + * Implement hints compression (CASSANDRA-9428) + * Fix potential assertion error when reading static columns (CASSANDRA-10903) + * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711) + * Avoid building PartitionUpdate in toString (CASSANDRA-10897) + * Reduce heap spent when receiving many SSTables (CASSANDRA-10797) + * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873) + * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653) + * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837) + * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806) +Merged from 2.2: + * Make sure client gets tombstone overwhelmed warning (CASSANDRA-9465) * Fix error streaming section more than 2GB (CASSANDRA-10961) * (cqlsh) Also apply --connect-timeout to control connection 
timeout (CASSANDRA-10959) http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java index 0000000,088b43e..f47d8ac mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java +++ b/src/java/org/apache/cassandra/concurrent/AbstractLocalAwareExecutorService.java @@@ -1,0 -1,229 +1,230 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.concurrent; + + import java.util.Collection; + import java.util.List; + import java.util.concurrent.Callable; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.Executors; + import java.util.concurrent.Future; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.TimeoutException; + + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import org.apache.cassandra.tracing.TraceState; + import org.apache.cassandra.tracing.Tracing; + import org.apache.cassandra.utils.concurrent.SimpleCondition; + import org.apache.cassandra.utils.JVMStabilityInspector; + + import static org.apache.cassandra.tracing.Tracing.isTracing; + + public abstract class AbstractLocalAwareExecutorService implements LocalAwareExecutorService + { + private static final Logger logger = LoggerFactory.getLogger(AbstractLocalAwareExecutorService.class); + + protected abstract void addTask(FutureTask<?> futureTask); + protected abstract void onCompletion(); + + /** Task Submission / Creation / Objects **/ + + public <T> FutureTask<T> submit(Callable<T> task) + { + return submit(newTaskFor(task)); + } + + public FutureTask<?> submit(Runnable task) + { + return submit(newTaskFor(task, null)); + } + + public <T> FutureTask<T> submit(Runnable task, T result) + { + return submit(newTaskFor(task, result)); + } + + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + { + throw new UnsupportedOperationException(); + } + + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException + { + throw new UnsupportedOperationException(); + } + + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException + { + throw new UnsupportedOperationException(); + } + + public <T> T invokeAny(Collection<? 
extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException + { + throw new UnsupportedOperationException(); + } + + protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result) + { + return newTaskFor(runnable, result, ExecutorLocals.create()); + } + + protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result, ExecutorLocals locals) + { + if (locals != null) + { + if (runnable instanceof LocalSessionFutureTask) + return (LocalSessionFutureTask<T>) runnable; + return new LocalSessionFutureTask<T>(runnable, result, locals); + } + if (runnable instanceof FutureTask) + return (FutureTask<T>) runnable; + return new FutureTask<>(runnable, result); + } + + protected <T> FutureTask<T> newTaskFor(Callable<T> callable) + { + if (isTracing()) + { + if (callable instanceof LocalSessionFutureTask) + return (LocalSessionFutureTask<T>) callable; + return new LocalSessionFutureTask<T>(callable, ExecutorLocals.create()); + } + if (callable instanceof FutureTask) + return (FutureTask<T>) callable; + return new FutureTask<>(callable); + } + + private class LocalSessionFutureTask<T> extends FutureTask<T> + { + private final ExecutorLocals locals; + + public LocalSessionFutureTask(Callable<T> callable, ExecutorLocals locals) + { + super(callable); + this.locals = locals; + } + + public LocalSessionFutureTask(Runnable runnable, T result, ExecutorLocals locals) + { + super(runnable, result); + this.locals = locals; + } + + public void run() + { + ExecutorLocals old = ExecutorLocals.create(); + ExecutorLocals.set(locals); + try + { + super.run(); + } + finally + { + ExecutorLocals.set(old); + } + } + } + + class FutureTask<T> extends SimpleCondition implements Future<T>, Runnable + { + private boolean failure; + private Object result = this; + private final Callable<T> callable; + + public FutureTask(Callable<T> callable) + { + this.callable = callable; + } + public FutureTask(Runnable runnable, T result) + { + this(Executors.callable(runnable, result)); + } + + public void run() + { + try + { + result = callable.call(); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + logger.warn("Uncaught exception on thread {}: {}", Thread.currentThread(), t); + result = t; + failure = true; + } + finally + { + signalAll(); + onCompletion(); + } + } + + public boolean cancel(boolean mayInterruptIfRunning) + { + return false; + } + + public boolean isCancelled() + { + return false; + } + + public boolean isDone() + { + return isSignaled(); + } + + public T get() throws InterruptedException, ExecutionException + { + await(); + Object result = this.result; + if (failure) + throw new ExecutionException((Throwable) result); + return (T) result; + } + + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException + { - await(timeout, unit); ++ if (!await(timeout, unit)) ++ throw new TimeoutException(); + Object result = this.result; + if (failure) + throw new ExecutionException((Throwable) result); + return (T) result; + } + } + + private <T> FutureTask<T> submit(FutureTask<T> task) + { + addTask(task); + return task; + } + + public void execute(Runnable command) + { + addTask(newTaskFor(command, ExecutorLocals.create())); + } + + public void execute(Runnable command, ExecutorLocals locals) + { + addTask(newTaskFor(command, null, locals)); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/concurrent/StageManager.java 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/cql3/functions/UDFunction.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/functions/UDFunction.java index 04a4c3d,1e5cea6..fa0d306 --- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java +++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java @@@ -269,164 -143,11 +269,164 @@@ public abstract class UDFunction extend return null; long tStart = System.nanoTime(); - ByteBuffer result = executeUserDefined(protocolVersion, parameters); - Tracing.trace("Executed UDF {} in {}\u03bcs", name(), (System.nanoTime() - tStart) / 1000); - return result; + parameters = makeEmptyParametersNull(parameters); + + try + { + // Using async UDF execution is expensive (adds about 100us overhead per invocation on a Core-i7 MBPr). + ByteBuffer result = DatabaseDescriptor.enableUserDefinedFunctionsThreads() + ? executeAsync(protocolVersion, parameters) + : executeUserDefined(protocolVersion, parameters); + + Tracing.trace("Executed UDF {} in {}\u03bcs", name(), (System.nanoTime() - tStart) / 1000); + return result; + } + catch (InvalidRequestException e) + { + throw e; + } + catch (Throwable t) + { + logger.trace("Invocation of user-defined function '{}' failed", this, t); + if (t instanceof VirtualMachineError) + throw (VirtualMachineError) t; + throw FunctionExecutionException.create(this, t); + } } + public static void assertUdfsEnabled(String language) + { + if (!DatabaseDescriptor.enableUserDefinedFunctions()) + throw new InvalidRequestException("User-defined functions are disabled in cassandra.yaml - set enable_user_defined_functions=true to enable"); + if (!"java".equalsIgnoreCase(language) && !DatabaseDescriptor.enableScriptedUserDefinedFunctions()) + throw new InvalidRequestException("Scripted user-defined functions are disabled in cassandra.yaml - set enable_scripted_user_defined_functions=true to enable if you are aware of the security risks"); + } + + static void initializeThread() + { + // Get the TypeCodec stuff in Java Driver initialized. + // This is to get the classes loaded outside of the restricted sandbox's security context of a UDF. + UDHelper.codecFor(DataType.inet()).format(InetAddress.getLoopbackAddress()); + UDHelper.codecFor(DataType.ascii()).format(""); + } + + private static final class ThreadIdAndCpuTime extends CompletableFuture<Object> + { + long threadId; + long cpuTime; + + ThreadIdAndCpuTime() + { + // Looks weird? + // This call "just" links this class to java.lang.management - otherwise UDFs (script UDFs) might fail due to + // java.security.AccessControlException: access denied: ("java.lang.RuntimePermission" "accessClassInPackage.java.lang.management") + // because class loading would be deferred until setup() is executed - but setup() is called with + // limited privileges. 
+ threadMXBean.getCurrentThreadCpuTime(); + } + + void setup() + { + this.threadId = Thread.currentThread().getId(); + this.cpuTime = threadMXBean.getCurrentThreadCpuTime(); + complete(null); + } + } + + private ByteBuffer executeAsync(int protocolVersion, List<ByteBuffer> parameters) + { + ThreadIdAndCpuTime threadIdAndCpuTime = new ThreadIdAndCpuTime(); + + Future<ByteBuffer> future = executor().submit(() -> { + threadIdAndCpuTime.setup(); + return executeUserDefined(protocolVersion, parameters); + }); + + try + { + if (DatabaseDescriptor.getUserDefinedFunctionWarnTimeout() > 0) + try + { + return future.get(DatabaseDescriptor.getUserDefinedFunctionWarnTimeout(), TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) + { + + // log and emit a warning that UDF execution took too long + String warn = String.format("User defined function %s ran longer than %dms", this, DatabaseDescriptor.getUserDefinedFunctionWarnTimeout()); + logger.warn(warn); - ClientWarn.warn(warn); ++ ClientWarn.instance.warn(warn); + } + + // retry with the difference between fail-timeout and warn-timeout + return future.get(DatabaseDescriptor.getUserDefinedFunctionFailTimeout() - DatabaseDescriptor.getUserDefinedFunctionWarnTimeout(), TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + catch (ExecutionException e) + { + Throwable c = e.getCause(); + if (c instanceof RuntimeException) + throw (RuntimeException) c; + throw new RuntimeException(c); + } + catch (TimeoutException e) + { + // retry a last time with the difference of UDF-fail-timeout to consumed CPU time (just in case execution hit a badly timed GC) + try + { + // The threadIdAndCpuTime shouldn't take a long time to be set so this should return immediately + threadIdAndCpuTime.get(1, TimeUnit.SECONDS); + + long cpuTimeMillis = threadMXBean.getThreadCpuTime(threadIdAndCpuTime.threadId) - threadIdAndCpuTime.cpuTime; + cpuTimeMillis /= 1000000L; + + return future.get(Math.max(DatabaseDescriptor.getUserDefinedFunctionFailTimeout() - cpuTimeMillis, 0L), + TimeUnit.MILLISECONDS); + } + catch (InterruptedException e1) + { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + catch (ExecutionException e1) + { + Throwable c = e.getCause(); + if (c instanceof RuntimeException) + throw (RuntimeException) c; + throw new RuntimeException(c); + } + catch (TimeoutException e1) + { + TimeoutException cause = new TimeoutException(String.format("User defined function %s ran longer than %dms%s", + this, + DatabaseDescriptor.getUserDefinedFunctionFailTimeout(), + DatabaseDescriptor.getUserFunctionTimeoutPolicy() == Config.UserFunctionTimeoutPolicy.ignore + ? "" : " - will stop Cassandra VM")); + FunctionExecutionException fe = FunctionExecutionException.create(this, cause); + JVMStabilityInspector.userFunctionTimeout(cause); + throw fe; + } + } + } + + private List<ByteBuffer> makeEmptyParametersNull(List<ByteBuffer> parameters) + { + List<ByteBuffer> r = new ArrayList<>(parameters.size()); + for (int i = 0; i < parameters.size(); i++) + { + ByteBuffer param = parameters.get(i); + r.add(UDHelper.isNullOrEmpty(argTypes.get(i), param) + ?
null : param); + } + return r; + } + + protected abstract ExecutorService executor(); + public boolean isCallableWrtNullable(List<ByteBuffer> parameters) { if (!calledOnNullInput) http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 3979597,a289ad1..47396fb --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@@ -226,35 -179,67 +226,35 @@@ public class BatchStatement implements for (int i = 0; i < statements.size(); i++) { ModificationStatement statement = statements.get(i); + if (isLogged() && statement.cfm.params.gcGraceSeconds == 0) + { + if (tablesWithZeroGcGs == null) + tablesWithZeroGcGs = new HashSet<>(); + tablesWithZeroGcGs.add(String.format("%s.%s", statement.cfm.ksName, statement.cfm.cfName)); + } QueryOptions statementOptions = options.forStatement(i); long timestamp = attrs.getTimestamp(now, statementOptions); - addStatementMutations(statement, statementOptions, local, timestamp, mutations); + statement.addUpdates(collector, statementOptions, local, timestamp); } - return unzipMutations(mutations); - } - - private Collection<? extends IMutation> unzipMutations(Map<String, Map<ByteBuffer, IMutation>> mutations) - { - - // The case where all statement where on the same keyspace is pretty common - if (mutations.size() == 1) - return mutations.values().iterator().next().values(); - - List<IMutation> ms = new ArrayList<>(); - for (Map<ByteBuffer, IMutation> ksMap : mutations.values()) - ms.addAll(ksMap.values()); - - return ms; - } - - private void addStatementMutations(ModificationStatement statement, - QueryOptions options, - boolean local, - long now, - Map<String, Map<ByteBuffer, IMutation>> mutations) - throws RequestExecutionException, RequestValidationException - { - String ksName = statement.keyspace(); - Map<ByteBuffer, IMutation> ksMap = mutations.get(ksName); - if (ksMap == null) + if (tablesWithZeroGcGs != null) { - ksMap = new HashMap<>(); - mutations.put(ksName, ksMap); + String suffix = tablesWithZeroGcGs.size() == 1 ? "" : "s"; + NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, LOGGED_BATCH_LOW_GCGS_WARNING, + suffix, tablesWithZeroGcGs); - ClientWarn.warn(MessageFormatter.arrayFormat(LOGGED_BATCH_LOW_GCGS_WARNING, new Object[] { suffix, tablesWithZeroGcGs }) - .getMessage()); ++ ClientWarn.instance.warn(MessageFormatter.arrayFormat(LOGGED_BATCH_LOW_GCGS_WARNING, new Object[] { suffix, tablesWithZeroGcGs }) ++ .getMessage()); } - // The following does the same than statement.getMutations(), but we inline it here because - // we don't want to recreate mutations every time as this is particularly inefficient when applying - // multiple batch to the same partition (see #6737). - List<ByteBuffer> keys = statement.buildPartitionKeyNames(options); - Composite clusteringPrefix = statement.createClusteringPrefix(options); - UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, options, local, now); - - for (ByteBuffer key : keys) - { - IMutation mutation = ksMap.get(key); - Mutation mut; - if (mutation == null) - { - mut = new Mutation(ksName, key); - mutation = statement.cfm.isCounter() ? 
new CounterMutation(mut, options.getConsistency()) : mut; - ksMap.put(key, mutation); - } - else - { - mut = statement.cfm.isCounter() ? ((CounterMutation) mutation).getMutation() : (Mutation) mutation; - } + collector.validateIndexedColumns(); + return collector.toMutations(); + } - statement.addUpdateForKey(mut.addOrGet(statement.cfm), key, clusteringPrefix, params); - } + private int updatedRows() + { + // Note: it's possible for 2 statements to actually apply to the same row, but that's just an estimation + // for sizing our PartitionUpdate backing array, so it's good enough. + return statements.size(); } /** @@@ -286,9 -271,9 +286,9 @@@ } else if (logger.isWarnEnabled()) { - logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold, ""); + logger.warn(format, tableNames, size, warnThreshold, size - warnThreshold, ""); } - ClientWarn.warn(MessageFormatter.arrayFormat(format, new Object[] {tableNames, size, warnThreshold, size - warnThreshold, ""}).getMessage()); - ClientWarn.instance.warn(MessageFormatter.arrayFormat(format, new Object[]{ ksCfPairs, size, warnThreshold, size - warnThreshold, "" }).getMessage()); ++ ClientWarn.instance.warn(MessageFormatter.arrayFormat(format, new Object[] {tableNames, size, warnThreshold, size - warnThreshold, ""}).getMessage()); } } @@@ -311,17 -298,21 +311,16 @@@ } // CASSANDRA-9303: If we only have local mutations we do not warn - if (localMutationsOnly) + if (localPartitionsOnly) return; - NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, unloggedBatchWarning, - keySet.size(), keySet.size() == 1 ? "" : "s", - ksCfPairs.size() == 1 ? "" : "s", ksCfPairs); - ClientWarn.instance.warn(MessageFormatter.arrayFormat(unloggedBatchWarning, - new Object[]{ - keySet.size(), - keySet.size() == 1 ? "" : "s", - ksCfPairs.size() == 1 ? "" : "s", - ksCfPairs - }).getMessage()); + NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, UNLOGGED_BATCH_WARNING, + keySet.size(), keySet.size() == 1 ? "" : "s", + tableNames.size() == 1 ? "" : "s", tableNames); - ClientWarn.warn(MessageFormatter.arrayFormat(UNLOGGED_BATCH_WARNING, new Object[]{keySet.size(), keySet.size() == 1 ? "" : "s", ++ ClientWarn.instance.warn(MessageFormatter.arrayFormat(UNLOGGED_BATCH_WARNING, new Object[]{keySet.size(), keySet.size() == 1 ? "" : "s", + tableNames.size() == 1 ? "" : "s", tableNames}).getMessage()); - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java index 4017ce6,0000000..5af4887 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java @@@ -1,330 -1,0 +1,330 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cql3.statements; + +import java.util.*; +import java.util.stream.Collectors; + +import com.google.common.collect.Iterables; + +import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.config.ViewDefinition; +import org.apache.cassandra.cql3.*; +import org.apache.cassandra.cql3.restrictions.StatementRestrictions; +import org.apache.cassandra.cql3.selection.RawSelector; +import org.apache.cassandra.cql3.selection.Selectable; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.ReversedType; +import org.apache.cassandra.db.view.View; +import org.apache.cassandra.exceptions.AlreadyExistsException; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.exceptions.UnauthorizedException; +import org.apache.cassandra.schema.TableParams; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.ClientWarn; +import org.apache.cassandra.service.MigrationManager; +import org.apache.cassandra.thrift.ThriftValidation; +import org.apache.cassandra.transport.Event; + +public class CreateViewStatement extends SchemaAlteringStatement +{ + private final CFName baseName; + private final List<RawSelector> selectClause; + private final WhereClause whereClause; + private final List<ColumnIdentifier.Raw> partitionKeys; + private final List<ColumnIdentifier.Raw> clusteringKeys; + public final CFProperties properties = new CFProperties(); + private final boolean ifNotExists; + + public CreateViewStatement(CFName viewName, + CFName baseName, + List<RawSelector> selectClause, + WhereClause whereClause, + List<ColumnIdentifier.Raw> partitionKeys, + List<ColumnIdentifier.Raw> clusteringKeys, + boolean ifNotExists) + { + super(viewName); + this.baseName = baseName; + this.selectClause = selectClause; + this.whereClause = whereClause; + this.partitionKeys = partitionKeys; + this.clusteringKeys = clusteringKeys; + this.ifNotExists = ifNotExists; + } + + + public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException + { + if (!baseName.hasKeyspace()) + baseName.setKeyspace(keyspace(), true); + state.hasColumnFamilyAccess(keyspace(), baseName.getColumnFamily(), Permission.ALTER); + } + + public void validate(ClientState state) throws RequestValidationException + { + // We do validation in announceMigration to reduce doubling up of work + } + + private interface AddColumn { + void add(ColumnIdentifier identifier, AbstractType<?> type); + } + + private void add(CFMetaData baseCfm, Iterable<ColumnIdentifier> columns, AddColumn adder) + { + for (ColumnIdentifier column : columns) + { + AbstractType<?> type = baseCfm.getColumnDefinition(column).type; + if (properties.definedOrdering.containsKey(column)) + { + boolean desc = properties.definedOrdering.get(column); + if (!desc && type.isReversed()) + { + type = 
((ReversedType)type).baseType; + } + else if (desc && !type.isReversed()) + { + type = ReversedType.getInstance(type); + } + } + adder.add(column, type); + } + } + + public Event.SchemaChange announceMigration(boolean isLocalOnly) throws RequestValidationException + { + // We need to make sure that: + // - primary key includes all columns in base table's primary key + // - make sure that the select statement does not have anything other than columns + // and their names match the base table's names + // - make sure that primary key does not include any collections + // - make sure there is no where clause in the select statement + // - make sure there is not currently a table or view + // - make sure baseTable gcGraceSeconds > 0 + + properties.validate(); + + if (properties.useCompactStorage) + throw new InvalidRequestException("Cannot use 'COMPACT STORAGE' when defining a materialized view"); + + // We enforce the keyspace because if the RF is different, the logic to wait for a + // specific replica would break + if (!baseName.getKeyspace().equals(keyspace())) + throw new InvalidRequestException("Cannot create a materialized view on a table in a separate keyspace"); + + CFMetaData cfm = ThriftValidation.validateColumnFamily(baseName.getKeyspace(), baseName.getColumnFamily()); + + if (cfm.isCounter()) + throw new InvalidRequestException("Materialized views are not supported on counter tables"); + if (cfm.isView()) + throw new InvalidRequestException("Materialized views cannot be created against other materialized views"); + + if (cfm.params.gcGraceSeconds == 0) + { + throw new InvalidRequestException(String.format("Cannot create materialized view '%s' for base table " + + "'%s' with gc_grace_seconds of 0, since this value is " + + "used to TTL undelivered updates. 
Setting gc_grace_seconds" + + " too low might cause undelivered updates to expire " + + "before being replayed.", cfName.getColumnFamily(), + baseName.getColumnFamily())); + } + + Set<ColumnIdentifier> included = new HashSet<>(); + for (RawSelector selector : selectClause) + { + Selectable.Raw selectable = selector.selectable; + if (selectable instanceof Selectable.WithFieldSelection.Raw) + throw new InvalidRequestException("Cannot select out a part of type when defining a materialized view"); + if (selectable instanceof Selectable.WithFunction.Raw) + throw new InvalidRequestException("Cannot use function when defining a materialized view"); + if (selectable instanceof Selectable.WritetimeOrTTL.Raw) + throw new InvalidRequestException("Cannot use function when defining a materialized view"); + ColumnIdentifier identifier = (ColumnIdentifier) selectable.prepare(cfm); + if (selector.alias != null) + throw new InvalidRequestException(String.format("Cannot alias column '%s' as '%s' when defining a materialized view", identifier.toString(), selector.alias.toString())); + + ColumnDefinition cdef = cfm.getColumnDefinition(identifier); + + if (cdef == null) + throw new InvalidRequestException("Unknown column name detected in CREATE MATERIALIZED VIEW statement : "+identifier); + + if (cdef.isStatic()) - ClientWarn.warn(String.format("Unable to include static column '%s' in Materialized View SELECT statement", identifier)); ++ ClientWarn.instance.warn(String.format("Unable to include static column '%s' in Materialized View SELECT statement", identifier)); + else + included.add(identifier); + } + + Set<ColumnIdentifier.Raw> targetPrimaryKeys = new HashSet<>(); + for (ColumnIdentifier.Raw identifier : Iterables.concat(partitionKeys, clusteringKeys)) + { + if (!targetPrimaryKeys.add(identifier)) + throw new InvalidRequestException("Duplicate entry found in PRIMARY KEY: "+identifier); + + ColumnDefinition cdef = cfm.getColumnDefinition(identifier.prepare(cfm)); + + if (cdef == null) + throw new InvalidRequestException("Unknown column name detected in CREATE MATERIALIZED VIEW statement : "+identifier); + + if (cfm.getColumnDefinition(identifier.prepare(cfm)).type.isMultiCell()) + throw new InvalidRequestException(String.format("Cannot use MultiCell column '%s' in PRIMARY KEY of materialized view", identifier)); + + if (cdef.isStatic()) + throw new InvalidRequestException(String.format("Cannot use Static column '%s' in PRIMARY KEY of materialized view", identifier)); + } + + // build the select statement + Map<ColumnIdentifier.Raw, Boolean> orderings = Collections.emptyMap(); + SelectStatement.Parameters parameters = new SelectStatement.Parameters(orderings, false, true, false); + SelectStatement.RawStatement rawSelect = new SelectStatement.RawStatement(baseName, parameters, selectClause, whereClause, null); + + ClientState state = ClientState.forInternalCalls(); + state.setKeyspace(keyspace()); + + rawSelect.prepareKeyspace(state); + rawSelect.setBoundVariables(getBoundVariables()); + + ParsedStatement.Prepared prepared = rawSelect.prepare(true); + SelectStatement select = (SelectStatement) prepared.statement; + StatementRestrictions restrictions = select.getRestrictions(); + + if (!prepared.boundNames.isEmpty()) + throw new InvalidRequestException("Cannot use query parameters in CREATE MATERIALIZED VIEW statements"); + + if (!restrictions.nonPKRestrictedColumns(false).isEmpty()) + { + throw new InvalidRequestException(String.format( + "Non-primary key columns cannot be restricted in the SELECT statement 
used for materialized view " + + "creation (got restrictions on: %s)", + restrictions.nonPKRestrictedColumns(false).stream().map(def -> def.name.toString()).collect(Collectors.joining(", ")))); + } + + String whereClauseText = View.relationsToWhereClause(whereClause.relations); + + Set<ColumnIdentifier> basePrimaryKeyCols = new HashSet<>(); + for (ColumnDefinition definition : Iterables.concat(cfm.partitionKeyColumns(), cfm.clusteringColumns())) + basePrimaryKeyCols.add(definition.name); + + List<ColumnIdentifier> targetClusteringColumns = new ArrayList<>(); + List<ColumnIdentifier> targetPartitionKeys = new ArrayList<>(); + + // This is only used as an intermediate state; this is to catch whether multiple non-PK columns are used + boolean hasNonPKColumn = false; + for (ColumnIdentifier.Raw raw : partitionKeys) + hasNonPKColumn = getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetPartitionKeys, restrictions); + + for (ColumnIdentifier.Raw raw : clusteringKeys) + hasNonPKColumn = getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetClusteringColumns, restrictions); + + // We need to include all of the primary key columns from the base table in order to make sure that we do not + // overwrite values in the view. We cannot support "collapsing" the base table into a smaller number of rows in + // the view because if we need to generate a tombstone, we have no way of knowing which value is currently being + // used in the view and whether or not to generate a tombstone. In order to not surprise our users, we require + // that they include all of the columns. We provide them with a list of all of the columns left to include. + boolean missingClusteringColumns = false; + StringBuilder columnNames = new StringBuilder(); + List<ColumnIdentifier> includedColumns = new ArrayList<>(); + for (ColumnDefinition def : cfm.allColumns()) + { + ColumnIdentifier identifier = def.name; + + if ((included.isEmpty() || included.contains(identifier)) + && !targetClusteringColumns.contains(identifier) && !targetPartitionKeys.contains(identifier) + && !def.isStatic()) + { + includedColumns.add(identifier); + } + if (!def.isPrimaryKeyColumn()) continue; + + if (!targetClusteringColumns.contains(identifier) && !targetPartitionKeys.contains(identifier)) + { + if (missingClusteringColumns) + columnNames.append(','); + else + missingClusteringColumns = true; + columnNames.append(identifier); + } + } + if (missingClusteringColumns) + throw new InvalidRequestException(String.format("Cannot create Materialized View %s without primary key columns from base %s (%s)", + columnFamily(), baseName.getColumnFamily(), columnNames.toString())); + + if (targetPartitionKeys.isEmpty()) + throw new InvalidRequestException("Must select at least a column for a Materialized View"); + + if (targetClusteringColumns.isEmpty()) + throw new InvalidRequestException("No columns are defined for Materialized View other than primary key"); + + CFMetaData.Builder cfmBuilder = CFMetaData.Builder.createView(keyspace(), columnFamily()); + add(cfm, targetPartitionKeys, cfmBuilder::addPartitionKey); + add(cfm, targetClusteringColumns, cfmBuilder::addClusteringColumn); + add(cfm, includedColumns, cfmBuilder::addRegularColumn); + cfmBuilder.withId(properties.properties.getId()); + TableParams params = properties.properties.asNewTableParams(); + CFMetaData viewCfm = cfmBuilder.build().params(params); + ViewDefinition definition = new ViewDefinition(keyspace(), + columnFamily(), + Schema.instance.getId(keyspace(), 
baseName.getColumnFamily()), + baseName.getColumnFamily(), + included.isEmpty(), + rawSelect, + whereClauseText, + viewCfm); + + try + { + MigrationManager.announceNewView(definition, isLocalOnly); + return new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, keyspace(), columnFamily()); + } + catch (AlreadyExistsException e) + { + if (ifNotExists) + return null; + throw e; + } + } + + private static boolean getColumnIdentifier(CFMetaData cfm, + Set<ColumnIdentifier> basePK, + boolean hasNonPKColumn, + ColumnIdentifier.Raw raw, + List<ColumnIdentifier> columns, + StatementRestrictions restrictions) + { + ColumnIdentifier identifier = raw.prepare(cfm); + ColumnDefinition def = cfm.getColumnDefinition(identifier); + + boolean isPk = basePK.contains(identifier); + if (!isPk && hasNonPKColumn) + throw new InvalidRequestException(String.format("Cannot include more than one non-primary key column '%s' in materialized view partition key", identifier)); + + // We don't need to include the "IS NOT NULL" filter on a non-composite partition key + // because we will never allow a single partition key to be NULL + boolean isSinglePartitionKey = cfm.getColumnDefinition(identifier).isPartitionKey() + && cfm.partitionKeyColumns().size() == 1; + if (!isSinglePartitionKey && !restrictions.isRestricted(def)) + throw new InvalidRequestException(String.format("Primary key column '%s' is required to be filtered by 'IS NOT NULL'", identifier)); + + columns.add(identifier); + return !isPk; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index a9bb121,5cfa94b..904adca --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@@ -359,21 -256,19 +359,21 @@@ public class SelectStatement implement else if (restrictions.keyIsInRelation()) { logger.warn("Aggregation query used on multiple partition keys (IN restriction)"); - ClientWarn.warn("Aggregation query used on multiple partition keys (IN restriction)"); + ClientWarn.instance.warn("Aggregation query used on multiple partition keys (IN restriction)"); } - Selection.ResultSetBuilder result = selection.resultSetBuilder(now, parameters.isJson); + Selection.ResultSetBuilder result = selection.resultSetBuilder(parameters.isJson); while (!pager.isExhausted()) { - for (Row row : pager.fetchPage(pageSize)) + try (PartitionIterator iter = pager.fetchPage(pageSize)) { - // Not columns match the query, skip - if (row.cf == null) - continue; - - processColumnFamily(row.key.getKey(), row.cf, options, now, result); + while (iter.hasNext()) + { + try (RowIterator partition = iter.next()) + { + processPartition(partition, options, result, nowInSec); + } + } } } return new ResultMessage.Rows(result.build(options.getProtocolVersion())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ReadCommand.java index 3f0695c,cd86336..668a189 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@@ -232,713 -95,55 +232,713 @@@ public abstract class ReadCommand imple return this; } - public String 
getColumnFamilyName() + /** + * Sets the digest version, for when a digest for this command is requested. + * <p> + * Note that we allow setting this independently of setting the command as a digest query as + * this allows us to use the command as a carrier of the digest version even if we only call + * setIsDigestQuery on some copy of it. + * + * @param digestVersion the version for the digest if this command is used for a digest query. + * @return this read command. + */ + public ReadCommand setDigestVersion(int digestVersion) { - return cfName; + this.digestVersion = digestVersion; + return this; } + /** + * Whether this query is for thrift or not. + * + * @return whether this query is for thrift. + */ + public boolean isForThrift() + { + return isForThrift; + } + + /** + * The clustering index filter this command should use for the provided key. + * <p> + * Note that this method should only be called on a key actually queried by this command + * and in practice, this will almost always return the same filter, but for the sake of + * paging, the filter on the first key of a range command might be slightly different. + * + * @param key a partition key queried by this command. + * + * @return the {@code ClusteringIndexFilter} to use for the partition of key {@code key}. + */ + public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key); + + /** + * Returns a copy of this command. + * + * @return a copy of this command. + */ public abstract ReadCommand copy(); - public abstract Row getRow(Keyspace keyspace); + protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup); + + protected abstract int oldestUnrepairedTombstone(); + + public ReadResponse createResponse(UnfilteredPartitionIterator iterator, ColumnFilter selection) + { + return isDigestQuery() + ? ReadResponse.createDigestResponse(iterator, digestVersion) + : ReadResponse.createDataResponse(iterator, selection); + } + + public long indexSerializedSize(int version) + { + if (index.isPresent()) + return IndexMetadata.serializer.serializedSize(index.get(), version); + else + return 0; + } + + public Index getIndex(ColumnFamilyStore cfs) + { + // if we've already consulted the index manager, and it returned a valid index + // the result should be cached here. + if(index.isPresent()) + return cfs.indexManager.getIndex(index.get()); + + // if no cached index is present, but we've already consulted the index manager + // then no registered index is suitable for this command, so just return null. + if (indexManagerQueried) + return null; + + // do the lookup, set the flag to indicate so and cache the result if not null + Index selected = cfs.indexManager.getBestIndexFor(this); + indexManagerQueried = true; - public abstract IDiskAtomFilter filter(); + if (selected == null) + return null; - public String getKeyspace() + index = Optional.of(selected.getIndexMetadata()); + return selected; + } + + /** + * Executes this command on the local host. + * + * @param orderGroup the operation group spanning this command + * + * @return an iterator over the result of executing this command locally. + */ + @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary + // iterators created inside the try as long as we do close the original resultIterator), or by closing the result.
+ public UnfilteredPartitionIterator executeLocally(ReadOrderGroup orderGroup) { - return ksName; + long startTimeNanos = System.nanoTime(); + + ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata()); + Index index = getIndex(cfs); + + Index.Searcher searcher = null; + if (index != null) + { + if (!cfs.indexManager.isIndexQueryable(index)) + throw new IndexNotAvailableException(index); + + searcher = index.searcherFor(this); + Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name); + } + + UnfilteredPartitionIterator resultIterator = searcher == null + ? queryStorage(cfs, orderGroup) + : searcher.search(orderGroup); + + try + { + resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos); + + // If we've used a 2ndary index, we know the result already satisfies the primary expression used, so + // no point in checking it again. + RowFilter updatedFilter = searcher == null + ? rowFilter() + : index.getPostIndexQueryFilter(rowFilter()); + + // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However, + // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it + // would be more efficient (the sooner we discard stuff we know we don't care about, the less useless + // processing we do on it). + return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec()); + } + catch (RuntimeException | Error e) + { + resultIterator.close(); + throw e; + } } - // maybeGenerateRetryCommand is used to generate a retry for short reads - public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row) + protected abstract void recordLatency(TableMetrics metric, long latencyNanos); + + public PartitionIterator executeInternal(ReadOrderGroup orderGroup) { - return null; + return UnfilteredPartitionIterators.filter(executeLocally(orderGroup), nowInSec()); } - // maybeTrim removes columns from a response that is too long - public Row maybeTrim(Row row) + public ReadOrderGroup startOrderGroup() { - return row; + return ReadOrderGroup.forCommand(this); } - public long getTimeout() + /** + * Wraps the provided iterator so that metrics on what is scanned by the command are recorded. + * This also logs a warning/throws TombstoneOverwhelmingException if appropriate.
+ */ + private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos) { - return DatabaseDescriptor.getReadRpcTimeout(); + class MetricRecording extends Transformation<UnfilteredRowIterator> + { + private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold(); + private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold(); + + private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName); + + private int liveRows = 0; + private int tombstones = 0; + + private DecoratedKey currentKey; + + @Override + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter) + { + currentKey = iter.partitionKey(); + return Transformation.apply(iter, this); + } + + @Override + public Row applyToStatic(Row row) + { + return applyToRow(row); + } + + @Override + public Row applyToRow(Row row) + { + if (row.hasLiveData(ReadCommand.this.nowInSec())) + ++liveRows; + + for (Cell cell : row.cells()) + { + if (!cell.isLive(ReadCommand.this.nowInSec())) + countTombstone(row.clustering()); + } + return row; + } + + @Override + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + countTombstone(marker.clustering()); + return marker; + } + + private void countTombstone(ClusteringPrefix clustering) + { + ++tombstones; + if (tombstones > failureThreshold && respectTombstoneThresholds) + { + String query = ReadCommand.this.toCQLString(); + Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query); + throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering); + } + } + + @Override + public void onClose() + { + recordLatency(metric, System.nanoTime() - startTimeNanos); + + metric.tombstoneScannedHistogram.update(tombstones); + metric.liveScannedHistogram.update(liveRows); + + boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds; + if (warnTombstones) + { + String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString()); - ClientWarn.warn(msg); ++ ClientWarn.instance.warn(msg); + logger.warn(msg); + } + + Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : "")); + } + }; + + return Transformation.apply(iter, new MetricRecording()); } -} -class ReadCommandSerializer implements IVersionedSerializer<ReadCommand> -{ - public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + /** + * Creates a message for this command. + */ + public abstract MessageOut<ReadCommand> createMessage(int version); + + protected abstract void appendCQLWhereClause(StringBuilder sb); + + // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it + // can save us some bandwidth, and avoid making us throw a TombstoneOverwhelmingException for purgeable tombstones (which + // are to some extent an artefact of compaction lagging behind and hence counting them is somewhat unintuitive).
+ protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs) + { + final boolean isForThrift = iterator.isForThrift(); + class WithoutPurgeableTombstones extends PurgeFunction + { + public WithoutPurgeableTombstones() + { + super(isForThrift, nowInSec(), cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones()); + } + + protected long getMaxPurgeableTimestamp() + { + return Long.MAX_VALUE; + } + } + return Transformation.apply(iterator, new WithoutPurgeableTombstones()); + } + + /** + * Recreate the CQL string corresponding to this query. + * <p> + * Note that in general the returned string will not be exactly the original user string, first + * because there isn't always a single syntax for a given query, but also because we don't have + * all the information needed (we know the non-PK columns queried but not the PK ones as internally + * we query them all). So this shouldn't be relied on too strongly, but it should be good enough for + * debugging purposes, which is what this is for. + */ + public String toCQLString() + { + StringBuilder sb = new StringBuilder(); + sb.append("SELECT ").append(columnFilter()); + sb.append(" FROM ").append(metadata().ksName).append('.').append(metadata.cfName); + appendCQLWhereClause(sb); + + if (limits() != DataLimits.NONE) + sb.append(' ').append(limits()); + return sb.toString(); + } + + private static class Serializer implements IVersionedSerializer<ReadCommand> + { + private static int digestFlag(boolean isDigest) + { + return isDigest ? 0x01 : 0; + } + + private static boolean isDigest(int flags) + { + return (flags & 0x01) != 0; + } + + private static int thriftFlag(boolean isForThrift) + { + return isForThrift ? 0x02 : 0; + } + + private static boolean isForThrift(int flags) + { + return (flags & 0x02) != 0; + } + + private static int indexFlag(boolean hasIndex) + { + return hasIndex ? 0x04 : 0; + } + + private static boolean hasIndex(int flags) + { + return (flags & 0x04) != 0; + } + + public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + { + // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly + assert version >= MessagingService.VERSION_30; + + out.writeByte(command.kind.ordinal()); + out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent())); + if (command.isDigestQuery()) + out.writeUnsignedVInt(command.digestVersion()); + CFMetaData.serializer.serialize(command.metadata(), out, version); + out.writeInt(command.nowInSec()); + ColumnFilter.serializer.serialize(command.columnFilter(), out, version); + RowFilter.serializer.serialize(command.rowFilter(), out, version); + DataLimits.serializer.serialize(command.limits(), out, version); + if (command.index.isPresent()) + IndexMetadata.serializer.serialize(command.index.get(), out, version); + + command.serializeSelection(out, version); + } + + public ReadCommand deserialize(DataInputPlus in, int version) throws IOException + { + if (version < MessagingService.VERSION_30) + return legacyReadCommandSerializer.deserialize(in, version); + + Kind kind = Kind.values()[in.readByte()]; + int flags = in.readByte(); + boolean isDigest = isDigest(flags); + boolean isForThrift = isForThrift(flags); + boolean hasIndex = hasIndex(flags); + int digestVersion = isDigest ?
(int)in.readUnsignedVInt() : 0;
+            CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+            int nowInSec = in.readInt();
+            ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
+            RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
+            DataLimits limits = DataLimits.serializer.deserialize(in, version);
+            Optional<IndexMetadata> index = hasIndex
+                                          ? deserializeIndexMetadata(in, version, metadata)
+                                          : Optional.empty();
+
+            return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
+        }
+
+        private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
+        {
+            try
+            {
+                return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm));
+            }
+            catch (UnknownIndexException e)
+            {
+                String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " +
+                                               "If an index was just created, this is likely due to the schema not " +
+                                               "being fully propagated. Local read will proceed without using the " +
+                                               "index. Please wait for schema agreement after index creation.",
+                                               cfm.ksName, cfm.cfName, e.indexId.toString());
+                logger.info(message);
+                return Optional.empty();
+            }
+        }
+
+        public long serializedSize(ReadCommand command, int version)
+        {
+            // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
+            assert version >= MessagingService.VERSION_30;
+
+            return 2 // kind + flags
+                   + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
+                   + CFMetaData.serializer.serializedSize(command.metadata(), version)
+                   + TypeSizes.sizeof(command.nowInSec())
+                   + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
+                   + RowFilter.serializer.serializedSize(command.rowFilter(), version)
+                   + DataLimits.serializer.serializedSize(command.limits(), version)
+                   + command.selectionSerializedSize(version)
+                   + command.indexSerializedSize(version);
+        }
+    }
+
+    // Dispatch to either Serializer or LegacyRangeSliceCommandSerializer. Only useful as long as we maintain pre-3.0
+    // compatibility
+    private static class RangeSliceSerializer implements IVersionedSerializer<ReadCommand>
+    {
+        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+        {
+            if (version < MessagingService.VERSION_30)
+                legacyRangeSliceCommandSerializer.serialize(command, out, version);
+            else
+                serializer.serialize(command, out, version);
+        }
+
+        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
+        {
+            return version < MessagingService.VERSION_30
+                   ? legacyRangeSliceCommandSerializer.deserialize(in, version)
+                   : serializer.deserialize(in, version);
+        }
+
+        public long serializedSize(ReadCommand command, int version)
+        {
+            return version < MessagingService.VERSION_30
+                   ? legacyRangeSliceCommandSerializer.serializedSize(command, version)
+                   : serializer.serializedSize(command, version);
+        }
+    }
+
+    private enum LegacyType
+    {
+        GET_BY_NAMES((byte)1),
+        GET_SLICES((byte)2);
+
+        public final byte serializedValue;
+
+        LegacyType(byte b)
+        {
+            this.serializedValue = b;
+        }
+
+        public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind)
+        {
+            return kind == ClusteringIndexFilter.Kind.SLICE
+                   ? GET_SLICES
+                   : GET_BY_NAMES;
+        }
+
+        public static LegacyType fromSerializedValue(byte b)
+        {
+            return b == 1 ? GET_BY_NAMES : GET_SLICES;
+        }
+    }
+
+    /**
+     * Serializer for pre-3.0 RangeSliceCommands.
+     */
+    private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand>
     {
-        out.writeByte(command.commandType.serializedValue);
-        switch (command.commandType)
+        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+        {
+            assert version < MessagingService.VERSION_30;
+
+            PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
+            assert !rangeCommand.dataRange().isPaging();
+
+            // convert pre-3.0 incompatible names filters to slice filters
+            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
+
+            CFMetaData metadata = rangeCommand.metadata();
+
+            out.writeUTF(metadata.ksName);
+            out.writeUTF(metadata.cfName);
+            out.writeLong(rangeCommand.nowInSec() * 1000L);  // convert from seconds to millis
+
+            // begin DiskAtomFilterSerializer.serialize()
+            if (rangeCommand.isNamesQuery())
+            {
+                out.writeByte(1);  // 0 for slices, 1 for names
+                ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
+                LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out);
+            }
+            else
+            {
+                out.writeByte(0);  // 0 for slices, 1 for names
+
+                // slice filter serialization
+                ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
+
+                boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+                LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
+
+                out.writeBoolean(filter.isReversed());
+
+                // limit
+                DataLimits.Kind kind = rangeCommand.limits().kind();
+                boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1;
+                if (isDistinct)
+                    out.writeInt(1);
+                else
+                    out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices()));
+
+                int compositesToGroup;
+                boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+                if (kind == DataLimits.Kind.THRIFT_LIMIT)
+                    compositesToGroup = -1;
+                else if (isDistinct && !selectsStatics)
+                    compositesToGroup = -2;  // for DISTINCT queries (CASSANDRA-8490)
+                else
+                    compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
+
+                out.writeInt(compositesToGroup);
+            }
+
+            serializeRowFilter(out, rangeCommand.rowFilter());
+            AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version);
+
+            // maxResults
+            out.writeInt(rangeCommand.limits().count());
+
+            // countCQL3Rows
+            if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1)  // if for Thrift or DISTINCT
+                out.writeBoolean(false);
+            else
+                out.writeBoolean(true);
+
+            // isPaging
+            out.writeBoolean(false);
+        }
+
+        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
+        {
+            assert version < MessagingService.VERSION_30;
+
+            String keyspace = in.readUTF();
+            String columnFamily = in.readUTF();
+
+            CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
+            if (metadata == null)
+            {
+                String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily);
+                throw new UnknownColumnFamilyException(message, null);
+            }
+
+            int nowInSec = (int) (in.readLong() / 1000);  // convert from millis to seconds
+
+            ClusteringIndexFilter filter;
+            ColumnFilter selection;
+            int compositesToGroup = 0;
+            int perPartitionLimit = -1;
+            byte readType = in.readByte();  // 0 for slices, 1 for names
+            if (readType == 1)
+            {
+                Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata);
+                selection = selectionAndFilter.left;
+                filter = selectionAndFilter.right;
+            }
+            else
+            {
+                Pair<ClusteringIndexSliceFilter, Boolean> p = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
+                filter = p.left;
+                perPartitionLimit = in.readInt();
+                compositesToGroup = in.readInt();
+                selection = getColumnSelectionForSlice(p.right, compositesToGroup, metadata);
+            }
+
+            RowFilter rowFilter = deserializeRowFilter(in, metadata);
+
+            AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
+            int maxResults = in.readInt();
+
+            in.readBoolean();  // countCQL3Rows (not needed)
+            in.readBoolean();  // isPaging (not needed)
+
+            boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING));
+            boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics);
+            DataLimits limits;
+            if (isDistinct)
+                limits = DataLimits.distinctLimits(maxResults);
+            else if (compositesToGroup == -1)
+                limits = DataLimits.thriftLimits(maxResults, perPartitionLimit);
+            else
+                limits = DataLimits.cqlLimits(maxResults);
+
+            return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty());
+        }
+
+        static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
+        {
+            ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rowFilter.iterator());
+            out.writeInt(indexExpressions.size());
+            for (RowFilter.Expression expression : indexExpressions)
+            {
+                ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out);
+                expression.operator().writeTo(out);
+                ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out);
+            }
+        }
+
+        static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException
+        {
+            int numRowFilters = in.readInt();
+            if (numRowFilters == 0)
+                return RowFilter.NONE;
+
+            RowFilter rowFilter = RowFilter.create(numRowFilters);
+            for (int i = 0; i < numRowFilters; i++)
+            {
+                ByteBuffer columnName = ByteBufferUtil.readWithShortLength(in);
+                ColumnDefinition column = metadata.getColumnDefinition(columnName);
+                Operator op = Operator.readFrom(in);
+                ByteBuffer indexValue = ByteBufferUtil.readWithShortLength(in);
+                rowFilter.add(column, op, indexValue);
+            }
+            return rowFilter;
+        }
+
+        static long serializedRowFilterSize(RowFilter rowFilter)
+        {
+            long size = TypeSizes.sizeof(0);  // rowFilterCount
+            for (RowFilter.Expression expression : rowFilter)
+            {
+                size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
+                size += TypeSizes.sizeof(0);  // operator int value
+                size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
+            }
+            return size;
+        }
+
+        public long serializedSize(ReadCommand command, int version)
+        {
+            assert version < MessagingService.VERSION_30;
+            assert command.kind == Kind.PARTITION_RANGE;
+
+            PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
+            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
+            CFMetaData metadata = rangeCommand.metadata();
+
+            long size = TypeSizes.sizeof(metadata.ksName);
+            size += TypeSizes.sizeof(metadata.cfName);
+            size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
+
+            size += 1;  // single byte flag: 0 for slices, 1 for names
+            if (rangeCommand.isNamesQuery())
+            {
+                PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns();
+                ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
+                size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns);
+            }
+            else
+            {
+                ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
+                boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+                size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata);
+                size += TypeSizes.sizeof(filter.isReversed());
+                size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
+                size += TypeSizes.sizeof(0);  // compositesToGroup
+            }
+
+            if (rangeCommand.rowFilter().equals(RowFilter.NONE))
+            {
+                size += TypeSizes.sizeof(0);
+            }
+            else
+            {
+                ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rangeCommand.rowFilter().iterator());
+                size += TypeSizes.sizeof(indexExpressions.size());
+                for (RowFilter.Expression expression : indexExpressions)
+                {
+                    size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
+                    size += TypeSizes.sizeof(expression.operator().ordinal());
+                    size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
+                }
+            }
+
+            size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version);
+            size += TypeSizes.sizeof(rangeCommand.limits().count());
+            size += TypeSizes.sizeof(!rangeCommand.isForThrift());
+            return size + TypeSizes.sizeof(rangeCommand.dataRange().isPaging());
+        }
+
+        static PartitionRangeReadCommand maybeConvertNamesToSlice(PartitionRangeReadCommand command)
         {
-            case GET_BY_NAMES:
-                SliceByNamesReadCommand.serializer.serialize(command, out, version);
-                break;
-            case GET_SLICES:
-                SliceFromReadCommand.serializer.serialize(command, out, version);
-                break;
-            default:
-                throw new AssertionError();
+            if (!command.dataRange().isNamesQuery())
+                return command;
+
+            CFMetaData metadata = command.metadata();
+            if (!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns()))
+                return command;
+
+            ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter;
+            ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata);
+            DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
+            return new PartitionRangeReadCommand(
+                    command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
+                    command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty());
+        }
+
+        static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, int compositesToGroup, CFMetaData metadata)
+        {
+            // A value of -2 indicates this is a DISTINCT query that doesn't select static columns, only partition keys.
+            // In that case, we'll basically be querying the first row of the partition, but we must make sure we include
+            // all columns so we get at least one cell if there is a live row as it would confuse pre-3.0 nodes otherwise.
+            if (compositesToGroup == -2)
+                return ColumnFilter.all(metadata);
+
+            // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all
+            PartitionColumns columns = selectsStatics
+                                     ? metadata.partitionColumns()
+                                     : metadata.partitionColumns().withoutStatics();
+            return ColumnFilter.selectionBuilder().addAll(columns).build();
         }
 }
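
The legacy row-filter encoding in serializeRowFilter()/deserializeRowFilter() above is a simple length-prefixed layout: an int count of expressions, then for each expression a short-length-prefixed column name, the operator's int id, and a short-length-prefixed value. The standalone sketch below round-trips that layout with plain java.io streams; it uses no Cassandra classes, and the column name, operator id, and value are made-up stand-ins for illustration only.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class LegacyRowFilterFormatSketch
    {
        // stand-ins for the ByteBufferUtil short-length helpers used above:
        // an unsigned 16-bit length prefix followed by the raw bytes
        static void writeWithShortLength(DataOutput out, byte[] bytes) throws IOException
        {
            out.writeShort(bytes.length);
            out.write(bytes);
        }

        static byte[] readWithShortLength(DataInput in) throws IOException
        {
            byte[] bytes = new byte[in.readUnsignedShort()];
            in.readFully(bytes);
            return bytes;
        }

        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);

            // encode one hypothetical expression, e.g. WHERE age = 42;
            // the operator id 0 is an invented stand-in for an equality code
            out.writeInt(1);                                                    // expression count
            writeWithShortLength(out, "age".getBytes(StandardCharsets.UTF_8)); // column name
            out.writeInt(0);                                                    // operator int id
            writeWithShortLength(out, "42".getBytes(StandardCharsets.UTF_8));  // value

            // decode it back, mirroring the loop in deserializeRowFilter() above
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
            int count = in.readInt();
            for (int i = 0; i < count; i++)
            {
                String column = new String(readWithShortLength(in), StandardCharsets.UTF_8);
                int operator = in.readInt();
                String value = new String(readWithShortLength(in), StandardCharsets.UTF_8);
                System.out.printf("expression %d: %s [op %d] %s%n", i, column, operator, value);
            }
        }
    }

One consequence of the short-length prefix worth noting: names and values in this legacy layout are bounded by what an unsigned 16-bit length can describe (64KB), which is why the helpers are "short length" rather than plain int-length writes.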

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94e7ef17/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index ccc900b,459923b..d416dca
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -39,8 -39,12 +39,9 @@@ import com.google.common.collect.Lists
  import com.google.common.collect.Sets;
  import org.cliffc.high_scale_lib.NonBlockingHashMap;
- import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.ExecutorLocal;
+ import org.apache.cassandra.concurrent.ExecutorLocals;
  import org.apache.cassandra.concurrent.ScheduledExecutors;
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
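
The hunk above swaps the ExecutorLocal import for ExecutorLocals, suggesting that the per-thread values which must follow a message between threads are now captured and restored as a single bundle rather than one at a time. The following is a rough sketch of that capture-and-wrap pattern in plain JDK code; it is not the Cassandra API, and every class, field, and method name in it is invented for illustration.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ExecutorLocalsSketch
    {
        // one example of per-thread state that must follow a task to a worker thread
        static final ThreadLocal<String> CLIENT_STATE = new ThreadLocal<>();

        // snapshot of all executor-local values, captured once at submit time
        static final class Locals
        {
            final String clientState;

            private Locals(String clientState)
            {
                this.clientState = clientState;
            }

            static Locals capture()
            {
                return new Locals(CLIENT_STATE.get());
            }

            Runnable wrap(final Runnable task)
            {
                return () -> {
                    CLIENT_STATE.set(clientState);     // install the snapshot on the worker
                    try
                    {
                        task.run();
                    }
                    finally
                    {
                        CLIENT_STATE.remove();         // don't leak state into pooled threads
                    }
                };
            }
        }

        public static void main(String[] args)
        {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            CLIENT_STATE.set("warning pending for client 42");

            Locals locals = Locals.capture();          // on the submitting thread
            executor.execute(locals.wrap(
                () -> System.out.println("worker sees: " + CLIENT_STATE.get())));

            executor.shutdown();
        }
    }

Clearing the thread-local in the finally block is the important detail: on a pooled executor the same OS thread serves many unrelated tasks, so any state installed for one task must be removed before the next one runs.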