Author: eevans Date: Wed Nov 17 16:57:35 2010 New Revision: 1036112 URL: http://svn.apache.org/viewvc?rev=1036112&view=rev Log: batched UPDATEs
Patch by eevans Added: cassandra/trunk/src/java/org/apache/cassandra/cql/BatchUpdateStatement.java Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java cassandra/trunk/test/system/test_cql.py Added: cassandra/trunk/src/java/org/apache/cassandra/cql/BatchUpdateStatement.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/BatchUpdateStatement.java?rev=1036112&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/BatchUpdateStatement.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/BatchUpdateStatement.java Wed Nov 17 16:57:35 2010 @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.cassandra.cql; + +import java.util.List; + +import org.apache.cassandra.thrift.ConsistencyLevel; + +/** + * A <code>BATCH UPDATE</code> statement parsed from a CQL query. + * + */ +public class BatchUpdateStatement +{ + private ConsistencyLevel consistency; + private List<UpdateStatement> updates; + + /** + * Creates a new BatchUpdateStatement from a list of UpdateStatements and a + * Thrift consistency level. + * + * @param updates a list of UpdateStatements + * @param consistency Thrift consistency level enum + */ + public BatchUpdateStatement(List<UpdateStatement> updates, ConsistencyLevel consistency) + { + this.updates = updates; + this.consistency = consistency; + } + + public ConsistencyLevel getConsistencyLevel() + { + return consistency; + } + + public List<UpdateStatement> getUpdates() + { + return updates; + } + + public String toString() + { + return String.format("BatchUpdateStatement(updates=%s, consistency=%s)", + updates, + consistency); + } +} Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g?rev=1036112&r1=1036111&r2=1036112&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g Wed Nov 17 16:57:35 2010 @@ -45,6 +45,7 @@ options { query returns [CQLStatement stmnt] : selectStatement { $stmnt = new CQLStatement(StatementType.SELECT, $selectStatement.expr); } | updateStatement { $stmnt = new CQLStatement(StatementType.UPDATE, $updateStatement.expr); } + | batchUpdateStatement { $stmnt = new CQLStatement(StatementType.BATCH_UPDATE, $batchUpdateStatement.expr); } | useStatement { $stmnt = new CQLStatement(StatementType.USE, $useStatement.keyspace); } ; @@ -90,6 +91,26 @@ selectStatement returns [SelectStatement ; /** + * BEGIN BATCH [USING CONSISTENCY.<LVL>] + * UPDATE <CF> SET name1 = value1 WHERE KEY = keyname1; + * UPDATE <CF> SET name2 = value2 WHERE KEY = keyname2; + * UPDATE <CF> SET name3 = value3 WHERE KEY = keyname3; + * APPLY BATCH + */ +batchUpdateStatement returns [BatchUpdateStatement expr] + : { + ConsistencyLevel cLevel = ConsistencyLevel.ONE; + List<UpdateStatement> updates = new ArrayList<UpdateStatement>(); + } + K_BEGIN K_BATCH ( K_USING K_CONSISTENCY '.' K_LEVEL { cLevel = ConsistencyLevel.valueOf($K_LEVEL.text); } )? + u1=updateStatement { updates.add(u1); } ( uN=updateStatement { updates.add(uN); } )* + K_APPLY K_BATCH EOF + { + return new BatchUpdateStatement(updates, cLevel); + } + ; + +/** * UPDATE * <CF> * USING @@ -102,7 +123,7 @@ selectStatement returns [SelectStatement */ updateStatement returns [UpdateStatement expr] : { - ConsistencyLevel cLevel = ConsistencyLevel.ONE; + ConsistencyLevel cLevel = null; Map<Term, Term> columns = new HashMap<Term, Term>(); } K_UPDATE columnFamily=IDENT @@ -177,6 +198,9 @@ K_FIRST: F I R S T; K_REVERSED: R E V E R S E D; K_COUNT: C O U N T; K_SET: S E T; +K_BEGIN: B E G I N; +K_APPLY: A P P L Y; +K_BATCH: B A T C H; // Case-insensitive alpha characters fragment A: ('a'|'A'); Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1036112&r1=1036111&r2=1036112&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java Wed Nov 17 16:57:35 2010 @@ -24,7 +24,6 @@ package org.apache.cassandra.cql; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -211,10 +210,9 @@ public class QueryProcessor return rows; } - private static void batchUpdate(String keyspace, List<UpdateStatement> updateStatements) + private static void batchUpdate(String keyspace, List<UpdateStatement> updateStatements, ConsistencyLevel consistency) throws InvalidRequestException, UnavailableException, TimedOutException { - ConsistencyLevel consistency = updateStatements.get(0).getConsistencyLevel(); List<RowMutation> rowMutations = new ArrayList<RowMutation>(); for (UpdateStatement update : updateStatements) @@ -392,7 +390,19 @@ public class QueryProcessor case UPDATE: UpdateStatement update = (UpdateStatement)statement.statement; - batchUpdate(keyspace, Collections.singletonList(update)); + batchUpdate(keyspace, Collections.singletonList(update), update.getConsistencyLevel()); + avroResult.type = CqlResultType.VOID; + return avroResult; + + case BATCH_UPDATE: + BatchUpdateStatement batch = (BatchUpdateStatement)statement.statement; + + for (UpdateStatement up : batch.getUpdates()) + if (up.isSetConsistencyLevel()) + throw newInvalidRequestException( + "Consistency level must be set on the BATCH, not individual UPDATE statements"); + + batchUpdate(keyspace, batch.getUpdates(), batch.getConsistencyLevel()); avroResult.type = CqlResultType.VOID; return avroResult; Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java?rev=1036112&r1=1036111&r2=1036112&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java Wed Nov 17 16:57:35 2010 @@ -22,5 +22,5 @@ package org.apache.cassandra.cql; public enum StatementType { - SELECT, UPDATE, USE; + SELECT, UPDATE, BATCH_UPDATE, USE; } Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java?rev=1036112&r1=1036111&r2=1036112&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java Wed Nov 17 16:57:35 2010 @@ -30,8 +30,9 @@ import org.apache.cassandra.thrift.Consi */ public class UpdateStatement { + public static final ConsistencyLevel defaultConsistency = ConsistencyLevel.ONE; private String columnFamily; - private ConsistencyLevel cLevel; + private ConsistencyLevel cLevel = null; private Map<Term, Term> columns; private Term key; @@ -52,9 +53,38 @@ public class UpdateStatement this.key = key; } + /** + * Creates a new UpdateStatement from a column family name, columns map, + * and key term. + * + * @param columnFamily column family name + * @param columns a map of column name/values pairs + * @param key the key name + */ + public UpdateStatement(String columnFamily, Map<Term, Term> columns, Term key) + { + this(columnFamily, null, columns, key); + } + + /** + * Returns the consistency level of this <code>UPDATE</code> statement, either + * one parsed from the CQL statement, or the default level otherwise. + * + * @return the consistency level as a Thrift enum. + */ public ConsistencyLevel getConsistencyLevel() { - return cLevel; + return (cLevel != null) ? cLevel : defaultConsistency; + } + + /** + * True if an explicit consistency level was parsed from the statement. + * + * @return true if a consistency was parsed, false otherwise. + */ + public boolean isSetConsistencyLevel() + { + return (cLevel != null); } public String getColumnFamily() Modified: cassandra/trunk/test/system/test_cql.py URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_cql.py?rev=1036112&r1=1036111&r2=1036112&view=diff ============================================================================== --- cassandra/trunk/test/system/test_cql.py (original) +++ cassandra/trunk/test/system/test_cql.py Wed Nov 17 16:57:35 2010 @@ -23,53 +23,25 @@ def load_sample(dbconn): """) dbconn.execute(""" - UPDATE StandardLong1 SET 1L = "1", 2L = "2", 3L = "3", 4L = "4" - WHERE KEY = "aa"; - """) - dbconn.execute(""" - UPDATE StandardLong1 SET 5L = "5", 6L = "6", 7L = "8", 9L = "9" - WHERE KEY = "ab"; - """) - dbconn.execute(""" - UPDATE StandardLong1 SET 9L = "9", 8L = "8", 7L = "7", 6L = "6" - WHERE KEY = "ac"; - """) - dbconn.execute(""" - UPDATE StandardLong1 SET 5L = "5", 4L = "4", 3L = "3", 2L = "2" - WHERE KEY = "ad"; - """) - dbconn.execute(""" - UPDATE StandardLong1 SET 1L = "1", 2L = "2", 3L = "3", 4L = "4" - WHERE KEY = "ae"; - """) - dbconn.execute(""" - UPDATE StandardLong1 SET 1L = "1", 2L = "2", 3L = "3", 4L = "4" - WHERE KEY = "af"; - """) - dbconn.execute(""" - UPDATE StandardLong1 SET 5L = "5", 6L = "6", 7L = "8", 9L = "9" - WHERE KEY = "ag"; + BEGIN BATCH USING CONSISTENCY.ONE + UPDATE StandardLong1 SET 1L="1", 2L="2", 3L="3", 4L="4" WHERE KEY="aa"; + UPDATE StandardLong1 SET 5L="5", 6L="6", 7L="8", 9L="9" WHERE KEY="ab"; + UPDATE StandardLong1 SET 9L="9", 8L="8", 7L="7", 6L="6" WHERE KEY="ac"; + UPDATE StandardLong1 SET 5L="5", 4L="4", 3L="3", 2L="2" WHERE KEY="ad"; + UPDATE StandardLong1 SET 1L="1", 2L="2", 3L="3", 4L="4" WHERE KEY="ae"; + UPDATE StandardLong1 SET 1L="1", 2L="2", 3L="3", 4L="4" WHERE KEY="af"; + UPDATE StandardLong1 SET 5L="5", 6L="6", 7L="8", 9L="9" WHERE KEY="ag"; + APPLY BATCH """) dbconn.execute(""" - UPDATE Indexed1 SET "birthdate" = 100L, "unindexed" = 250L - WHERE KEY = "asmith"; - """) - dbconn.execute(""" - UPDATE Indexed1 SET "birthdate" = 100L, "unindexed" = 200L - WHERE KEY = "dozer"; - """) - dbconn.execute(""" - UPDATE Indexed1 SET "birthdate" = 175L, "unindexed" = 200L - WHERE KEY = "morpheus"; - """) - dbconn.execute(""" - UPDATE Indexed1 SET "birthdate" = 150L, "unindexed" = 250L - WHERE KEY = "neo"; - """) - dbconn.execute(""" - UPDATE Indexed1 SET "birthdate" = 125L, "unindexed" = 200L - WHERE KEY = "trinity"; + BEGIN BATCH + UPDATE Indexed1 SET "birthdate"=100L, "unindexed"=250L WHERE KEY="asmith"; + UPDATE Indexed1 SET "birthdate"=100L, "unindexed"=200L WHERE KEY="dozer"; + UPDATE Indexed1 SET "birthdate"=175L, "unindexed"=200L WHERE KEY="morpheus"; + UPDATE Indexed1 SET "birthdate"=150L, "unindexed"=250L WHERE KEY="neo"; + UPDATE Indexed1 SET "birthdate"=125L, "unindexed"=200L WHERE KEY="trinity"; + APPLY BATCH """) def init(keyspace="Keyspace1"):