Merge commit '452d626a7a6b03917b7bd72a5dfe9da8a27e0903' into cassandra-2.2 * commit '452d626a7a6b03917b7bd72a5dfe9da8a27e0903': Fix handling of clustering key > 64K
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0171259d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0171259d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0171259d Branch: refs/heads/cassandra-3.0 Commit: 0171259d05a2dd5ec901ccadda016361312b59a9 Parents: 8d5b7b6 452d626 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Thu Jun 23 10:55:27 2016 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Thu Jun 23 10:57:32 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../restrictions/StatementRestrictions.java | 7 ++++- .../cassandra/net/OutboundTcpConnection.java | 3 +- .../apache/cassandra/utils/ByteBufferUtil.java | 6 ++-- .../org/apache/cassandra/cql3/CQLTester.java | 1 + .../cql3/validation/operations/InsertTest.java | 19 +++++++++++++ .../cql3/validation/operations/SelectTest.java | 30 +++++++++++++++++++- 7 files changed, 62 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0171259d/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 5106fad,9a3779c..59a9794 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,35 -1,5 +1,35 @@@ -2.1.15 +2.2.7 + * Validate bloom_filter_fp_chance against lowest supported + value when the table is created (CASSANDRA-11920) + * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013) + * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038) + * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984) + * Persist local metadata earlier in startup sequence (CASSANDRA-11742) + * Run CommitLog tests with different compression settings (CASSANDRA-9039) + * cqlsh: fix tab completion for case-sensitive identifiers 
(CASSANDRA-11664) + * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587) + * Fix possible race condition in CommitLog.recover (CASSANDRA-11743) + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708) + * Possible memory leak in NIODataInputStream (CASSANDRA-11867) + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753) + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395) + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626) + * Exit JVM if JMX server fails to start up (CASSANDRA-11540) + * Produce a heap dump when exiting on OOM (CASSANDRA-9861) + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427) + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510) + * JSON datetime formatting needs timezone (CASSANDRA-11137) + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502) + * Remove unnecessary file existence check during anticompaction (CASSANDRA-11660) + * Add missing files to debian packages (CASSANDRA-11642) + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621) + * cqlsh: COPY FROM should use regular inserts for single statement batches and + report errors correctly if worker processes crash on initialization (CASSANDRA-11474) + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553) + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988) +Merged from 2.1: - 2.1.15 + * Prevent select statements with clustering key > 64k (CASSANDRA-11882) * Fix clock skew corrupting other nodes with paxos (CASSANDRA-11991) * Remove distinction between non-existing static columns and existing but null in LWTs (CASSANDRA-9842) * Support mlockall on IBM POWER arch (CASSANDRA-11576) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0171259d/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java index 5b7c58d,0000000..6f03c40 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java @@@ -1,608 -1,0 +1,613 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.cql3.restrictions; + +import java.nio.ByteBuffer; +import java.util.*; + +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.*; +import org.apache.cassandra.cql3.functions.Function; +import org.apache.cassandra.cql3.statements.Bound; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.composites.Composite; +import org.apache.cassandra.db.index.SecondaryIndexManager; +import org.apache.cassandra.dht.*; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static org.apache.cassandra.config.ColumnDefinition.toIdentifiers; +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse; +import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull; +import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue; +import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest; + +/** + * The restrictions corresponding to the relations specified on the where-clause of CQL query. + */ +public final class StatementRestrictions +{ + public static final String REQUIRES_ALLOW_FILTERING_MESSAGE = + "Cannot execute this query as it might involve data filtering and " + + "thus may have unpredictable performance. 
If you want to execute " + + "this query despite the performance unpredictability, use ALLOW FILTERING"; + + /** + * The Column Family meta data + */ + public final CFMetaData cfm; + + /** + * Restrictions on partitioning columns + */ + private PrimaryKeyRestrictions partitionKeyRestrictions; + + /** + * Restrictions on clustering columns + */ + private PrimaryKeyRestrictions clusteringColumnsRestrictions; + + /** + * Restriction on non-primary key columns (i.e. secondary index restrictions) + */ + private RestrictionSet nonPrimaryKeyRestrictions; + + /** + * The restrictions used to build the index expressions + */ + private final List<Restrictions> indexRestrictions = new ArrayList<>(); + + /** + * <code>true</code> if the secondary index need to be queried, <code>false</code> otherwise + */ + private boolean usesSecondaryIndexing; + + /** + * Specify if the query will return a range of partition keys. + */ + private boolean isKeyRange; + + /** + * Creates a new empty <code>StatementRestrictions</code>. + * + * @param cfm the column family meta data + * @return a new empty <code>StatementRestrictions</code>. 
+ */ + public static StatementRestrictions empty(CFMetaData cfm) + { + return new StatementRestrictions(cfm); + } + + private StatementRestrictions(CFMetaData cfm) + { + this.cfm = cfm; + this.partitionKeyRestrictions = new PrimaryKeyRestrictionSet(cfm.getKeyValidatorAsCType()); + this.clusteringColumnsRestrictions = new PrimaryKeyRestrictionSet(cfm.comparator); + this.nonPrimaryKeyRestrictions = new RestrictionSet(); + } + + public StatementRestrictions(CFMetaData cfm, + List<Relation> whereClause, + VariableSpecifications boundNames, + boolean selectsOnlyStaticColumns, + boolean selectACollection, + boolean useFiltering) + { + this.cfm = cfm; + this.partitionKeyRestrictions = new PrimaryKeyRestrictionSet(cfm.getKeyValidatorAsCType()); + this.clusteringColumnsRestrictions = new PrimaryKeyRestrictionSet(cfm.comparator, cfm); + this.nonPrimaryKeyRestrictions = new RestrictionSet(); + + /* + * WHERE clause. For a given entity, rules are: - EQ relation conflicts with anything else (including a 2nd EQ) + * - Can't have more than one LT(E) relation (resp. 
GT(E) relation) - IN relation are restricted to row keys + * (for now) and conflicts with anything else (we could allow two IN for the same entity but that doesn't seem + * very useful) - The value_alias cannot be restricted in any way (we don't support wide rows with indexed value + * in CQL so far) + */ + for (Relation relation : whereClause) + addRestriction(relation.toRestriction(cfm, boundNames)); + + SecondaryIndexManager secondaryIndexManager = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName).indexManager; + boolean hasQueriableClusteringColumnIndex = clusteringColumnsRestrictions.hasSupportingIndex(secondaryIndexManager); + boolean hasQueriableIndex = hasQueriableClusteringColumnIndex + || partitionKeyRestrictions.hasSupportingIndex(secondaryIndexManager) + || nonPrimaryKeyRestrictions.hasSupportingIndex(secondaryIndexManager); + + // At this point, the select statement if fully constructed, but we still have a few things to validate + processPartitionKeyRestrictions(hasQueriableIndex); + + // Some but not all of the partition key columns have been specified; + // hence we need turn these restrictions into index expressions. + if (usesSecondaryIndexing) + indexRestrictions.add(partitionKeyRestrictions); + + checkFalse(selectsOnlyStaticColumns && hasClusteringColumnsRestriction(), + "Cannot restrict clustering columns when selecting only static columns"); + + processClusteringColumnsRestrictions(hasQueriableIndex, selectACollection); + + // Covers indexes on the first clustering column (among others). + if (isKeyRange && hasQueriableClusteringColumnIndex) + usesSecondaryIndexing = true; + + usesSecondaryIndexing = usesSecondaryIndexing || clusteringColumnsRestrictions.isContains(); + + if (usesSecondaryIndexing) + indexRestrictions.add(clusteringColumnsRestrictions); + + // Even if usesSecondaryIndexing is false at this point, we'll still have to use one if + // there is restrictions not covered by the PK. 
+ if (!nonPrimaryKeyRestrictions.isEmpty()) + { + if (!hasQueriableIndex) + { + // Filtering for non-index query is only supported for thrift static CFs + if (cfm.comparator.isDense() || cfm.comparator.isCompound()) + throw invalidRequest("Predicates on non-primary-key columns (%s) are not yet supported for non secondary index queries", + Joiner.on(", ").join(toIdentifiers(nonPrimaryKeyRestrictions.getColumnDefs()))); + + if (!useFiltering) + throw invalidRequest(REQUIRES_ALLOW_FILTERING_MESSAGE); + } + usesSecondaryIndexing = true; + indexRestrictions.add(nonPrimaryKeyRestrictions); + } + + if (usesSecondaryIndexing) + validateSecondaryIndexSelections(selectsOnlyStaticColumns); + } + + private void addRestriction(Restriction restriction) throws InvalidRequestException + { + if (restriction.isMultiColumn()) + clusteringColumnsRestrictions = clusteringColumnsRestrictions.mergeWith(restriction); + else if (restriction.isOnToken()) + partitionKeyRestrictions = partitionKeyRestrictions.mergeWith(restriction); + else + addSingleColumnRestriction((SingleColumnRestriction) restriction); + } + + public Iterable<Function> getFunctions() + { + return Iterables.concat(partitionKeyRestrictions.getFunctions(), + clusteringColumnsRestrictions.getFunctions(), + nonPrimaryKeyRestrictions.getFunctions()); + } + + private void addSingleColumnRestriction(SingleColumnRestriction restriction) throws InvalidRequestException + { + ColumnDefinition def = restriction.columnDef; + if (def.isPartitionKey()) + partitionKeyRestrictions = partitionKeyRestrictions.mergeWith(restriction); + else if (def.isClusteringColumn()) + clusteringColumnsRestrictions = clusteringColumnsRestrictions.mergeWith(restriction); + else + nonPrimaryKeyRestrictions = nonPrimaryKeyRestrictions.addRestriction(restriction); + } + + /** + * Checks if the restrictions on the partition key is an IN restriction. 
+ * + * @return <code>true</code> the restrictions on the partition key is an IN restriction, <code>false</code> + * otherwise. + */ + public boolean keyIsInRelation() + { + return partitionKeyRestrictions.isIN(); + } + + /** + * Checks if the query request a range of partition keys. + * + * @return <code>true</code> if the query request a range of partition keys, <code>false</code> otherwise. + */ + public boolean isKeyRange() + { + return this.isKeyRange; + } + + /** + * Checks if the secondary index need to be queried. + * + * @return <code>true</code> if the secondary index need to be queried, <code>false</code> otherwise. + */ + public boolean usesSecondaryIndexing() + { + return this.usesSecondaryIndexing; + } + + private void processPartitionKeyRestrictions(boolean hasQueriableIndex) throws InvalidRequestException + { + // If there is a queriable index, no special condition are required on the other restrictions. + // But we still need to know 2 things: + // - If we don't have a queriable index, is the query ok + // - Is it queriable without 2ndary index, which is always more efficient + // If a component of the partition key is restricted by a relation, all preceding + // components must have a EQ. Only the last partition key component can be in IN relation. + if (partitionKeyRestrictions.isOnToken()) + isKeyRange = true; + + if (hasPartitionKeyUnrestrictedComponents()) + { + if (!partitionKeyRestrictions.isEmpty()) + { + if (!hasQueriableIndex) + throw invalidRequest("Partition key parts: %s must be restricted as other parts are", + Joiner.on(", ").join(getPartitionKeyUnrestrictedComponents())); + } + + isKeyRange = true; + usesSecondaryIndexing = hasQueriableIndex; + } + } + + /** + * Checks if the partition key has some unrestricted components. + * @return <code>true</code> if the partition key has some unrestricted components, <code>false</code> otherwise. 
+ */ + private boolean hasPartitionKeyUnrestrictedComponents() + { + return partitionKeyRestrictions.size() < cfm.partitionKeyColumns().size(); + } + + public boolean hasPartitionKeyRestrictions() + { + return !partitionKeyRestrictions.isEmpty(); + } + + /** + * Checks if the restrictions contain any non-primary key restrictions + * @return <code>true</code> if the restrictions contain any non-primary key restrictions, <code>false</code> otherwise. + */ + public boolean hasNonPrimaryKeyRestrictions() + { + return !nonPrimaryKeyRestrictions.isEmpty(); + } + + /** + * Returns the partition key components that are not restricted. + * @return the partition key components that are not restricted. + */ + private List<ColumnIdentifier> getPartitionKeyUnrestrictedComponents() + { + List<ColumnDefinition> list = new ArrayList<>(cfm.partitionKeyColumns()); + list.removeAll(partitionKeyRestrictions.getColumnDefs()); + return ColumnDefinition.toIdentifiers(list); + } + + /** + * Processes the clustering column restrictions. 
+ * + * @param hasQueriableIndex <code>true</code> if some of the queried data are indexed, <code>false</code> otherwise + * @param selectACollection <code>true</code> if the query should return a collection column + * @throws InvalidRequestException if the request is invalid + */ + private void processClusteringColumnsRestrictions(boolean hasQueriableIndex, + boolean selectACollection) throws InvalidRequestException + { + checkFalse(clusteringColumnsRestrictions.isIN() && selectACollection, + "Cannot restrict clustering columns by IN relations when a collection is selected by the query"); + checkFalse(clusteringColumnsRestrictions.isContains() && !hasQueriableIndex, + "Cannot restrict clustering columns by a CONTAINS relation without a secondary index"); + + if (hasClusteringColumnsRestriction() && clusteringRestrictionsNeedFiltering()) + { + if (hasQueriableIndex) + { + usesSecondaryIndexing = true; + return; + } + + List<ColumnDefinition> clusteringColumns = cfm.clusteringColumns(); + List<ColumnDefinition> restrictedColumns = new LinkedList<>(clusteringColumnsRestrictions.getColumnDefs()); + + for (int i = 0, m = restrictedColumns.size(); i < m; i++) + { + ColumnDefinition clusteringColumn = clusteringColumns.get(i); + ColumnDefinition restrictedColumn = restrictedColumns.get(i); + + if (!clusteringColumn.equals(restrictedColumn)) + { + throw invalidRequest( + "PRIMARY KEY column \"%s\" cannot be restricted as preceding column \"%s\" is not restricted", + restrictedColumn.name, + clusteringColumn.name); + } + } + } + } + + public final boolean clusteringRestrictionsNeedFiltering() + { + assert clusteringColumnsRestrictions instanceof PrimaryKeyRestrictionSet; + return ((PrimaryKeyRestrictionSet) clusteringColumnsRestrictions).needsFiltering(); + } + + public List<IndexExpression> getIndexExpressions(SecondaryIndexManager indexManager, + QueryOptions options) throws InvalidRequestException + { + if (!usesSecondaryIndexing || indexRestrictions.isEmpty()) + return 
Collections.emptyList(); + + List<IndexExpression> expressions = new ArrayList<>(); + for (Restrictions restrictions : indexRestrictions) + restrictions.addIndexExpressionTo(expressions, indexManager, options); + + return expressions; + } + + /** + * Returns the partition keys for which the data is requested. + * + * @param options the query options + * @return the partition keys for which the data is requested. + * @throws InvalidRequestException if the partition keys cannot be retrieved + */ + public Collection<ByteBuffer> getPartitionKeys(final QueryOptions options) throws InvalidRequestException + { + return partitionKeyRestrictions.values(options); + } + + /** + * Returns the specified bound of the partition key. + * + * @param b the boundary type + * @param options the query options + * @return the specified bound of the partition key + * @throws InvalidRequestException if the boundary cannot be retrieved + */ + private ByteBuffer getPartitionKeyBound(Bound b, QueryOptions options) throws InvalidRequestException + { + // Deal with unrestricted partition key components (special-casing is required to deal with 2i queries on the + // first + // component of a composite partition key). + if (hasPartitionKeyUnrestrictedComponents()) + return ByteBufferUtil.EMPTY_BYTE_BUFFER; + + // We deal with IN queries for keys in other places, so we know buildBound will return only one result + return partitionKeyRestrictions.bounds(b, options).get(0); + } + + /** + * Returns the partition key bounds. 
+ * + * @param options the query options + * @return the partition key bounds + * @throws InvalidRequestException if the query is invalid + */ + public AbstractBounds<RowPosition> getPartitionKeyBounds(QueryOptions options) throws InvalidRequestException + { + IPartitioner p = StorageService.getPartitioner(); + + if (partitionKeyRestrictions.isOnToken()) + { + return getPartitionKeyBoundsForTokenRestrictions(p, options); + } + + return getPartitionKeyBounds(p, options); + } + + private AbstractBounds<RowPosition> getPartitionKeyBounds(IPartitioner p, + QueryOptions options) throws InvalidRequestException + { + ByteBuffer startKeyBytes = getPartitionKeyBound(Bound.START, options); + ByteBuffer finishKeyBytes = getPartitionKeyBound(Bound.END, options); + + RowPosition startKey = RowPosition.ForKey.get(startKeyBytes, p); + RowPosition finishKey = RowPosition.ForKey.get(finishKeyBytes, p); + + if (startKey.compareTo(finishKey) > 0 && !finishKey.isMinimum()) + return null; + + if (partitionKeyRestrictions.isInclusive(Bound.START)) + { + return partitionKeyRestrictions.isInclusive(Bound.END) + ? new Bounds<>(startKey, finishKey) + : new IncludingExcludingBounds<>(startKey, finishKey); + } + + return partitionKeyRestrictions.isInclusive(Bound.END) + ? new Range<>(startKey, finishKey) + : new ExcludingBounds<>(startKey, finishKey); + } + + private AbstractBounds<RowPosition> getPartitionKeyBoundsForTokenRestrictions(IPartitioner p, + QueryOptions options) + throws InvalidRequestException + { + Token startToken = getTokenBound(Bound.START, options, p); + Token endToken = getTokenBound(Bound.END, options, p); + + boolean includeStart = partitionKeyRestrictions.isInclusive(Bound.START); + boolean includeEnd = partitionKeyRestrictions.isInclusive(Bound.END); + + /* + * If we ask SP.getRangeSlice() for (token(200), token(200)], it will happily return the whole ring. 
+ * However, wrapping range doesn't really make sense for CQL, and we want to return an empty result in that + * case (CASSANDRA-5573). So special case to create a range that is guaranteed to be empty. + * + * In practice, we want to return an empty result set if either startToken > endToken, or both are equal but + * one of the bound is excluded (since [a, a] can contains something, but not (a, a], [a, a) or (a, a)). + * Note though that in the case where startToken or endToken is the minimum token, then this special case + * rule should not apply. + */ + int cmp = startToken.compareTo(endToken); + if (!startToken.isMinimum() && !endToken.isMinimum() + && (cmp > 0 || (cmp == 0 && (!includeStart || !includeEnd)))) + return null; + + RowPosition start = includeStart ? startToken.minKeyBound() : startToken.maxKeyBound(); + RowPosition end = includeEnd ? endToken.maxKeyBound() : endToken.minKeyBound(); + + return new Range<>(start, end); + } + + private Token getTokenBound(Bound b, QueryOptions options, IPartitioner p) throws InvalidRequestException + { + if (!partitionKeyRestrictions.hasBound(b)) + return p.getMinimumToken(); + + ByteBuffer value = partitionKeyRestrictions.bounds(b, options).get(0); + checkNotNull(value, "Invalid null token value"); + return p.getTokenFactory().fromByteArray(value); + } + + /** + * Checks if the query does not contains any restriction on the clustering columns. + * + * @return <code>true</code> if the query does not contains any restriction on the clustering columns, + * <code>false</code> otherwise. + */ + public boolean hasNoClusteringColumnsRestriction() + { + return clusteringColumnsRestrictions.isEmpty(); + } + + /** + * Checks if the query has some restrictions on the clustering columns. + * + * @return <code>true</code> if the query has some restrictions on the clustering columns, + * <code>false</code> otherwise. 
+ */ + public boolean hasClusteringColumnsRestriction() + { + return !clusteringColumnsRestrictions.isEmpty(); + } + + // For non-composite slices, we don't support internally the difference between exclusive and + // inclusive bounds, so we deal with it manually. + public boolean isNonCompositeSliceWithExclusiveBounds() + { + return !cfm.comparator.isCompound() + && clusteringColumnsRestrictions.isSlice() + && (!clusteringColumnsRestrictions.isInclusive(Bound.START) || !clusteringColumnsRestrictions.isInclusive(Bound.END)); + } + + /** + * Returns the requested clustering columns as <code>Composite</code>s. + * + * @param options the query options + * @return the requested clustering columns as <code>Composite</code>s + * @throws InvalidRequestException if the query is not valid + */ + public List<Composite> getClusteringColumnsAsComposites(QueryOptions options) throws InvalidRequestException + { + return clusteringColumnsRestrictions.valuesAsComposites(options); + } + + /** + * Returns the bounds (start or end) of the clustering columns as <code>Composites</code>. + * + * @param b the bound type + * @param options the query options + * @return the bounds (start or end) of the clustering columns as <code>Composites</code> + * @throws InvalidRequestException if the request is not valid + */ + public List<Composite> getClusteringColumnsBoundsAsComposites(Bound b, + QueryOptions options) throws InvalidRequestException + { - return clusteringColumnsRestrictions.boundsAsComposites(b, options); ++ List<Composite> bounds = clusteringColumnsRestrictions.boundsAsComposites(b, options); ++ for (Composite c : bounds) { ++ if (!c.isEmpty()) ++ QueryProcessor.validateComposite(c, cfm.comparator); ++ } ++ return bounds; + } + + /** + * Returns the bounds (start or end) of the clustering columns. 
+ * + * @param b the bound type + * @param options the query options + * @return the bounds (start or end) of the clustering columns + * @throws InvalidRequestException if the request is not valid + */ + public List<ByteBuffer> getClusteringColumnsBounds(Bound b, QueryOptions options) throws InvalidRequestException + { + return clusteringColumnsRestrictions.bounds(b, options); + } + + /** + * Checks if the bounds (start or end) of the clustering columns are inclusive. + * + * @param bound the bound type + * @return <code>true</code> if the bounds (start or end) of the clustering columns are inclusive, + * <code>false</code> otherwise + */ + public boolean areRequestedBoundsInclusive(Bound bound) + { + return clusteringColumnsRestrictions.isInclusive(bound); + } + + /** + * Checks if the query returns a range of columns. + * + * @return <code>true</code> if the query returns a range of columns, <code>false</code> otherwise. + */ + public boolean isColumnRange() + { + // Due to CASSANDRA-5762, we always do a slice for CQL3 tables (not dense, composite). + // Static CF (non dense but non composite) never entails a column slice however + if (!cfm.comparator.isDense()) + return cfm.comparator.isCompound(); + + // Otherwise (i.e. for compact table where we don't have a row marker anyway and thus don't care about + // CASSANDRA-5762), + // it is a range query if it has at least one the column alias for which no relation is defined or is not EQ. + return clusteringColumnsRestrictions.size() < cfm.clusteringColumns().size() || clusteringColumnsRestrictions.isSlice(); + } + + /** + * Checks if the query need to use filtering. + * @return <code>true</code> if the query need to use filtering, <code>false</code> otherwise. 
+ */ + public boolean needFiltering() + { + int numberOfRestrictedColumns = 0; + for (Restrictions restrictions : indexRestrictions) + numberOfRestrictedColumns += restrictions.size(); + + return numberOfRestrictedColumns > 1 + || (numberOfRestrictedColumns == 0 && !clusteringColumnsRestrictions.isEmpty()) + || (numberOfRestrictedColumns != 0 + && nonPrimaryKeyRestrictions.hasMultipleContains()); + } + + private void validateSecondaryIndexSelections(boolean selectsOnlyStaticColumns) throws InvalidRequestException + { + checkFalse(keyIsInRelation(), + "Select on indexed columns and with IN clause for the PRIMARY KEY are not supported"); + // When the user only select static columns, the intent is that we don't query the whole partition but just + // the static parts. But 1) we don't have an easy way to do that with 2i and 2) since we don't support index on + // static columns + // so far, 2i means that you've restricted a non static column, so the query is somewhat non-sensical. + checkFalse(selectsOnlyStaticColumns, "Queries using 2ndary indexes don't support selecting only static columns"); + } + + public void reverse() + { + clusteringColumnsRestrictions = new ReversedPrimaryKeyRestrictions(clusteringColumnsRestrictions); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0171259d/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 6a507c1,1a88220..a1ba4b4 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@@ -292,13 -285,14 +292,14 @@@ public class OutboundTcpConnection exte if (flush) out.flush(); } - catch (Exception e) + catch (Throwable e) { + JVMStabilityInspector.inspectThrowable(e); disconnect(); - if (e instanceof IOException) + if (e instanceof IOException || e.getCause() instanceof IOException) { - if 
(logger.isDebugEnabled()) - logger.debug("error writing to {}", poolReference.endPoint(), e); + if (logger.isTraceEnabled()) + logger.trace("error writing to {}", poolReference.endPoint(), e); // if the message was important, such as a repair acknowledgement, put it back on the queue // to retry after re-connecting. See CASSANDRA-5393 http://git-wip-us.apache.org/repos/asf/cassandra/blob/0171259d/src/java/org/apache/cassandra/utils/ByteBufferUtil.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/utils/ByteBufferUtil.java index 1831c19,b78b8eb..6c676e0 --- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java +++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java @@@ -298,7 -287,9 +298,8 @@@ public class ByteBufferUti public static void writeWithShortLength(ByteBuffer buffer, DataOutputPlus out) throws IOException { int length = buffer.remaining(); - assert 0 <= length && length <= FBUtilities.MAX_UNSIGNED_SHORT : length; + assert 0 <= length && length <= FBUtilities.MAX_UNSIGNED_SHORT : - String.format("Attempted serializing to buffer exceeded maximum of %s bytes: %s", FBUtilities.MAX_UNSIGNED_SHORT, length); - ++ String.format("Attempted serializing to buffer exceeded maximum of %s bytes: %s", FBUtilities.MAX_UNSIGNED_SHORT, length); out.writeShort(length); out.write(buffer); } @@@ -306,7 -297,9 +307,8 @@@ public static void writeWithShortLength(byte[] buffer, DataOutput out) throws IOException { int length = buffer.length; - assert 0 <= length && length <= FBUtilities.MAX_UNSIGNED_SHORT : length; + assert 0 <= length && length <= FBUtilities.MAX_UNSIGNED_SHORT : - String.format("Attempted serializing to buffer exceeded maximum of %s bytes: %s", FBUtilities.MAX_UNSIGNED_SHORT, length); - ++ String.format("Attempted serializing to buffer exceeded maximum of %s bytes: %s", FBUtilities.MAX_UNSIGNED_SHORT, length); out.writeShort(length); out.write(buffer); } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0171259d/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java index 5e17d1b,34c0980..98b8e23 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@@ -77,66 -61,19 +77,67 @@@ public abstract class CQLTeste protected static final Logger logger = LoggerFactory.getLogger(CQLTester.class); public static final String KEYSPACE = "cql_test_keyspace"; - private static final boolean USE_PREPARED_VALUES = Boolean.valueOf(System.getProperty("cassandra.test.use_prepared", "true")); + public static final String KEYSPACE_PER_TEST = "cql_test_keyspace_alt"; + protected static final boolean USE_PREPARED_VALUES = Boolean.valueOf(System.getProperty("cassandra.test.use_prepared", "true")); protected static final long ROW_CACHE_SIZE_IN_MB = Integer.valueOf(System.getProperty("cassandra.test.row_cache_size_in_mb", "0")); private static final AtomicInteger seqNumber = new AtomicInteger(); - protected static final ByteBuffer TOO_BIG = ByteBuffer.allocate(FBUtilities.MAX_UNSIGNED_SHORT + 1024); ++ protected static final ByteBuffer TOO_BIG = ByteBuffer.allocate(1024 * 65); + + private static org.apache.cassandra.transport.Server server; + protected static final int nativePort; + protected static final InetAddress nativeAddr; + private static final Cluster[] cluster; + private static final Session[] session; + + public static int maxProtocolVersion; + static { + int version; + for (version = 1; version <= Server.CURRENT_VERSION; ) + { + try + { + ProtocolVersion.fromInt(++version); + } + catch (IllegalArgumentException e) + { + version--; + break; + } + } + maxProtocolVersion = version; + cluster = new Cluster[maxProtocolVersion]; + session = new Session[maxProtocolVersion]; - static - { // Once per-JVM is enough SchemaLoader.prepareServer(); + + 
nativeAddr = InetAddress.getLoopbackAddress(); + + try + { + try (ServerSocket serverSocket = new ServerSocket(0)) + { + nativePort = serverSocket.getLocalPort(); + } + Thread.sleep(250); + } + catch (Exception e) + { + throw new RuntimeException(e); + } } + public static ResultMessage lastSchemaChangeResult; + private List<String> tables = new ArrayList<>(); private List<String> types = new ArrayList<>(); + private List<String> functions = new ArrayList<>(); + private List<String> aggregates = new ArrayList<>(); + + // We don't use USE_PREPARED_VALUES in the code below so some test can foce value preparation (if the result + // is not expected to be the same without preparation) + private boolean usePrepared = USE_PREPARED_VALUES; + private static final boolean reusePrepared = Boolean.valueOf(System.getProperty("cassandra.test.reuse_prepared", "true")); @BeforeClass public static void setUpClass() http://git-wip-us.apache.org/repos/asf/cassandra/blob/0171259d/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java index 6e9d212,99ec908..1d532cb --- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java @@@ -25,35 -26,20 +26,53 @@@ import org.apache.cassandra.exceptions. 
public class InsertTest extends CQLTester { @Test + public void testInsertWithUnset() throws Throwable + { + createTable("CREATE TABLE %s (k int PRIMARY KEY, s text, i int)"); + + // insert using nulls + execute("INSERT INTO %s (k, s, i) VALUES (10, ?, ?)", "text", 10); + execute("INSERT INTO %s (k, s, i) VALUES (10, ?, ?)", null, null); + assertRows(execute("SELECT s, i FROM %s WHERE k = 10"), + row(null, null) // sending null deletes the data + ); + // insert using UNSET + execute("INSERT INTO %s (k, s, i) VALUES (11, ?, ?)", "text", 10); + execute("INSERT INTO %s (k, s, i) VALUES (11, ?, ?)", unset(), unset()); + assertRows(execute("SELECT s, i FROM %s WHERE k=11"), + row("text", 10) // unset columns does not delete the existing data + ); + + assertInvalidMessage("Invalid unset value for column k", "UPDATE %s SET i = 0 WHERE k = ?", unset()); + assertInvalidMessage("Invalid unset value for column k", "DELETE FROM %s WHERE k = ?", unset()); + assertInvalidMessage("Invalid unset value for argument in call to function blobasint", "SELECT * FROM %s WHERE k = blobAsInt(?)", unset()); + } + + @Test + public void testInsertTtlWithUnset() throws Throwable + { + createTable("CREATE TABLE %s (k int PRIMARY KEY, i int)"); + execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL ?", unset()); // treat as 'unlimited' + assertRows(execute("SELECT ttl(i) FROM %s"), + row(new Object[]{ null }) + ); + } ++ ++ @Test + public void testOverlyLargeInsertPK() throws Throwable + { + createTable("CREATE TABLE %s (a text, b text, PRIMARY KEY ((a), b))"); + + assertInvalidThrow(InvalidRequestException.class, + "INSERT INTO %s (a, b) VALUES (?, 'foo')", new String(TOO_BIG.array())); + } + + @Test + public void testOverlyLargeInsertCK() throws Throwable + { + createTable("CREATE TABLE %s (a text, b text, PRIMARY KEY ((a), b))"); + + assertInvalidThrow(InvalidRequestException.class, + "INSERT INTO %s (a, b) VALUES ('foo', ?)", new String(TOO_BIG.array())); + } }