Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 a7ad9d2a5 -> ee401cf81
Allow counter mutations in UNLOGGED batches patch by tjake; reviewed by Aleksey Yeschenko for CASSANDRA-7351 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ee401cf8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ee401cf8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ee401cf8 Branch: refs/heads/cassandra-2.1 Commit: ee401cf8131a779069805cbe9ef4ab05d4a63b9a Parents: a7ad9d2 Author: Jake Luciani <[email protected]> Authored: Fri Jun 20 15:16:12 2014 -0700 Committer: Aleksey Yeschenko <[email protected]> Committed: Fri Jun 20 15:16:12 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/cql3/QueryProcessor.java | 1 + .../cql3/statements/BatchStatement.java | 75 ++++++---- .../transport/messages/BatchMessage.java | 11 +- .../org/apache/cassandra/cql3/BatchTests.java | 145 +++++++++++++++++++ .../org/apache/cassandra/cql3/DeleteTest.java | 27 ++-- 6 files changed, 216 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b0d8e49..4f68cf7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.0-rc2 + * Allow counter mutations in UNLOGGED batches (CASSANDRA-7351) * Modify reconcile logic to always pick a tombstone over a counter cell (CASSANDRA-7346) * Avoid incremental compaction on Windows (CASSANDRA-7365) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 287a700..86362f7 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -418,6 +418,7 @@ public class QueryProcessor implements QueryHandler { ClientState clientState = queryState.getClientState(); batch.checkAccess(clientState); + batch.validate(); batch.validate(clientState); return batch.execute(queryState, options); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 3cec81b..b7d69cc 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -95,7 +95,8 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache statement.checkAccess(state); } - public void validate(ClientState state) throws InvalidRequestException + // Validates a prepared batch statement without validating its nested statements. + public void validate() throws InvalidRequestException { if (attrs.isTimeToLiveSet()) throw new InvalidRequestException("Global TTL on the BATCH statement is not supported."); @@ -109,15 +110,54 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache throw new InvalidRequestException("Cannot provide custom timestamp for counter BATCH"); } + boolean hasCounters = false; + boolean hasNonCounters = false; + for (ModificationStatement statement : statements) { + if (timestampSet && statement.isCounter()) + throw new InvalidRequestException("Cannot provide custom timestamp for a BATCH containing counters"); + if (timestampSet && statement.isTimestampSet()) throw new InvalidRequestException("Timestamp must be set either on BATCH or individual statements"); - statement.validate(state); + if (type == Type.COUNTER && !statement.isCounter()) + throw new InvalidRequestException("Cannot include non-counter statement in a counter batch"); + + if (type == Type.LOGGED && statement.isCounter()) + throw new InvalidRequestException("Cannot include a counter statement in a logged batch"); + + if (statement.isCounter()) + hasCounters = true; + else + hasNonCounters = true; + } + + if (hasCounters && hasNonCounters) + throw new InvalidRequestException("Counter and non-counter mutations cannot exist in the same batch"); + + if (hasConditions) + { + String ksName = null; + String cfName = null; + for (ModificationStatement stmt : statements) + { + if (ksName != null && (!stmt.keyspace().equals(ksName) || !stmt.columnFamily().equals(cfName))) + throw new InvalidRequestException("Batch with conditions cannot span multiple tables"); + ksName = stmt.keyspace(); + cfName = stmt.columnFamily(); + } } } + // The batch itself will be validated in either Parsed#prepare() - for regular CQL3 batches, + // or in QueryProcessor.processBatch() - for native protocol batches. + public void validate(ClientState state) throws InvalidRequestException + { + for (ModificationStatement statement : statements) + statement.validate(state); + } + public List<ModificationStatement> getStatements() { return statements; @@ -180,12 +220,12 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache { mut = new Mutation(ksName, key); mut.setSourceFrame(sourceFrame); - mutation = type == Type.COUNTER ? new CounterMutation(mut, options.getConsistency()) : mut; + mutation = statement.cfm.isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut; ksMap.put(key, mutation); } else { - mut = type == Type.COUNTER ? ((CounterMutation)mutation).getMutation() : (Mutation)mutation; + mut = statement.cfm.isCounter() ? ((CounterMutation)mutation).getMutation() : (Mutation)mutation; } statement.addUpdateForKey(mut.addOrGet(statement.cfm), key, clusteringPrefix, params); @@ -356,40 +396,25 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache { VariableSpecifications boundNames = getBoundVariables(); - List<ModificationStatement> statements = new ArrayList<ModificationStatement>(parsedStatements.size()); + List<ModificationStatement> statements = new ArrayList<>(parsedStatements.size()); boolean hasConditions = false; + for (ModificationStatement.Parsed parsed : parsedStatements) { ModificationStatement stmt = parsed.prepare(boundNames); if (stmt.hasConditions()) hasConditions = true; - if (stmt.isCounter() && type != Type.COUNTER) - throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches"); - - if (!stmt.isCounter() && type == Type.COUNTER) - throw new InvalidRequestException("Only counter mutations are allowed in COUNTER batches"); - statements.add(stmt); } - if (hasConditions) - { - String ksName = null; - String cfName = null; - for (ModificationStatement stmt : statements) - { - if (ksName != null && (!stmt.keyspace().equals(ksName) || !stmt.columnFamily().equals(cfName))) - throw new InvalidRequestException("Batch with conditions cannot span multiple tables"); - ksName = stmt.keyspace(); - cfName = stmt.columnFamily(); - } - } - Attributes prepAttrs = attrs.prepare("[batch]", "[batch]"); prepAttrs.collectMarkerSpecification(boundNames); - return new ParsedStatement.Prepared(new BatchStatement(boundNames.size(), type, statements, prepAttrs, hasConditions), boundNames); + BatchStatement batchStatement = new BatchStatement(boundNames.size(), type, statements, prepAttrs, hasConditions); + batchStatement.validate(); + + return new ParsedStatement.Prepared(batchStatement, boundNames); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/src/java/org/apache/cassandra/transport/messages/BatchMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java index e2cb8a1..c199a62 100644 --- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java @@ -209,16 +209,7 @@ public class BatchMessage extends Message.Request ModificationStatement mst = (ModificationStatement)statement; hasConditions |= mst.hasConditions(); - if (mst.isCounter()) - { - if (type != BatchStatement.Type.COUNTER) - throw new InvalidRequestException("Cannot include counter statement in a non-counter batch"); - } - else - { - if (type == BatchStatement.Type.COUNTER) - throw new InvalidRequestException("Cannot include non-counter statement in a counter batch"); - } + statements.add(mst); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/test/unit/org/apache/cassandra/cql3/BatchTests.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/BatchTests.java b/test/unit/org/apache/cassandra/cql3/BatchTests.java new file mode 100644 index 0000000..27d407e --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/BatchTests.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.service.EmbeddedCassandraService; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public class BatchTests +{ + private static EmbeddedCassandraService cassandra; + + private static Cluster cluster; + private static Session session; + + + private static PreparedStatement counter; + private static PreparedStatement noncounter; + + @BeforeClass() + public static void setup() throws ConfigurationException, IOException + { + cassandra = new EmbeddedCassandraService(); + cassandra.start(); + + cluster = Cluster.builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build(); + session = cluster.connect(); + + session.execute("drop keyspace if exists junit;"); + session.execute("create keyspace junit WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"); + session.execute("CREATE TABLE junit.noncounter (\n" + + " id int PRIMARY KEY,\n" + + " val text\n" + + ");"); + session.execute("CREATE TABLE junit.counter (\n" + + " id int PRIMARY KEY,\n" + + " val counter,\n" + + ");"); + + + noncounter = session.prepare("insert into junit.noncounter(id, val)values(?,?)"); + counter = session.prepare("update junit.counter set val = val + ? where id = ?"); + } + + @Test(expected = InvalidQueryException.class) + public void testMixedInCounterBatch() + { + sendBatch(BatchStatement.Type.COUNTER, true, true); + } + + @Test(expected = InvalidQueryException.class) + public void testMixedInLoggedBatch() + { + sendBatch(BatchStatement.Type.LOGGED, true, true); + } + + @Test(expected = InvalidQueryException.class) + public void testMixedInUnLoggedBatch() + { + sendBatch(BatchStatement.Type.UNLOGGED, true, true); + } + + @Test(expected = InvalidQueryException.class) + public void testNonCounterInCounterBatch() + { + sendBatch(BatchStatement.Type.COUNTER, false, true); + } + + @Test + public void testNonCounterInLoggedBatch() + { + sendBatch(BatchStatement.Type.LOGGED, false, true); + } + + @Test + public void testNonCounterInUnLoggedBatch() + { + sendBatch(BatchStatement.Type.UNLOGGED, false, true); + } + + @Test + public void testCounterInCounterBatch() + { + sendBatch(BatchStatement.Type.COUNTER, true, false); + } + + @Test + public void testCounterInUnLoggedBatch() + { + sendBatch(BatchStatement.Type.UNLOGGED, true, false); + } + + + @Test(expected = InvalidQueryException.class) + public void testCounterInLoggedBatch() + { + sendBatch(BatchStatement.Type.LOGGED, true, false); + } + + + + public void sendBatch(BatchStatement.Type type, boolean addCounter, boolean addNonCounter) + { + + assert addCounter || addNonCounter; + BatchStatement b = new BatchStatement(type); + + for (int i = 0; i < 10; i++) + { + if (addNonCounter) + b.add(noncounter.bind(i, "foo")); + + if (addCounter) + b.add(counter.bind((long)i, i)); + } + + session.execute(b); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/test/unit/org/apache/cassandra/cql3/DeleteTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/DeleteTest.java b/test/unit/org/apache/cassandra/cql3/DeleteTest.java index 3395dcc..d2dbc79 100644 --- a/test/unit/org/apache/cassandra/cql3/DeleteTest.java +++ b/test/unit/org/apache/cassandra/cql3/DeleteTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.cassandra.cql3; @@ -16,9 +33,8 @@ import org.junit.Test; import java.io.IOException; -public class DeleteTest extends SchemaLoader +public class DeleteTest { - private static EmbeddedCassandraService cassandra; private static Cluster cluster; @@ -35,8 +51,6 @@ public class DeleteTest extends SchemaLoader @BeforeClass() public static void setup() throws ConfigurationException, IOException { - - Schema.instance.clear(); // Schema are now written on disk and will be reloaded cassandra = new EmbeddedCassandraService(); cassandra.start(); @@ -80,11 +94,6 @@ public class DeleteTest extends SchemaLoader " val text ,\n" + " PRIMARY KEY ( ( id ), cid )\n" + ");"); - try { - Thread.sleep(2000L); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } pstmtI = session.prepare("insert into junit.tpc_inherit_b ( id, cid, inh_b, val) values (?, ?, ?, ?)"); pstmtU = session.prepare("update junit.tpc_inherit_b set inh_b=?, val=? where id=? and cid=?");
