Support list index operation with conditions patch by slebresne; reviewed by thobbs for CASSANDRA-7499
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5db108c3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5db108c3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5db108c3 Branch: refs/heads/trunk Commit: 5db108c314fa5064669eefef8e5f6a52a1500b96 Parents: 2ea11c1 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Mon Aug 18 10:21:44 2014 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Mon Aug 18 10:22:40 2014 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + build.xml | 2 +- .../cql3/statements/BatchStatement.java | 23 +- .../cql3/statements/CQL3CasConditions.java | 203 -------------- .../cql3/statements/CQL3CasRequest.java | 268 +++++++++++++++++++ .../cql3/statements/ModificationStatement.java | 57 ++-- .../apache/cassandra/service/CASConditions.java | 39 --- .../apache/cassandra/service/CASRequest.java | 45 ++++ .../apache/cassandra/service/StorageProxy.java | 13 +- .../cassandra/thrift/CassandraServer.java | 16 +- 10 files changed, 353 insertions(+), 314 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4fa537b..cecf153 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.1 + * Support list index operations with conditions (CASSANDRA-7499) * Add max live/tombstoned cells to nodetool cfstats output (CASSANDRA-7731) * Validate IPv6 wildcard addresses properly (CASSANDRA-7680) * (cqlsh) Error when tracing query (CASSANDRA-7613) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 54f5f3d..d747bbc 100644 --- a/build.xml +++ b/build.xml @@ -25,7 +25,7 @@ <property name="debuglevel" value="source,lines,vars"/> <!-- default version and SCM information --> - <property name="base.version" value="2.1.0-rc6"/> + <property name="base.version" value="2.1.1"/> <property name="scm.connection" value="scm:git://git.apache.org/cassandra.git"/> <property name="scm.developerConnection" value="scm:git://git.apache.org/cassandra.git"/> <property name="scm.url" value="http://git-wip-us.apache.org/repos/asf?p=cassandra.git;a=tree"/> http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 90be914..17d1771 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -149,9 +149,6 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache throw new InvalidRequestException("Batch with conditions cannot span multiple tables"); ksName = stmt.keyspace(); cfName = stmt.columnFamily(); - - if (stmt.requiresRead()) - throw new InvalidRequestException("Operations using list indexes are not allowed with IF conditions"); } } } @@ -240,7 +237,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache * Checks batch size to ensure threshold is met. If not, a warning is logged. * @param cfs ColumnFamilies that will store the batch's mutations. */ - private void verifyBatchSize(Iterable<ColumnFamily> cfs) + public static void verifyBatchSize(Iterable<ColumnFamily> cfs) { long size = 0; long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold(); @@ -306,8 +303,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache ByteBuffer key = null; String ksName = null; String cfName = null; - ColumnFamily updates = null; - CQL3CasConditions conditions = null; + CQL3CasRequest casRequest = null; Set<ColumnDefinition> columnsWithConditions = new LinkedHashSet<>(); for (int i = 0; i < statements.size(); i++) @@ -323,8 +319,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache key = pks.get(0); ksName = statement.cfm.ksName; cfName = statement.cfm.cfName; - conditions = new CQL3CasConditions(statement.cfm, now); - updates = ArrayBackedSortedColumns.factory.create(statement.cfm); + casRequest = new CQL3CasRequest(statement.cfm, key, true); } else if (!key.equals(pks.get(0))) { @@ -334,22 +329,18 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache Composite clusteringPrefix = statement.createClusteringPrefix(statementOptions); if (statement.hasConditions()) { - statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementOptions, timestamp); + statement.addConditions(clusteringPrefix, casRequest, statementOptions); // As soon as we have a ifNotExists, we set columnsWithConditions to null so that everything is in the resultSet if (statement.hasIfNotExistCondition() || statement.hasIfExistCondition()) columnsWithConditions = null; else if (columnsWithConditions != null) Iterables.addAll(columnsWithConditions, statement.getColumnsWithConditions()); } - else - { - UpdateParameters params = statement.makeUpdateParameters(Collections.singleton(key), clusteringPrefix, statementOptions, false, now); - statement.addUpdateForKey(updates, key, clusteringPrefix, params); - } + casRequest.addRowUpdate(clusteringPrefix, statement, statementOptions, timestamp); } - verifyBatchSize(Collections.singleton(updates)); - ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, options.getSerialConsistency(), options.getConsistency()); + ColumnFamily result = StorageProxy.cas(ksName, cfName, key, casRequest, options.getSerialConsistency(), options.getConsistency()); + return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true, options.forStatement(0))); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java deleted file mode 100644 index 8b5a403..0000000 --- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.cql3.statements; - -import java.nio.ByteBuffer; -import java.util.*; - -import org.apache.cassandra.cql3.*; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.service.CASConditions; -import org.apache.cassandra.utils.Pair; - -/** - * Processed CAS conditions on potentially multiple rows of the same partition. - */ -public class CQL3CasConditions implements CASConditions -{ - private final CFMetaData cfm; - private final long now; - - // We index RowCondition by the prefix of the row they applied to for 2 reasons: - // 1) this allows to keep things sorted to build the ColumnSlice array below - // 2) this allows to detect when contradictory conditions are set (not exists with some other conditions on the same row) - private final SortedMap<Composite, RowCondition> conditions; - - public CQL3CasConditions(CFMetaData cfm, long now) - { - this.cfm = cfm; - // We will use now for Cell.isLive() which expects milliseconds but the argument is in microseconds. - this.now = now / 1000; - this.conditions = new TreeMap<>(cfm.comparator); - } - - public void addNotExist(Composite prefix) throws InvalidRequestException - { - RowCondition previous = conditions.put(prefix, new NotExistCondition(prefix, now)); - if (previous != null && !(previous instanceof NotExistCondition)) - { - // these should be prevented by the parser, but it doesn't hurt to check - if (previous instanceof ExistCondition) - throw new InvalidRequestException("Cannot mix IF EXISTS and IF NOT EXISTS conditions for the same row"); - else - throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row"); - } - } - - public void addExist(Composite prefix) throws InvalidRequestException - { - RowCondition previous = conditions.put(prefix, new ExistCondition(prefix, now)); - // this should be prevented by the parser, but it doesn't hurt to check - if (previous != null && previous instanceof NotExistCondition) - throw new InvalidRequestException("Cannot mix IF EXISTS and IF NOT EXISTS conditions for the same row"); - } - - public void addConditions(Composite prefix, Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException - { - RowCondition condition = conditions.get(prefix); - if (condition == null) - { - condition = new ColumnsConditions(prefix, now); - conditions.put(prefix, condition); - } - else if (!(condition instanceof ColumnsConditions)) - { - throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row"); - } - ((ColumnsConditions)condition).addConditions(conds, options); - } - - public IDiskAtomFilter readFilter() - { - assert !conditions.isEmpty(); - ColumnSlice[] slices = new ColumnSlice[conditions.size()]; - int i = 0; - // We always read CQL rows entirely as on CAS failure we want to be able to distinguish between "row exists - // but all values for which there were conditions are null" and "row doesn't exists", and we can't rely on the - // row marker for that (see #6623) - for (Composite prefix : conditions.keySet()) - slices[i++] = prefix.slice(); - - int toGroup = cfm.comparator.isDense() ? -1 : cfm.clusteringColumns().size(); - assert ColumnSlice.validateSlices(slices, cfm.comparator, false); - return new SliceQueryFilter(slices, false, slices.length, toGroup); - } - - public boolean appliesTo(ColumnFamily current) throws InvalidRequestException - { - for (RowCondition condition : conditions.values()) - { - if (!condition.appliesTo(current)) - return false; - } - return true; - } - - private static abstract class RowCondition - { - public final Composite rowPrefix; - protected final long now; - - protected RowCondition(Composite rowPrefix, long now) - { - this.rowPrefix = rowPrefix; - this.now = now; - } - - public abstract boolean appliesTo(ColumnFamily current) throws InvalidRequestException; - } - - private static class NotExistCondition extends RowCondition - { - private NotExistCondition(Composite rowPrefix, long now) - { - super(rowPrefix, now); - } - - public boolean appliesTo(ColumnFamily current) - { - if (current == null) - return true; - - Iterator<Cell> iter = current.iterator(new ColumnSlice[]{ rowPrefix.slice() }); - while (iter.hasNext()) - if (iter.next().isLive(now)) - return false; - return true; - } - } - - private static class ExistCondition extends RowCondition - { - private ExistCondition(Composite rowPrefix, long now) - { - super (rowPrefix, now); - } - - public boolean appliesTo(ColumnFamily current) - { - if (current == null) - return false; - - Iterator<Cell> iter = current.iterator(new ColumnSlice[]{ rowPrefix.slice() }); - while (iter.hasNext()) - if (iter.next().isLive(now)) - return true; - return false; - } - } - - private static class ColumnsConditions extends RowCondition - { - private final Map<Pair<ColumnIdentifier, ByteBuffer>, ColumnCondition.Bound> conditions = new HashMap<>(); - - private ColumnsConditions(Composite rowPrefix, long now) - { - super(rowPrefix, now); - } - - public void addConditions(Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException - { - for (ColumnCondition condition : conds) - { - // We will need the variables in appliesTo but with protocol batches, each condition in this object can have a - // different list of variables. - ColumnCondition.Bound current = condition.bind(options); - ColumnCondition.Bound previous = conditions.put(Pair.create(condition.column.name, current.getCollectionElementValue()), current); - // If 2 conditions are actually equal, let it slide - if (previous != null && !previous.equals(current)) - throw new InvalidRequestException("Duplicate and incompatible conditions for column " + condition.column.name); - } - } - - public boolean appliesTo(ColumnFamily current) throws InvalidRequestException - { - if (current == null) - return conditions.isEmpty(); - - for (ColumnCondition.Bound condition : conditions.values()) - if (!condition.appliesTo(rowPrefix, current, now)) - return false; - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java new file mode 100644 index 0000000..a85c1e5 --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements; + +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.cassandra.cql3.*; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.composites.Composite; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.service.CASRequest; +import org.apache.cassandra.utils.Pair; + +/** + * Processed CAS conditions and update on potentially multiple rows of the same partition. + */ +public class CQL3CasRequest implements CASRequest +{ + private final CFMetaData cfm; + private final ByteBuffer key; + private final long now; + private final boolean isBatch; + + // We index RowCondition by the prefix of the row they applied to for 2 reasons: + // 1) this allows to keep things sorted to build the ColumnSlice array below + // 2) this allows to detect when contradictory conditions are set (not exists with some other conditions on the same row) + private final SortedMap<Composite, RowCondition> conditions; + + private final List<RowUpdate> updates = new ArrayList<>(); + + public CQL3CasRequest(CFMetaData cfm, ByteBuffer key, boolean isBatch) + { + this.cfm = cfm; + // When checking if conditions apply, we want to use a fixed reference time for a whole request to check + // for expired cells. Note that this is unrelated to the cell timestamp. + this.now = System.currentTimeMillis(); + this.key = key; + this.conditions = new TreeMap<>(cfm.comparator); + this.isBatch = isBatch; + } + + public void addRowUpdate(Composite prefix, ModificationStatement stmt, QueryOptions options, long timestamp) + { + updates.add(new RowUpdate(prefix, stmt, options, timestamp)); + } + + public void addNotExist(Composite prefix) throws InvalidRequestException + { + RowCondition previous = conditions.put(prefix, new NotExistCondition(prefix, now)); + if (previous != null && !(previous instanceof NotExistCondition)) + { + // these should be prevented by the parser, but it doesn't hurt to check + if (previous instanceof ExistCondition) + throw new InvalidRequestException("Cannot mix IF EXISTS and IF NOT EXISTS conditions for the same row"); + else + throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row"); + } + } + + public void addExist(Composite prefix) throws InvalidRequestException + { + RowCondition previous = conditions.put(prefix, new ExistCondition(prefix, now)); + // this should be prevented by the parser, but it doesn't hurt to check + if (previous != null && previous instanceof NotExistCondition) + throw new InvalidRequestException("Cannot mix IF EXISTS and IF NOT EXISTS conditions for the same row"); + } + + public void addConditions(Composite prefix, Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException + { + RowCondition condition = conditions.get(prefix); + if (condition == null) + { + condition = new ColumnsConditions(prefix, now); + conditions.put(prefix, condition); + } + else if (!(condition instanceof ColumnsConditions)) + { + throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row"); + } + ((ColumnsConditions)condition).addConditions(conds, options); + } + + public IDiskAtomFilter readFilter() + { + assert !conditions.isEmpty(); + ColumnSlice[] slices = new ColumnSlice[conditions.size()]; + int i = 0; + // We always read CQL rows entirely as on CAS failure we want to be able to distinguish between "row exists + // but all values for which there were conditions are null" and "row doesn't exists", and we can't rely on the + // row marker for that (see #6623) + for (Composite prefix : conditions.keySet()) + slices[i++] = prefix.slice(); + + int toGroup = cfm.comparator.isDense() ? -1 : cfm.clusteringColumns().size(); + assert ColumnSlice.validateSlices(slices, cfm.comparator, false); + return new SliceQueryFilter(slices, false, slices.length, toGroup); + } + + public boolean appliesTo(ColumnFamily current) throws InvalidRequestException + { + for (RowCondition condition : conditions.values()) + { + if (!condition.appliesTo(current)) + return false; + } + return true; + } + + public ColumnFamily makeUpdates(ColumnFamily current) throws InvalidRequestException + { + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm); + for (RowUpdate upd : updates) + upd.applyUpdates(current, cf); + + if (isBatch) + BatchStatement.verifyBatchSize(Collections.singleton(cf)); + + return cf; + } + + /** + * Due to some operation on lists, we can't generate the update that a given Modification statement does before + * we get the values read by the initial read of Paxos. A RowUpdate thus just store the relevant information + * (include the statement iself) to generate those updates. We'll have multiple RowUpdate for a Batch, otherwise + * we'll have only one. + */ + private class RowUpdate + { + private final Composite rowPrefix; + private final ModificationStatement stmt; + private final QueryOptions options; + private final long timestamp; + + private RowUpdate(Composite rowPrefix, ModificationStatement stmt, QueryOptions options, long timestamp) + { + this.rowPrefix = rowPrefix; + this.stmt = stmt; + this.options = options; + this.timestamp = timestamp; + } + + public void applyUpdates(ColumnFamily current, ColumnFamily updates) throws InvalidRequestException + { + Map<ByteBuffer, CQL3Row> map = null; + if (stmt.requiresRead()) + { + // Uses the "current" values read by Paxos for lists operation that requires a read + Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(current.iterator(new ColumnSlice[]{ rowPrefix.slice() })); + if (iter.hasNext()) + { + map = Collections.singletonMap(key, iter.next()); + assert !iter.hasNext() : "We shoudn't be updating more than one CQL row per-ModificationStatement"; + } + } + + UpdateParameters params = new UpdateParameters(cfm, options, timestamp, stmt.getTimeToLive(options), map); + stmt.addUpdateForKey(updates, key, rowPrefix, params); + } + } + + private static abstract class RowCondition + { + public final Composite rowPrefix; + protected final long now; + + protected RowCondition(Composite rowPrefix, long now) + { + this.rowPrefix = rowPrefix; + this.now = now; + } + + public abstract boolean appliesTo(ColumnFamily current) throws InvalidRequestException; + } + + private static class NotExistCondition extends RowCondition + { + private NotExistCondition(Composite rowPrefix, long now) + { + super(rowPrefix, now); + } + + public boolean appliesTo(ColumnFamily current) + { + if (current == null) + return true; + + Iterator<Cell> iter = current.iterator(new ColumnSlice[]{ rowPrefix.slice() }); + while (iter.hasNext()) + if (iter.next().isLive(now)) + return false; + return true; + } + } + + private static class ExistCondition extends RowCondition + { + private ExistCondition(Composite rowPrefix, long now) + { + super (rowPrefix, now); + } + + public boolean appliesTo(ColumnFamily current) + { + if (current == null) + return false; + + Iterator<Cell> iter = current.iterator(new ColumnSlice[]{ rowPrefix.slice() }); + while (iter.hasNext()) + if (iter.next().isLive(now)) + return true; + return false; + } + } + + private static class ColumnsConditions extends RowCondition + { + private final Map<Pair<ColumnIdentifier, ByteBuffer>, ColumnCondition.Bound> conditions = new HashMap<>(); + + private ColumnsConditions(Composite rowPrefix, long now) + { + super(rowPrefix, now); + } + + public void addConditions(Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException + { + for (ColumnCondition condition : conds) + { + // We will need the variables in appliesTo but with protocol batches, each condition in this object can have a + // different list of variables. + ColumnCondition.Bound current = condition.bind(options); + ColumnCondition.Bound previous = conditions.put(Pair.create(condition.column.name, current.getCollectionElementValue()), current); + // If 2 conditions are actually equal, let it slide + if (previous != null && !previous.equals(current)) + throw new InvalidRequestException("Duplicate and incompatible conditions for column " + condition.column.name); + } + } + + public boolean appliesTo(ColumnFamily current) throws InvalidRequestException + { + if (current == null) + return conditions.isEmpty(); + + for (ColumnCondition.Bound condition : conditions.values()) + if (!condition.appliesTo(rowPrefix, current, now)) + return false; + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index fef0e94..774883d 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -147,14 +147,8 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF public void validate(ClientState state) throws InvalidRequestException { - if (hasConditions()) - { - if (attrs.isTimestampSet()) - throw new InvalidRequestException("Cannot provide custom timestamp for conditional updates"); - - if (requiresRead()) - throw new InvalidRequestException("Operations using list indexes are not allowed with IF conditions"); - } + if (hasConditions() && attrs.isTimestampSet()) + throw new InvalidRequestException("Cannot provide custom timestamp for conditional updates"); if (isCounter() && attrs.isTimestampSet()) throw new InvalidRequestException("Cannot provide custom timestamp for counter updates"); @@ -414,32 +408,20 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF public boolean requiresRead() { + // Lists SET operation incurs a read. for (Operation op : columnOperations) if (op.requiresRead()) return true; + return false; } protected Map<ByteBuffer, CQL3Row> readRequiredRows(Collection<ByteBuffer> partitionKeys, Composite clusteringPrefix, boolean local, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException { - // Lists SET operation incurs a read. - boolean requiresRead = false; - for (Operation op : columnOperations) - { - if (op.requiresRead()) - { - requiresRead = true; - break; - } - } - - return requiresRead ? readRows(partitionKeys, clusteringPrefix, cfm, local, cl) : null; - } + if (!requiresRead()) + return null; - protected Map<ByteBuffer, CQL3Row> readRows(Collection<ByteBuffer> partitionKeys, Composite rowPrefix, CFMetaData cfm, boolean local, ConsistencyLevel cl) - throws RequestExecutionException, RequestValidationException - { try { cl.validateForRead(keyspace()); @@ -449,7 +431,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF throw new InvalidRequestException(String.format("Write operation require a read but consistency %s is not supported on reads", cl)); } - ColumnSlice[] slices = new ColumnSlice[]{ rowPrefix.slice() }; + ColumnSlice[] slices = new ColumnSlice[]{ clusteringPrefix.slice() }; List<ReadCommand> commands = new ArrayList<ReadCommand>(partitionKeys.size()); long now = System.currentTimeMillis(); for (ByteBuffer key : partitionKeys) @@ -527,46 +509,41 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF throw new InvalidRequestException("IN on the partition key is not supported with conditional updates"); ByteBuffer key = keys.get(0); - long now = options.getTimestamp(queryState); - CQL3CasConditions conditions = new CQL3CasConditions(cfm, now); Composite prefix = createClusteringPrefix(options); - ColumnFamily updates = ArrayBackedSortedColumns.factory.create(cfm); - addUpdatesAndConditions(key, prefix, updates, conditions, options, getTimestamp(now, options)); + + CQL3CasRequest request = new CQL3CasRequest(cfm, key, false); + addConditions(prefix, request, options); + request.addRowUpdate(prefix, this, options, now); ColumnFamily result = StorageProxy.cas(keyspace(), columnFamily(), key, - conditions, - updates, + request, options.getSerialConsistency(), options.getConsistency()); return new ResultMessage.Rows(buildCasResultSet(key, result, options)); } - public void addUpdatesAndConditions(ByteBuffer key, Composite clusteringPrefix, ColumnFamily updates, CQL3CasConditions conditions, QueryOptions options, long now) - throws InvalidRequestException + public void addConditions(Composite clusteringPrefix, CQL3CasRequest request, QueryOptions options) throws InvalidRequestException { - UpdateParameters updParams = new UpdateParameters(cfm, options, now, getTimeToLive(options), null); - addUpdateForKey(updates, key, clusteringPrefix, updParams); - if (ifNotExists) { // If we use ifNotExists, if the statement applies to any non static columns, then the condition is on the row of the non-static // columns and the prefix should be the clusteringPrefix. But if only static columns are set, then the ifNotExists apply to the existence // of any static columns and we should use the prefix for the "static part" of the partition. - conditions.addNotExist(clusteringPrefix); + request.addNotExist(clusteringPrefix); } else if (ifExists) { - conditions.addExist(clusteringPrefix); + request.addExist(clusteringPrefix); } else { if (columnConditions != null) - conditions.addConditions(clusteringPrefix, columnConditions, options); + request.addConditions(clusteringPrefix, columnConditions, options); if (staticConditions != null) - conditions.addConditions(cfm.comparator.staticPrefix(), staticConditions, options); + request.addConditions(cfm.comparator.staticPrefix(), staticConditions, options); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/src/java/org/apache/cassandra/service/CASConditions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CASConditions.java b/src/java/org/apache/cassandra/service/CASConditions.java deleted file mode 100644 index c0a2111..0000000 --- a/src/java/org/apache/cassandra/service/CASConditions.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service; - -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.exceptions.InvalidRequestException; - -/** - * Abstract the conditions to be fulfilled by a CAS operation. - */ -public interface CASConditions -{ - /** - * The filter to use to fetch the value to compare for the CAS. - */ - public IDiskAtomFilter readFilter(); - - /** - * Returns whether the provided CF, that represents the values fetched using the - * readFilter(), match the CAS conditions this object stands for. - */ - public boolean appliesTo(ColumnFamily current) throws InvalidRequestException; -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/src/java/org/apache/cassandra/service/CASRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CASRequest.java b/src/java/org/apache/cassandra/service/CASRequest.java new file mode 100644 index 0000000..3d86637 --- /dev/null +++ b/src/java/org/apache/cassandra/service/CASRequest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service; + +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.filter.IDiskAtomFilter; +import org.apache.cassandra.exceptions.InvalidRequestException; + +/** + * Abstract the conditions and updates for a CAS operation. + */ +public interface CASRequest +{ + /** + * The filter to use to fetch the value to compare for the CAS. + */ + public IDiskAtomFilter readFilter(); + + /** + * Returns whether the provided CF, that represents the values fetched using the + * readFilter(), match the CAS conditions this object stands for. + */ + public boolean appliesTo(ColumnFamily current) throws InvalidRequestException; + + /** + * The updates to perform of a CAS success. The values fetched using the readFilter() + * are passed as argument. + */ + public ColumnFamily makeUpdates(ColumnFamily current) throws InvalidRequestException; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 62fc0d0..1c0c482 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -190,8 +190,7 @@ public class StorageProxy implements StorageProxyMBean * @param keyspaceName the keyspace for the CAS * @param cfName the column family for the CAS * @param key the row key for the row to CAS - * @param conditions the conditions for the CAS to apply. - * @param updates the value to insert if {@code condtions} matches the current values. + * @param request the conditions for the CAS to apply as well as the update to perform if the conditions hold. * @param consistencyForPaxos the consistency for the paxos prepare and propose round. This can only be either SERIAL or LOCAL_SERIAL. * @param consistencyForCommit the consistency for write done during the commit phase. This can be anything, except SERIAL or LOCAL_SERIAL. * @@ -201,8 +200,7 @@ public class StorageProxy implements StorageProxyMBean public static ColumnFamily cas(String keyspaceName, String cfName, ByteBuffer key, - CASConditions conditions, - ColumnFamily updates, + CASRequest request, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit) throws UnavailableException, IsBootstrappingException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException @@ -226,18 +224,19 @@ public class StorageProxy implements StorageProxyMBean // read the current values and check they validate the conditions Tracing.trace("Reading existing values for CAS precondition"); long timestamp = System.currentTimeMillis(); - ReadCommand readCommand = ReadCommand.create(keyspaceName, key, cfName, timestamp, conditions.readFilter()); + ReadCommand readCommand = ReadCommand.create(keyspaceName, key, cfName, timestamp, request.readFilter()); List<Row> rows = read(Arrays.asList(readCommand), consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM); ColumnFamily current = rows.get(0).cf; - if (!conditions.appliesTo(current)) + if (!request.appliesTo(current)) { - Tracing.trace("CAS precondition {} does not match current values {}", conditions, current); + Tracing.trace("CAS precondition does not match current values {}", current); // We should not return null as this means success return current == null ? ArrayBackedSortedColumns.factory.create(metadata) : current; } // finish the paxos round w/ the desired updates // TODO turn null updates into delete? + ColumnFamily updates = request.makeUpdates(current); // Apply triggers to cas updates. A consideration here is that // triggers emit Mutations, and so a given trigger implementation http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 33cd012..de0b125 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -63,7 +63,7 @@ import org.apache.cassandra.locator.DynamicEndpointSnitch; import org.apache.cassandra.metrics.ClientMetrics; import org.apache.cassandra.scheduler.IRequestScheduler; import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.service.CASConditions; +import org.apache.cassandra.service.CASRequest; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.service.StorageProxy; @@ -784,8 +784,7 @@ public class CassandraServer implements Cassandra.Iface ColumnFamily result = StorageProxy.cas(cState.getKeyspace(), column_family, key, - new ThriftCASConditions(cfExpected), - cfUpdates, + new ThriftCASRequest(cfExpected, cfUpdates), ThriftConversion.fromThrift(serial_consistency_level), ThriftConversion.fromThrift(commit_consistency_level)); return result == null @@ -2249,13 +2248,15 @@ public class CassandraServer implements Cassandra.Iface }); } - private static class ThriftCASConditions implements CASConditions + private static class ThriftCASRequest implements CASRequest { private final ColumnFamily expected; + private final ColumnFamily updates; - private ThriftCASConditions(ColumnFamily expected) + private ThriftCASRequest(ColumnFamily expected, ColumnFamily updates) { this.expected = expected; + this.updates = updates; } public IDiskAtomFilter readFilter() @@ -2300,10 +2301,9 @@ public class CassandraServer implements Cassandra.Iface return cf != null && !cf.hasOnlyTombstones(now); } - @Override - public String toString() + public ColumnFamily makeUpdates(ColumnFamily current) { - return expected.toString(); + return updates; } } }