Merge branch 'cassandra-1.2' into trunk Conflicts: doc/cql3/CQL.textile src/java/org/apache/cassandra/cql3/QueryProcessor.java src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java src/java/org/apache/cassandra/cql3/statements/SelectStatement.java src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1be575da Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1be575da Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1be575da Branch: refs/heads/trunk Commit: 1be575da0601fed1c84421d6cd602fcb0a90da08 Parents: 5c7b2e1 dc457c5 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Fri Jul 26 20:37:43 2013 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Fri Jul 26 20:37:43 2013 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + doc/cql3/CQL.textile | 10 +++-- src/java/org/apache/cassandra/cql3/Cql.g | 2 +- .../cql3/statements/ModificationStatement.java | 11 +++-- .../cql3/statements/SelectStatement.java | 47 +++++++++++--------- 5 files changed, 43 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1be575da/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1be575da/doc/cql3/CQL.textile ---------------------------------------------------------------------- diff --cc doc/cql3/CQL.textile index f63b6b8,fe64c71..dd76228 --- a/doc/cql3/CQL.textile +++ b/doc/cql3/CQL.textile @@@ -1077,13 -1052,10 +1077,17 @@@ h2(#changes). Change The following describes the addition/changes brought for each version of CQL. +h3. 3.1.0 + +* "ALTER TABLE":#alterTableStmt @DROP@ option has been reenabled for CQL3 tables and has new semantics now: the space formerly used by dropped columns will now be eventually reclaimed (post-compaction). You should not readd previously dropped columns unless you use timestamps with microsecond precision (see "CASSANDRA-3919":https://issues.apache.org/jira/browse/CASSANDRA-3919 for more details). +* @SELECT@ statement now supports aliases in select clause. Aliases in WHERE and ORDER BY clauses are not supported. See the "section on select"#selectStmt for details. +* @CREATE@ statements for @KEYSPACE@, @TABLE@ and @INDEX@ now supports an @IF NOT EXISTS@ condition. Similarly, @DROP@ statements support a @IF EXISTS@ condition. +* @INSERT@ statements optionally supports a @IF NOT EXISTS@ condition and @UDPATE@ supports @IF@ conditions. + + h3. 3.0.5 + + * SELECT, UPDATE, and DELETE statements now allow empty IN relations (see "CASSANDRA-5626":https://issues.apache.org/jira/browse/CASSANDRA-5626). + h3. 3.0.4 * Updated the syntax for custom "secondary indexes":#createIndexStmt. http://git-wip-us.apache.org/repos/asf/cassandra/blob/1be575da/src/java/org/apache/cassandra/cql3/Cql.g ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1be575da/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 0aa188d,5ede98f..d08d0ee --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@@ -109,175 -69,76 +109,175 @@@ public abstract class ModificationState public void validate(ClientState state) throws InvalidRequestException { - if (timeToLive < 0) - throw new InvalidRequestException("A TTL must be greater or equal to 0"); - - if (timeToLive > ExpiringColumn.MAX_TTL) - throw new InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", timeToLive, ExpiringColumn.MAX_TTL)); } - protected abstract void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException; + public void addOperation(Operation op) + { + columnOperations.add(op); + } - public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException + public List<Operation> getOperations() { - if (cl == null) - throw new InvalidRequestException("Invalid empty consistency level"); + return columnOperations; + } - validateConsistency(cl); + public void addCondition(Operation op) + { + if (columnConditions == null) + columnConditions = new ArrayList<Operation>(); - // The type should have been set by now or we have a bug - assert type != null; + columnConditions.add(op); + } - Collection<? extends IMutation> mutations = getMutations(variables, false, cl, queryState.getTimestamp()); - if (mutations.isEmpty()) - return null; + public void setIfNotExistCondition() + { + ifNotExists = true; + } - switch (type) - { - case LOGGED: - if (mutations.size() > 1) - StorageProxy.mutateAtomically((Collection<RowMutation>) mutations, cl); - else - StorageProxy.mutate(mutations, cl); - break; - case UNLOGGED: - case COUNTER: - StorageProxy.mutate(mutations, cl); - break; - default: - throw new AssertionError(); - } + private void addKeyValues(ColumnIdentifier name, List<Term> values) throws InvalidRequestException + { + if (processedKeys.put(name, values) != null) + throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", name)); + } - return null; + public void addKeyValue(ColumnIdentifier name, Term value) throws InvalidRequestException + { + addKeyValues(name, Collections.singletonList(value)); } - public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException + public void processWhereClause(List<Relation> whereClause, ColumnSpecification[] names) throws InvalidRequestException { - for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp())) - mutation.apply(); - return null; + CFDefinition cfDef = cfm.getCfDef(); + for (Relation rel : whereClause) + { + CFDefinition.Name name = cfDef.get(rel.getEntity()); + if (name == null) + throw new InvalidRequestException(String.format("Unknown key identifier %s", rel.getEntity())); + + switch (name.kind) + { + case KEY_ALIAS: + case COLUMN_ALIAS: + List<Term.Raw> rawValues; + if (rel.operator() == Relation.Type.EQ) + rawValues = Collections.singletonList(rel.getValue()); + else if (name.kind == CFDefinition.Name.Kind.KEY_ALIAS && rel.operator() == Relation.Type.IN) + rawValues = rel.getInValues(); + else + throw new InvalidRequestException(String.format("Invalid operator %s for PRIMARY KEY part %s", rel.operator(), name)); + + List<Term> values = new ArrayList<Term>(rawValues.size()); + for (Term.Raw raw : rawValues) + { + Term t = raw.prepare(name); + t.collectMarkerSpecification(names); + values.add(t); + } + addKeyValues(name.name, values); + break; + case VALUE_ALIAS: + case COLUMN_METADATA: + throw new InvalidRequestException(String.format("Non PRIMARY KEY %s found in where clause", name)); + } + } } - public long getTimestamp(long now) + private List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables) + throws InvalidRequestException { - return timestamp == null ? now : timestamp; + CFDefinition cfDef = cfm.getCfDef(); + ColumnNameBuilder keyBuilder = cfDef.getKeyNameBuilder(); + List<ByteBuffer> keys = new ArrayList<ByteBuffer>(); + for (CFDefinition.Name name : cfDef.keys.values()) + { + List<Term> values = processedKeys.get(name.name); - if (values == null || values.isEmpty()) ++ if (values == null) + throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name)); + + if (keyBuilder.remainingCount() == 1) + { + for (Term t : values) + { + ByteBuffer val = t.bindAndGet(variables); + if (val == null) + throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name)); + keys.add(keyBuilder.copy().add(val).build()); + } + } + else + { - if (values.size() > 1) ++ if (values.isEmpty() || values.size() > 1) + throw new InvalidRequestException("IN is only supported on the last column of the partition key"); + ByteBuffer val = values.get(0).bindAndGet(variables); + if (val == null) + throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name)); + keyBuilder.add(val); + } + } + return keys; } - public void setTimestamp(long timestamp) + private ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables) + throws InvalidRequestException { - this.timestamp = timestamp; + CFDefinition cfDef = cfm.getCfDef(); + ColumnNameBuilder builder = cfDef.getColumnNameBuilder(); + CFDefinition.Name firstEmptyKey = null; + for (CFDefinition.Name name : cfDef.columns.values()) + { + List<Term> values = processedKeys.get(name.name); - if (values == null || values.isEmpty()) ++ if (values == null) + { + firstEmptyKey = name; + if (requireFullClusteringKey() && cfDef.isComposite && !cfDef.isCompact) + throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name)); + } + else if (firstEmptyKey != null) + { + throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s since %s is set", firstEmptyKey.name, name.name)); + } + else + { + assert values.size() == 1; // We only allow IN for row keys so far + ByteBuffer val = values.get(0).bindAndGet(variables); + if (val == null) + throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", name)); + builder.add(val); + } + } + return builder; } - public boolean isSetTimestamp() + protected CFDefinition.Name getFirstEmptyKey() { - return timestamp != null; + for (CFDefinition.Name name : cfm.getCfDef().columns.values()) + { + List<Term> values = processedKeys.get(name.name); + if (values == null || values.isEmpty()) + return name; + } + return null; } - public int getTimeToLive() + protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl) + throws RequestExecutionException, RequestValidationException { - return timeToLive; + // Lists SET operation incurs a read. + Set<ByteBuffer> toRead = null; + for (Operation op : columnOperations) + { + if (op.requiresRead()) + { + if (toRead == null) + toRead = new TreeSet<ByteBuffer>(UTF8Type.instance); + toRead.add(op.columnName.key); + } + } + + return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, (CompositeType)cfm.comparator, local, cl); } - protected Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> keys, ColumnNameBuilder builder, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl) + private Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException { try @@@ -329,121 -188,6 +329,124 @@@ return map; } + public boolean hasConditions() + { + return ifNotExists || (columnConditions != null && !columnConditions.isEmpty()); + } + + public ResultMessage execute(QueryState queryState, QueryOptions options) + throws RequestExecutionException, RequestValidationException + { + if (options.getConsistency() == null) + throw new InvalidRequestException("Invalid empty consistency level"); + + return hasConditions() + ? executeWithCondition(queryState, options) + : executeWithoutCondition(queryState, options); + } + + private ResultMessage executeWithoutCondition(QueryState queryState, QueryOptions options) + throws RequestExecutionException, RequestValidationException + { + ConsistencyLevel cl = options.getConsistency(); + if (isCounter()) + cl.validateCounterForWrite(cfm); + else + cl.validateForWrite(cfm.ksName); + - StorageProxy.mutateWithTriggers(getMutations(options.getValues(), false, cl, queryState.getTimestamp(), false), cl, false); ++ Collection<? extends IMutation> mutations = getMutations(options.getValues(), false, cl, queryState.getTimestamp(), false); ++ if (!mutations.isEmpty()) ++ StorageProxy.mutateWithTriggers(mutations, cl, false); ++ + return null; + } + + public ResultMessage executeWithCondition(QueryState queryState, QueryOptions options) + throws RequestExecutionException, RequestValidationException + { + List<ByteBuffer> variables = options.getValues(); + List<ByteBuffer> keys = buildPartitionKeyNames(variables); + // We don't support IN for CAS operation so far + if (keys.size() > 1) + throw new InvalidRequestException("IN on the partition key is not supported with conditional updates"); + + ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables); + UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(queryState.getTimestamp(), variables), getTimeToLive(variables), null); + + ByteBuffer key = keys.get(0); + ThriftValidation.validateKey(cfm, key); + + ColumnFamily updates = updateForKey(key, clusteringPrefix, params); + ColumnFamily expected = buildConditions(key, clusteringPrefix, params); + + ColumnFamily result = StorageProxy.cas(keyspace(), columnFamily(), key, clusteringPrefix, expected, updates, options.getConsistency(), options.getSerialConsistency()); + return new ResultMessage.Rows(buildCasResultSet(key, result)); + } + + private ResultSet buildCasResultSet(ByteBuffer key, ColumnFamily cf) throws InvalidRequestException + { + boolean success = cf == null; + + ColumnSpecification spec = new ColumnSpecification(keyspace(), columnFamily(), CAS_RESULT_COLUMN, BooleanType.instance); + ResultSet.Metadata metadata = new ResultSet.Metadata(Collections.singletonList(spec)); + List<List<ByteBuffer>> rows = Collections.singletonList(Collections.singletonList(BooleanType.instance.decompose(success))); + + ResultSet rs = new ResultSet(metadata, rows); + return success ? rs : merge(rs, buildCasFailureResultSet(key, cf)); + } + + private static ResultSet merge(ResultSet left, ResultSet right) + { + if (left.size() == 0) + return right; + else if (right.size() == 0) + return left; + + assert left.size() == 1 && right.size() == 1; + int size = left.metadata.names.size() + right.metadata.names.size(); + List<ColumnSpecification> specs = new ArrayList<ColumnSpecification>(size); + specs.addAll(left.metadata.names); + specs.addAll(right.metadata.names); + List<ByteBuffer> row = new ArrayList<ByteBuffer>(size); + row.addAll(left.rows.get(0)); + row.addAll(right.rows.get(0)); + return new ResultSet(new ResultSet.Metadata(specs), Collections.singletonList(row)); + } + + private ResultSet buildCasFailureResultSet(ByteBuffer key, ColumnFamily cf) throws InvalidRequestException + { + CFDefinition cfDef = cfm.getCfDef(); + + Selection selection; + if (ifNotExists) + { + selection = Selection.wildcard(cfDef); + } + else + { + List<CFDefinition.Name> names = new ArrayList<CFDefinition.Name>(columnConditions.size()); + for (Operation condition : columnConditions) + names.add(cfDef.get(condition.columnName)); + selection = Selection.forColumns(names); + } + + long now = System.currentTimeMillis(); + Selection.ResultSetBuilder builder = selection.resultSetBuilder(now); + SelectStatement.forSelection(cfDef, selection).processColumnFamily(key, cf, Collections.<ByteBuffer>emptyList(), Integer.MAX_VALUE, now, builder); + + return builder.build(); + } + + public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException + { + if (hasConditions()) + throw new UnsupportedOperationException(); + + for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp(), false)) + mutation.apply(); + return null; + } + /** * Convert statement into a list of mutations to apply on the server * http://git-wip-us.apache.org/repos/asf/cassandra/blob/1be575da/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 8b9f7df,4f559e5..fbabf2f --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@@ -268,25 -198,50 +268,29 @@@ public class SelectStatement implement return cfDef.cfm.cfName; } - private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables) throws RequestValidationException + private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException { - QueryPath queryPath = new QueryPath(columnFamily()); - Collection<ByteBuffer> keys = getKeys(variables); + if (keys.isEmpty()) // in case of IN () for (the last column of) the partition key. + return null; + List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size()); - // ...a range (slice) of column names - if (isColumnRange()) - { - // Note that we use the total limit for every key. This is - // potentially inefficient, but then again, IN + LIMIT is not a - // very sensible choice - for (ByteBuffer key : keys) - { - QueryProcessor.validateKey(key); - // Note that we should not share the slice filter amongst the command, due to SliceQueryFilter not - // being immutable due to its columnCounter used by the lastCounted() method - // (this is fairly ugly and we should change that but that's probably not a tiny refactor to do that cleanly) - SliceQueryFilter filter = (SliceQueryFilter)makeFilter(variables); - if (filter == null) - return null; + IDiskAtomFilter filter = makeFilter(variables, limit); + if (filter == null) + return null; - commands.add(new SliceFromReadCommand(keyspace(), key, queryPath, filter)); - } - } - // ...of a list of column names - else + // Note that we use the total limit for every key, which is potentially inefficient. + // However, IN + LIMIT is not a very sensible choice. + for (ByteBuffer key : keys) { - // ByNames commands can share the filter - IDiskAtomFilter filter = makeFilter(variables); - if (filter == null) - return null; - - for (ByteBuffer key: keys) - { - QueryProcessor.validateKey(key); - commands.add(new SliceByNamesReadCommand(keyspace(), key, queryPath, (NamesQueryFilter)filter)); - } + QueryProcessor.validateKey(key); + // We should not share the slice filter amongst the commands (hence the cloneShallow), due to + // SliceQueryFilter not being immutable due to its columnCounter used by the lastCounted() method + // (this is fairly ugly and we should change that but that's probably not a tiny refactor to do that cleanly) + commands.add(ReadCommand.create(keyspace(), key, columnFamily(), now, filter.cloneShallow())); } + return commands; } @@@ -714,39 -661,23 +722,37 @@@ private List<IndexExpression> getIndexExpressions(List<ByteBuffer> variables) throws InvalidRequestException { - if (metadataRestrictions.isEmpty()) - return Collections.<IndexExpression>emptyList(); + if (!usesSecondaryIndexing || restrictedNames.isEmpty()) + return Collections.emptyList(); List<IndexExpression> expressions = new ArrayList<IndexExpression>(); - for (Map.Entry<CFDefinition.Name, Restriction> entry : metadataRestrictions.entrySet()) + for (CFDefinition.Name name : restrictedNames) { - CFDefinition.Name name = entry.getKey(); - Restriction restriction = entry.getValue(); + Restriction restriction; + switch (name.kind) + { + case KEY_ALIAS: + restriction = keyRestrictions[name.position]; + break; + case COLUMN_ALIAS: + restriction = columnRestrictions[name.position]; + break; + case COLUMN_METADATA: + restriction = metadataRestrictions.get(name); + break; + default: + // We don't allow restricting a VALUE_ALIAS for now in prepare. + throw new AssertionError(); + } if (restriction.isEquality()) { - for (Term t : restriction.eqValues) - { - ByteBuffer value = t.bindAndGet(variables); - if (value == null) - throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", name)); - if (value.remaining() > 0xFFFF) - throw new InvalidRequestException("Index expression values may not be larger than 64K"); - expressions.add(new IndexExpression(name.name.key, IndexOperator.EQ, value)); - } + assert restriction.eqValues.size() == 1; // IN is not supported for indexed columns. + ByteBuffer value = restriction.eqValues.get(0).bindAndGet(variables); + if (value == null) + throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", name)); + if (value.remaining() > 0xFFFF) + throw new InvalidRequestException("Index expression values may not be larger than 64K"); + expressions.add(new IndexExpression(name.name.key, IndexOperator.EQ, value)); } else { @@@ -1106,12 -1047,7 +1112,11 @@@ * At this point, the select statement if fully constructed, but we still have a few things to validate */ - // If a component of the PRIMARY KEY is restricted by a non-EQ relation, all preceding + // If there is a queriable index, no special condition are required on the other restrictions. + // But we still need to know 2 things: + // - If we don't have a queriable index, is the query ok + // - Is it queriable without 2ndary index, which is always more efficient - + // If a component of the partition key is restricted by a non-EQ relation, all preceding // components must have a EQ, and all following must have no restriction boolean shouldBeDone = false; CFDefinition.Name previous = null; @@@ -1186,52 -1139,16 +1188,52 @@@ previous = cname; } - // Deal with indexed columns - if (!stmt.metadataRestrictions.isEmpty()) + // If a cluster key column is restricted by a non-EQ relation, all preceding + // columns must have a EQ, and all following must have no restriction. Unless + // the column is indexed that is. + shouldBeDone = false; + previous = null; + iter = cfDef.columns.values().iterator(); + int lastRestrictedClusteringKey = stmt.columnRestrictions.length - 1; + for (int i = 0; i < stmt.columnRestrictions.length; i++) { - stmt.isKeyRange = true; - Set<ByteBuffer> indexedNames = new HashSet<ByteBuffer>(); - for (ColumnDefinition cfdef : cfm.getColumn_metadata().values()) + CFDefinition.Name cname = iter.next(); + Restriction restriction = stmt.columnRestrictions[i]; + + if (restriction == null) + { + if (!shouldBeDone) + lastRestrictedClusteringKey = i - 1; + shouldBeDone = true; + } + else { - if (cfdef.getIndexType() != null) + if (shouldBeDone) + { + if (hasQueriableIndex) + { + stmt.usesSecondaryIndexing = true; + break; + } + throw new InvalidRequestException(String.format("PRIMARY KEY part %s cannot be restricted (preceding part %s is either not restricted or by a non-EQ relation)", cname, previous)); + } + else if (!restriction.isEquality()) + { + lastRestrictedClusteringKey = i; + shouldBeDone = true; + // For non-composite slices, we don't support internally the difference between exclusive and + // inclusive bounds, so we deal with it manually. + if (!cfDef.isComposite && (!restriction.isInclusive(Bound.START) || !restriction.isInclusive(Bound.END))) + stmt.sliceRestriction = restriction; + } + // We only support IN for the last name and for compact storage so far + // TODO: #3885 allows us to extend to non compact as well, but that remains to be done - else if (restriction.eqValues.size() > 1) ++ else if (restriction.isINRestriction()) { - indexedNames.add(cfdef.name); + if (i != stmt.columnRestrictions.length - 1) + throw new InvalidRequestException(String.format("PRIMARY KEY part %s cannot be restricted by IN relation", cname)); + else if (stmt.selectACollection()) + throw new InvalidRequestException(String.format("Cannot restrict PRIMARY KEY part %s by IN relation as a collection is selected by the query", cname)); } }