Support list index operations with conditions

patch by slebresne; reviewed by thobbs for CASSANDRA-7499


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5db108c3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5db108c3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5db108c3

Branch: refs/heads/trunk
Commit: 5db108c314fa5064669eefef8e5f6a52a1500b96
Parents: 2ea11c1
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Mon Aug 18 10:21:44 2014 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Mon Aug 18 10:22:40 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 build.xml                                       |   2 +-
 .../cql3/statements/BatchStatement.java         |  23 +-
 .../cql3/statements/CQL3CasConditions.java      | 203 --------------
 .../cql3/statements/CQL3CasRequest.java         | 268 +++++++++++++++++++
 .../cql3/statements/ModificationStatement.java  |  57 ++--
 .../apache/cassandra/service/CASConditions.java |  39 ---
 .../apache/cassandra/service/CASRequest.java    |  45 ++++
 .../apache/cassandra/service/StorageProxy.java  |  13 +-
 .../cassandra/thrift/CassandraServer.java       |  16 +-
 10 files changed, 353 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4fa537b..cecf153 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.1
+ * Support list index operations with conditions (CASSANDRA-7499)
  * Add max live/tombstoned cells to nodetool cfstats output (CASSANDRA-7731)
  * Validate IPv6 wildcard addresses properly (CASSANDRA-7680)
  * (cqlsh) Error when tracing query (CASSANDRA-7613)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 54f5f3d..d747bbc 100644
--- a/build.xml
+++ b/build.xml
@@ -25,7 +25,7 @@
     <property name="debuglevel" value="source,lines,vars"/>
 
     <!-- default version and SCM information -->
-    <property name="base.version" value="2.1.0-rc6"/>
+    <property name="base.version" value="2.1.1"/>
     <property name="scm.connection" 
value="scm:git://git.apache.org/cassandra.git"/>
     <property name="scm.developerConnection" 
value="scm:git://git.apache.org/cassandra.git"/>
     <property name="scm.url" 
value="http://git-wip-us.apache.org/repos/asf?p=cassandra.git;a=tree"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 90be914..17d1771 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -149,9 +149,6 @@ public class BatchStatement implements CQLStatement, 
MeasurableForPreparedCache
                     throw new InvalidRequestException("Batch with conditions 
cannot span multiple tables");
                 ksName = stmt.keyspace();
                 cfName = stmt.columnFamily();
-
-                if (stmt.requiresRead())
-                    throw new InvalidRequestException("Operations using list 
indexes are not allowed with IF conditions");
             }
         }
     }
@@ -240,7 +237,7 @@ public class BatchStatement implements CQLStatement, 
MeasurableForPreparedCache
      * Checks batch size to ensure threshold is met. If not, a warning is 
logged.
      * @param cfs ColumnFamilies that will store the batch's mutations.
      */
-    private void verifyBatchSize(Iterable<ColumnFamily> cfs)
+    public static void verifyBatchSize(Iterable<ColumnFamily> cfs)
     {
         long size = 0;
         long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold();
@@ -306,8 +303,7 @@ public class BatchStatement implements CQLStatement, 
MeasurableForPreparedCache
         ByteBuffer key = null;
         String ksName = null;
         String cfName = null;
-        ColumnFamily updates = null;
-        CQL3CasConditions conditions = null;
+        CQL3CasRequest casRequest = null;
         Set<ColumnDefinition> columnsWithConditions = new LinkedHashSet<>();
 
         for (int i = 0; i < statements.size(); i++)
@@ -323,8 +319,7 @@ public class BatchStatement implements CQLStatement, 
MeasurableForPreparedCache
                 key = pks.get(0);
                 ksName = statement.cfm.ksName;
                 cfName = statement.cfm.cfName;
-                conditions = new CQL3CasConditions(statement.cfm, now);
-                updates = 
ArrayBackedSortedColumns.factory.create(statement.cfm);
+                casRequest = new CQL3CasRequest(statement.cfm, key, true);
             }
             else if (!key.equals(pks.get(0)))
             {
@@ -334,22 +329,18 @@ public class BatchStatement implements CQLStatement, 
MeasurableForPreparedCache
             Composite clusteringPrefix = 
statement.createClusteringPrefix(statementOptions);
             if (statement.hasConditions())
             {
-                statement.addUpdatesAndConditions(key, clusteringPrefix, 
updates, conditions, statementOptions, timestamp);
+                statement.addConditions(clusteringPrefix, casRequest, 
statementOptions);
                 // As soon as we have a ifNotExists, we set 
columnsWithConditions to null so that everything is in the resultSet
                 if (statement.hasIfNotExistCondition() || 
statement.hasIfExistCondition())
                     columnsWithConditions = null;
                 else if (columnsWithConditions != null)
                     Iterables.addAll(columnsWithConditions, 
statement.getColumnsWithConditions());
             }
-            else
-            {
-                UpdateParameters params = 
statement.makeUpdateParameters(Collections.singleton(key), clusteringPrefix, 
statementOptions, false, now);
-                statement.addUpdateForKey(updates, key, clusteringPrefix, 
params);
-            }
+            casRequest.addRowUpdate(clusteringPrefix, statement, 
statementOptions, timestamp);
         }
 
-        verifyBatchSize(Collections.singleton(updates));
-        ColumnFamily result = StorageProxy.cas(ksName, cfName, key, 
conditions, updates, options.getSerialConsistency(), options.getConsistency());
+        ColumnFamily result = StorageProxy.cas(ksName, cfName, key, 
casRequest, options.getSerialConsistency(), options.getConsistency());
+
         return new 
ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, 
result, columnsWithConditions, true, options.forStatement(0)));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java 
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
deleted file mode 100644
index 8b5a403..0000000
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.cql3.statements;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.service.CASConditions;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * Processed CAS conditions on potentially multiple rows of the same partition.
- */
-public class CQL3CasConditions implements CASConditions
-{
-    private final CFMetaData cfm;
-    private final long now;
-
-    // We index RowCondition by the prefix of the row they applied to for 2 
reasons:
-    //   1) this allows to keep things sorted to build the ColumnSlice array 
below
-    //   2) this allows to detect when contradictory conditions are set (not 
exists with some other conditions on the same row)
-    private final SortedMap<Composite, RowCondition> conditions;
-
-    public CQL3CasConditions(CFMetaData cfm, long now)
-    {
-        this.cfm = cfm;
-        // We will use now for Cell.isLive() which expects milliseconds but 
the argument is in microseconds.
-        this.now = now / 1000;
-        this.conditions = new TreeMap<>(cfm.comparator);
-    }
-
-    public void addNotExist(Composite prefix) throws InvalidRequestException
-    {
-        RowCondition previous = conditions.put(prefix, new 
NotExistCondition(prefix, now));
-        if (previous != null && !(previous instanceof NotExistCondition))
-        {
-            // these should be prevented by the parser, but it doesn't hurt to 
check
-            if (previous instanceof ExistCondition)
-                throw new InvalidRequestException("Cannot mix IF EXISTS and IF 
NOT EXISTS conditions for the same row");
-            else
-                throw new InvalidRequestException("Cannot mix IF conditions 
and IF NOT EXISTS for the same row");
-        }
-    }
-
-    public void addExist(Composite prefix) throws InvalidRequestException
-    {
-        RowCondition previous = conditions.put(prefix, new 
ExistCondition(prefix, now));
-        // this should be prevented by the parser, but it doesn't hurt to check
-        if (previous != null && previous instanceof NotExistCondition)
-            throw new InvalidRequestException("Cannot mix IF EXISTS and IF NOT 
EXISTS conditions for the same row");
-    }
-
-    public void addConditions(Composite prefix, Collection<ColumnCondition> 
conds, QueryOptions options) throws InvalidRequestException
-    {
-        RowCondition condition = conditions.get(prefix);
-        if (condition == null)
-        {
-            condition = new ColumnsConditions(prefix, now);
-            conditions.put(prefix, condition);
-        }
-        else if (!(condition instanceof ColumnsConditions))
-        {
-            throw new InvalidRequestException("Cannot mix IF conditions and IF 
NOT EXISTS for the same row");
-        }
-        ((ColumnsConditions)condition).addConditions(conds, options);
-    }
-
-    public IDiskAtomFilter readFilter()
-    {
-        assert !conditions.isEmpty();
-        ColumnSlice[] slices = new ColumnSlice[conditions.size()];
-        int i = 0;
-        // We always read CQL rows entirely as on CAS failure we want to be 
able to distinguish between "row exists
-        // but all values for which there were conditions are null" and "row 
doesn't exists", and we can't rely on the
-        // row marker for that (see #6623)
-        for (Composite prefix : conditions.keySet())
-            slices[i++] = prefix.slice();
-
-        int toGroup = cfm.comparator.isDense() ? -1 : 
cfm.clusteringColumns().size();
-        assert ColumnSlice.validateSlices(slices, cfm.comparator, false);
-        return new SliceQueryFilter(slices, false, slices.length, toGroup);
-    }
-
-    public boolean appliesTo(ColumnFamily current) throws 
InvalidRequestException
-    {
-        for (RowCondition condition : conditions.values())
-        {
-            if (!condition.appliesTo(current))
-                return false;
-        }
-        return true;
-    }
-
-    private static abstract class RowCondition
-    {
-        public final Composite rowPrefix;
-        protected final long now;
-
-        protected RowCondition(Composite rowPrefix, long now)
-        {
-            this.rowPrefix = rowPrefix;
-            this.now = now;
-        }
-
-        public abstract boolean appliesTo(ColumnFamily current) throws 
InvalidRequestException;
-    }
-
-    private static class NotExistCondition extends RowCondition
-    {
-        private NotExistCondition(Composite rowPrefix, long now)
-        {
-            super(rowPrefix, now);
-        }
-
-        public boolean appliesTo(ColumnFamily current)
-        {
-            if (current == null)
-                return true;
-
-            Iterator<Cell> iter = current.iterator(new ColumnSlice[]{ 
rowPrefix.slice() });
-            while (iter.hasNext())
-                if (iter.next().isLive(now))
-                    return false;
-            return true;
-        }
-    }
-
-    private static class ExistCondition extends RowCondition
-    {
-        private ExistCondition(Composite rowPrefix, long now)
-        {
-            super (rowPrefix, now);
-        }
-
-        public boolean appliesTo(ColumnFamily current)
-        {
-            if (current == null)
-                return false;
-
-            Iterator<Cell> iter = current.iterator(new ColumnSlice[]{ 
rowPrefix.slice() });
-            while (iter.hasNext())
-                if (iter.next().isLive(now))
-                    return true;
-            return false;
-        }
-    }
-
-    private static class ColumnsConditions extends RowCondition
-    {
-        private final Map<Pair<ColumnIdentifier, ByteBuffer>, 
ColumnCondition.Bound> conditions = new HashMap<>();
-
-        private ColumnsConditions(Composite rowPrefix, long now)
-        {
-            super(rowPrefix, now);
-        }
-
-        public void addConditions(Collection<ColumnCondition> conds, 
QueryOptions options) throws InvalidRequestException
-        {
-            for (ColumnCondition condition : conds)
-            {
-                // We will need the variables in appliesTo but with protocol 
batches, each condition in this object can have a
-                // different list of variables.
-                ColumnCondition.Bound current = condition.bind(options);
-                ColumnCondition.Bound previous = 
conditions.put(Pair.create(condition.column.name, 
current.getCollectionElementValue()), current);
-                // If 2 conditions are actually equal, let it slide
-                if (previous != null && !previous.equals(current))
-                    throw new InvalidRequestException("Duplicate and 
incompatible conditions for column " + condition.column.name);
-            }
-        }
-
-        public boolean appliesTo(ColumnFamily current) throws 
InvalidRequestException
-        {
-            if (current == null)
-                return conditions.isEmpty();
-
-            for (ColumnCondition.Bound condition : conditions.values())
-                if (!condition.appliesTo(rowPrefix, current, now))
-                    return false;
-            return true;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java 
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
new file mode 100644
index 0000000..a85c1e5
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.CASRequest;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Processed CAS conditions and updates on potentially multiple rows of the 
same partition.
+ */
+public class CQL3CasRequest implements CASRequest
+{
+    private final CFMetaData cfm;
+    private final ByteBuffer key;
+    private final long now;
+    private final boolean isBatch;
+
+    // We index RowCondition by the prefix of the row they applied to for 2 
reasons:
+    //   1) this allows to keep things sorted to build the ColumnSlice array 
below
+    //   2) this allows to detect when contradictory conditions are set (not 
exists with some other conditions on the same row)
+    private final SortedMap<Composite, RowCondition> conditions;
+
+    private final List<RowUpdate> updates = new ArrayList<>();
+
+    public CQL3CasRequest(CFMetaData cfm, ByteBuffer key, boolean isBatch)
+    {
+        this.cfm = cfm;
+        // When checking if conditions apply, we want to use a fixed reference 
time for a whole request to check
+        // for expired cells. Note that this is unrelated to the cell 
timestamp.
+        this.now = System.currentTimeMillis();
+        this.key = key;
+        this.conditions = new TreeMap<>(cfm.comparator);
+        this.isBatch = isBatch;
+    }
+
+    public void addRowUpdate(Composite prefix, ModificationStatement stmt, 
QueryOptions options, long timestamp)
+    {
+        updates.add(new RowUpdate(prefix, stmt, options, timestamp));
+    }
+
+    public void addNotExist(Composite prefix) throws InvalidRequestException
+    {
+        RowCondition previous = conditions.put(prefix, new 
NotExistCondition(prefix, now));
+        if (previous != null && !(previous instanceof NotExistCondition))
+        {
+            // these should be prevented by the parser, but it doesn't hurt to 
check
+            if (previous instanceof ExistCondition)
+                throw new InvalidRequestException("Cannot mix IF EXISTS and IF 
NOT EXISTS conditions for the same row");
+            else
+                throw new InvalidRequestException("Cannot mix IF conditions 
and IF NOT EXISTS for the same row");
+        }
+    }
+
+    public void addExist(Composite prefix) throws InvalidRequestException
+    {
+        RowCondition previous = conditions.put(prefix, new 
ExistCondition(prefix, now));
+        // this should be prevented by the parser, but it doesn't hurt to check
+        if (previous != null && previous instanceof NotExistCondition)
+            throw new InvalidRequestException("Cannot mix IF EXISTS and IF NOT 
EXISTS conditions for the same row");
+    }
+
+    public void addConditions(Composite prefix, Collection<ColumnCondition> 
conds, QueryOptions options) throws InvalidRequestException
+    {
+        RowCondition condition = conditions.get(prefix);
+        if (condition == null)
+        {
+            condition = new ColumnsConditions(prefix, now);
+            conditions.put(prefix, condition);
+        }
+        else if (!(condition instanceof ColumnsConditions))
+        {
+            throw new InvalidRequestException("Cannot mix IF conditions and IF 
NOT EXISTS for the same row");
+        }
+        ((ColumnsConditions)condition).addConditions(conds, options);
+    }
+
+    public IDiskAtomFilter readFilter()
+    {
+        assert !conditions.isEmpty();
+        ColumnSlice[] slices = new ColumnSlice[conditions.size()];
+        int i = 0;
+        // We always read CQL rows entirely as on CAS failure we want to be 
able to distinguish between "row exists
+        // but all values for which there were conditions are null" and "row 
doesn't exist", and we can't rely on the
+        // row marker for that (see #6623)
+        for (Composite prefix : conditions.keySet())
+            slices[i++] = prefix.slice();
+
+        int toGroup = cfm.comparator.isDense() ? -1 : 
cfm.clusteringColumns().size();
+        assert ColumnSlice.validateSlices(slices, cfm.comparator, false);
+        return new SliceQueryFilter(slices, false, slices.length, toGroup);
+    }
+
+    public boolean appliesTo(ColumnFamily current) throws 
InvalidRequestException
+    {
+        for (RowCondition condition : conditions.values())
+        {
+            if (!condition.appliesTo(current))
+                return false;
+        }
+        return true;
+    }
+
+    public ColumnFamily makeUpdates(ColumnFamily current) throws 
InvalidRequestException
+    {
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
+        for (RowUpdate upd : updates)
+            upd.applyUpdates(current, cf);
+
+        if (isBatch)
+            BatchStatement.verifyBatchSize(Collections.singleton(cf));
+
+        return cf;
+    }
+
+    /**
+     * Due to some operations on lists, we can't generate the update that a 
given Modification statement does before
+     * we get the values read by the initial read of Paxos. A RowUpdate thus 
just stores the relevant information
+     * (including the statement itself) to generate those updates. We'll have 
multiple RowUpdates for a Batch, otherwise
+     * we'll have only one.
+     */
+    private class RowUpdate
+    {
+        private final Composite rowPrefix;
+        private final ModificationStatement stmt;
+        private final QueryOptions options;
+        private final long timestamp;
+
+        private RowUpdate(Composite rowPrefix, ModificationStatement stmt, 
QueryOptions options, long timestamp)
+        {
+            this.rowPrefix = rowPrefix;
+            this.stmt = stmt;
+            this.options = options;
+            this.timestamp = timestamp;
+        }
+
+        public void applyUpdates(ColumnFamily current, ColumnFamily updates) 
throws InvalidRequestException
+        {
+            Map<ByteBuffer, CQL3Row> map = null;
+            if (stmt.requiresRead())
+            {
+                // Uses the "current" values read by Paxos for list operations 
that require a read
+                Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(cfm, 
now).group(current.iterator(new ColumnSlice[]{ rowPrefix.slice() }));
+                if (iter.hasNext())
+                {
+                    map = Collections.singletonMap(key, iter.next());
+                    assert !iter.hasNext() : "We shoudn't be updating more 
than one CQL row per-ModificationStatement";
+                }
+            }
+
+            UpdateParameters params = new UpdateParameters(cfm, options, 
timestamp, stmt.getTimeToLive(options), map);
+            stmt.addUpdateForKey(updates, key, rowPrefix, params);
+        }
+    }
+
+    private static abstract class RowCondition
+    {
+        public final Composite rowPrefix;
+        protected final long now;
+
+        protected RowCondition(Composite rowPrefix, long now)
+        {
+            this.rowPrefix = rowPrefix;
+            this.now = now;
+        }
+
+        public abstract boolean appliesTo(ColumnFamily current) throws 
InvalidRequestException;
+    }
+
+    private static class NotExistCondition extends RowCondition
+    {
+        private NotExistCondition(Composite rowPrefix, long now)
+        {
+            super(rowPrefix, now);
+        }
+
+        public boolean appliesTo(ColumnFamily current)
+        {
+            if (current == null)
+                return true;
+
+            Iterator<Cell> iter = current.iterator(new ColumnSlice[]{ 
rowPrefix.slice() });
+            while (iter.hasNext())
+                if (iter.next().isLive(now))
+                    return false;
+            return true;
+        }
+    }
+
+    private static class ExistCondition extends RowCondition
+    {
+        private ExistCondition(Composite rowPrefix, long now)
+        {
+            super (rowPrefix, now);
+        }
+
+        public boolean appliesTo(ColumnFamily current)
+        {
+            if (current == null)
+                return false;
+
+            Iterator<Cell> iter = current.iterator(new ColumnSlice[]{ 
rowPrefix.slice() });
+            while (iter.hasNext())
+                if (iter.next().isLive(now))
+                    return true;
+            return false;
+        }
+    }
+
+    private static class ColumnsConditions extends RowCondition
+    {
+        private final Map<Pair<ColumnIdentifier, ByteBuffer>, 
ColumnCondition.Bound> conditions = new HashMap<>();
+
+        private ColumnsConditions(Composite rowPrefix, long now)
+        {
+            super(rowPrefix, now);
+        }
+
+        public void addConditions(Collection<ColumnCondition> conds, 
QueryOptions options) throws InvalidRequestException
+        {
+            for (ColumnCondition condition : conds)
+            {
+                // We will need the variables in appliesTo but with protocol 
batches, each condition in this object can have a
+                // different list of variables.
+                ColumnCondition.Bound current = condition.bind(options);
+                ColumnCondition.Bound previous = 
conditions.put(Pair.create(condition.column.name, 
current.getCollectionElementValue()), current);
+                // If 2 conditions are actually equal, let it slide
+                if (previous != null && !previous.equals(current))
+                    throw new InvalidRequestException("Duplicate and 
incompatible conditions for column " + condition.column.name);
+            }
+        }
+
+        public boolean appliesTo(ColumnFamily current) throws 
InvalidRequestException
+        {
+            if (current == null)
+                return conditions.isEmpty();
+
+            for (ColumnCondition.Bound condition : conditions.values())
+                if (!condition.appliesTo(rowPrefix, current, now))
+                    return false;
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index fef0e94..774883d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -147,14 +147,8 @@ public abstract class ModificationStatement implements 
CQLStatement, MeasurableF
 
     public void validate(ClientState state) throws InvalidRequestException
     {
-        if (hasConditions())
-        {
-            if (attrs.isTimestampSet())
-                throw new InvalidRequestException("Cannot provide custom 
timestamp for conditional updates");
-
-            if (requiresRead())
-                throw new InvalidRequestException("Operations using list 
indexes are not allowed with IF conditions");
-        }
+        if (hasConditions() && attrs.isTimestampSet())
+            throw new InvalidRequestException("Cannot provide custom timestamp 
for conditional updates");
 
         if (isCounter() && attrs.isTimestampSet())
             throw new InvalidRequestException("Cannot provide custom timestamp 
for counter updates");
@@ -414,32 +408,20 @@ public abstract class ModificationStatement implements 
CQLStatement, MeasurableF
 
     public boolean requiresRead()
     {
+        // List SET operations incur a read.
         for (Operation op : columnOperations)
             if (op.requiresRead())
                 return true;
+
         return false;
     }
 
     protected Map<ByteBuffer, CQL3Row> readRequiredRows(Collection<ByteBuffer> 
partitionKeys, Composite clusteringPrefix, boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException
     {
-        // Lists SET operation incurs a read.
-        boolean requiresRead = false;
-        for (Operation op : columnOperations)
-        {
-            if (op.requiresRead())
-            {
-                requiresRead = true;
-                break;
-            }
-        }
-
-        return requiresRead ? readRows(partitionKeys, clusteringPrefix, cfm, 
local, cl) : null;
-    }
+        if (!requiresRead())
+            return null;
 
-    protected Map<ByteBuffer, CQL3Row> readRows(Collection<ByteBuffer> 
partitionKeys, Composite rowPrefix, CFMetaData cfm, boolean local, 
ConsistencyLevel cl)
-    throws RequestExecutionException, RequestValidationException
-    {
         try
         {
             cl.validateForRead(keyspace());
@@ -449,7 +431,7 @@ public abstract class ModificationStatement implements 
CQLStatement, MeasurableF
             throw new InvalidRequestException(String.format("Write operation 
require a read but consistency %s is not supported on reads", cl));
         }
 
-        ColumnSlice[] slices = new ColumnSlice[]{ rowPrefix.slice() };
+        ColumnSlice[] slices = new ColumnSlice[]{ clusteringPrefix.slice() };
         List<ReadCommand> commands = new 
ArrayList<ReadCommand>(partitionKeys.size());
         long now = System.currentTimeMillis();
         for (ByteBuffer key : partitionKeys)
@@ -527,46 +509,41 @@ public abstract class ModificationStatement implements 
CQLStatement, MeasurableF
             throw new InvalidRequestException("IN on the partition key is not 
supported with conditional updates");
 
         ByteBuffer key = keys.get(0);
-
         long now = options.getTimestamp(queryState);
-        CQL3CasConditions conditions = new CQL3CasConditions(cfm, now);
         Composite prefix = createClusteringPrefix(options);
-        ColumnFamily updates = ArrayBackedSortedColumns.factory.create(cfm);
-        addUpdatesAndConditions(key, prefix, updates, conditions, options, 
getTimestamp(now, options));
+
+        CQL3CasRequest request = new CQL3CasRequest(cfm, key, false);
+        addConditions(prefix, request, options);
+        request.addRowUpdate(prefix, this, options, now);
 
         ColumnFamily result = StorageProxy.cas(keyspace(),
                                                columnFamily(),
                                                key,
-                                               conditions,
-                                               updates,
+                                               request,
                                                options.getSerialConsistency(),
                                                options.getConsistency());
         return new ResultMessage.Rows(buildCasResultSet(key, result, options));
     }
 
-    public void addUpdatesAndConditions(ByteBuffer key, Composite 
clusteringPrefix, ColumnFamily updates, CQL3CasConditions conditions, 
QueryOptions options, long now)
-    throws InvalidRequestException
+    public void addConditions(Composite clusteringPrefix, CQL3CasRequest 
request, QueryOptions options) throws InvalidRequestException
     {
-        UpdateParameters updParams = new UpdateParameters(cfm, options, now, 
getTimeToLive(options), null);
-        addUpdateForKey(updates, key, clusteringPrefix, updParams);
-
         if (ifNotExists)
         {
             // If we use ifNotExists, if the statement applies to any non 
static columns, then the condition is on the row of the non-static
             // columns and the prefix should be the clusteringPrefix. But if 
only static columns are set, then the ifNotExists apply to the existence
             // of any static columns and we should use the prefix for the 
"static part" of the partition.
-            conditions.addNotExist(clusteringPrefix);
+            request.addNotExist(clusteringPrefix);
         }
         else if (ifExists)
         {
-            conditions.addExist(clusteringPrefix);
+            request.addExist(clusteringPrefix);
         }
         else
         {
             if (columnConditions != null)
-                conditions.addConditions(clusteringPrefix, columnConditions, 
options);
+                request.addConditions(clusteringPrefix, columnConditions, 
options);
             if (staticConditions != null)
-                conditions.addConditions(cfm.comparator.staticPrefix(), 
staticConditions, options);
+                request.addConditions(cfm.comparator.staticPrefix(), 
staticConditions, options);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/src/java/org/apache/cassandra/service/CASConditions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CASConditions.java 
b/src/java/org/apache/cassandra/service/CASConditions.java
deleted file mode 100644
index c0a2111..0000000
--- a/src/java/org/apache/cassandra/service/CASConditions.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-
-/**
- * Abstract the conditions to be fulfilled by a CAS operation.
- */
-public interface CASConditions
-{
-    /**
-     * The filter to use to fetch the value to compare for the CAS.
-     */
-    public IDiskAtomFilter readFilter();
-
-    /**
-     * Returns whether the provided CF, that represents the values fetched 
using the
-     * readFilter(), match the CAS conditions this object stands for.
-     */
-    public boolean appliesTo(ColumnFamily current) throws 
InvalidRequestException;
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/src/java/org/apache/cassandra/service/CASRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java 
b/src/java/org/apache/cassandra/service/CASRequest.java
new file mode 100644
index 0000000..3d86637
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * Abstract the conditions and updates for a CAS operation.
+ */
+public interface CASRequest
+{
+    /**
+     * The filter to use to fetch the value to compare for the CAS.
+     */
+    public IDiskAtomFilter readFilter();
+
+    /**
+     * Returns whether the provided CF, that represents the values fetched 
using the
+     * readFilter(), matches the CAS conditions this object stands for.
+     */
+    public boolean appliesTo(ColumnFamily current) throws 
InvalidRequestException;
+
+    /**
+     * The updates to perform on a CAS success. The values fetched using the 
readFilter()
+     * are passed as argument.
+     */
+    public ColumnFamily makeUpdates(ColumnFamily current) throws 
InvalidRequestException;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 62fc0d0..1c0c482 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -190,8 +190,7 @@ public class StorageProxy implements StorageProxyMBean
      * @param keyspaceName the keyspace for the CAS
      * @param cfName the column family for the CAS
      * @param key the row key for the row to CAS
-     * @param conditions the conditions for the CAS to apply.
-     * @param updates the value to insert if {@code condtions} matches the 
current values.
+     * @param request the conditions for the CAS to apply as well as the 
update to perform if the conditions hold.
      * @param consistencyForPaxos the consistency for the paxos prepare and 
propose round. This can only be either SERIAL or LOCAL_SERIAL.
      * @param consistencyForCommit the consistency for write done during the 
commit phase. This can be anything, except SERIAL or LOCAL_SERIAL.
      *
@@ -201,8 +200,7 @@ public class StorageProxy implements StorageProxyMBean
     public static ColumnFamily cas(String keyspaceName,
                                    String cfName,
                                    ByteBuffer key,
-                                   CASConditions conditions,
-                                   ColumnFamily updates,
+                                   CASRequest request,
                                    ConsistencyLevel consistencyForPaxos,
                                    ConsistencyLevel consistencyForCommit)
     throws UnavailableException, IsBootstrappingException, 
ReadTimeoutException, WriteTimeoutException, InvalidRequestException
@@ -226,18 +224,19 @@ public class StorageProxy implements StorageProxyMBean
             // read the current values and check they validate the conditions
             Tracing.trace("Reading existing values for CAS precondition");
             long timestamp = System.currentTimeMillis();
-            ReadCommand readCommand = ReadCommand.create(keyspaceName, key, 
cfName, timestamp, conditions.readFilter());
+            ReadCommand readCommand = ReadCommand.create(keyspaceName, key, 
cfName, timestamp, request.readFilter());
             List<Row> rows = read(Arrays.asList(readCommand), 
consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL? 
ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM);
             ColumnFamily current = rows.get(0).cf;
-            if (!conditions.appliesTo(current))
+            if (!request.appliesTo(current))
             {
-                Tracing.trace("CAS precondition {} does not match current 
values {}", conditions, current);
+                Tracing.trace("CAS precondition does not match current values 
{}", current);
                 // We should not return null as this means success
                 return current == null ? 
ArrayBackedSortedColumns.factory.create(metadata) : current;
             }
 
             // finish the paxos round w/ the desired updates
             // TODO turn null updates into delete?
+            ColumnFamily updates = request.makeUpdates(current);
 
             // Apply triggers to cas updates. A consideration here is that
             // triggers emit Mutations, and so a given trigger implementation

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db108c3/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java 
b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 33cd012..de0b125 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -63,7 +63,7 @@ import org.apache.cassandra.locator.DynamicEndpointSnitch;
 import org.apache.cassandra.metrics.ClientMetrics;
 import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.service.CASConditions;
+import org.apache.cassandra.service.CASRequest;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.StorageProxy;
@@ -784,8 +784,7 @@ public class CassandraServer implements Cassandra.Iface
             ColumnFamily result = StorageProxy.cas(cState.getKeyspace(),
                                                    column_family,
                                                    key,
-                                                   new 
ThriftCASConditions(cfExpected),
-                                                   cfUpdates,
+                                                   new 
ThriftCASRequest(cfExpected, cfUpdates),
                                                    
ThriftConversion.fromThrift(serial_consistency_level),
                                                    
ThriftConversion.fromThrift(commit_consistency_level));
             return result == null
@@ -2249,13 +2248,15 @@ public class CassandraServer implements Cassandra.Iface
         });
     }
 
-    private static class ThriftCASConditions implements CASConditions
+    private static class ThriftCASRequest implements CASRequest
     {
         private final ColumnFamily expected;
+        private final ColumnFamily updates;
 
-        private ThriftCASConditions(ColumnFamily expected)
+        private ThriftCASRequest(ColumnFamily expected, ColumnFamily updates)
         {
             this.expected = expected;
+            this.updates = updates;
         }
 
         public IDiskAtomFilter readFilter()
@@ -2300,10 +2301,9 @@ public class CassandraServer implements Cassandra.Iface
             return cf != null && !cf.hasOnlyTombstones(now);
         }
 
-        @Override
-        public String toString()
+        public ColumnFamily makeUpdates(ColumnFamily current)
         {
-            return expected.toString();
+            return updates;
         }
     }
 }

Reply via email to