Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
    src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
    src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
    src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
    src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
    src/java/org/apache/cassandra/db/Mutation.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3b4084b6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3b4084b6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3b4084b6 Branch: refs/heads/cassandra-2.1 Commit: 3b4084b6c9d7330889de23ee27c3483777054e55 Parents: 13b753b 54a7e00 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Fri Feb 21 11:03:52 2014 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Feb 21 11:03:52 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cql3/statements/BatchStatement.java | 57 ++++++++++++++------ .../cql3/statements/ModificationStatement.java | 40 +++++++------- .../cql3/statements/UpdateStatement.java | 8 --- .../apache/cassandra/db/CounterMutation.java | 5 ++ 5 files changed, 66 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b4084b6/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b4084b6/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index ab2b4bc,b1dbb31..21e60f8 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@@ -130,23 -139,40 +141,40 @@@ public class BatchStatement implements boolean local, ConsistencyLevel cl, long now, - Map<Pair<String, ByteBuffer>, IMutation> mutations) + Map<String, Map<ByteBuffer, IMutation>> mutations) throws RequestExecutionException, RequestValidationException { - // Group mutation together, otherwise they won't get applied atomically - for (IMutation m : 
statement.getMutations(variables, local, cl, attrs.getTimestamp(now, variables), true)) + String ksName = statement.keyspace(); + Map<ByteBuffer, IMutation> ksMap = mutations.get(ksName); + if (ksMap == null) { - Pair<String, ByteBuffer> key = Pair.create(m.getKeyspaceName(), m.key()); - IMutation existing = mutations.get(key); + ksMap = new HashMap<>(); + mutations.put(ksName, ksMap); + } + + // The following does the same than statement.getMutations(), but we inline it here because + // we don't want to recreate mutations every time as this is particularly inefficient when applying + // multiple batch to the same partition (see #6737). + List<ByteBuffer> keys = statement.buildPartitionKeyNames(variables); - ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(variables); ++ Composite clusteringPrefix = statement.createClusteringPrefix(variables); + UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now); - if (existing == null) + for (ByteBuffer key : keys) + { + IMutation mutation = ksMap.get(key); - RowMutation rm; ++ Mutation mut; + if (mutation == null) { - mutations.put(key, m); - rm = new RowMutation(ksName, key); - mutation = type == Type.COUNTER ? new CounterMutation(rm, cl) : rm; ++ mut = new Mutation(ksName, key); ++ mutation = type == Type.COUNTER ? new CounterMutation(mut, cl) : mut; + ksMap.put(key, mutation); } else { - existing.addAll(m); - rm = type == Type.COUNTER ? ((CounterMutation)mutation).rowMutation() : (RowMutation)mutation; ++ mut = type == Type.COUNTER ? 
((CounterMutation)mutation).getMutation() : (Mutation)mutation; } + - statement.addUpdateForKey(rm.addOrGet(statement.cfm, UnsortedColumns.factory), key, clusteringPrefix, params); ++ statement.addUpdateForKey(mut.addOrGet(statement.cfm), key, clusteringPrefix, params); } } @@@ -215,9 -241,9 +243,9 @@@ throw new InvalidRequestException("Batch with conditions cannot span multiple partitions"); } - ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(statementVariables); ++ Composite clusteringPrefix = statement.createClusteringPrefix(statementVariables); if (statement.hasConditions()) { - Composite clusteringPrefix = statement.createClusteringPrefix(statementVariables); statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementVariables, timestamp); // As soon as we have a ifNotExists, we set columnsWithConditions to null so that everything is in the resultSet if (statement.hasIfNotExistCondition()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b4084b6/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index db0b7a9,ecefcb9..f90293b --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@@ -380,7 -415,7 +380,7 @@@ public abstract class ModificationState return null; } - protected Map<ByteBuffer, CQL3Row> readRequiredRows(List<ByteBuffer> partitionKeys, Composite clusteringPrefix, boolean local, ConsistencyLevel cl) - protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl) ++ protected Map<ByteBuffer, CQL3Row> readRequiredRows(Collection<ByteBuffer> partitionKeys, Composite clusteringPrefix, boolean local, 
ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException { // Lists SET operation incurs a read. @@@ -394,10 -430,10 +394,10 @@@ } } - return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, (CompositeType)cfm.comparator, local, cl); + return requiresRead ? readRows(partitionKeys, clusteringPrefix, cfm, local, cl) : null; } - protected Map<ByteBuffer, CQL3Row> readRows(List<ByteBuffer> partitionKeys, Composite rowPrefix, CFMetaData cfm, boolean local, ConsistencyLevel cl) - private Map<ByteBuffer, ColumnGroupMap> readRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl) ++ protected Map<ByteBuffer, CQL3Row> readRows(Collection<ByteBuffer> partitionKeys, Composite rowPrefix, CFMetaData cfm, boolean local, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException { try @@@ -604,12 -651,8 +604,12 @@@ if (hasConditions()) throw new UnsupportedOperationException(); - for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp(), false)) + for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp())) - mutation.apply(); + { + // We don't use counters internally. 
+ assert mutation instanceof Mutation; + ((Mutation) mutation).apply(); + } return null; } @@@ -628,37 -671,33 +628,33 @@@ throws RequestExecutionException, RequestValidationException { List<ByteBuffer> keys = buildPartitionKeyNames(variables); - ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables); + Composite clusteringPrefix = createClusteringPrefix(variables); - // Some lists operation requires reading - Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, clusteringPrefix, local, cl); - UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows); + UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now); Collection<IMutation> mutations = new ArrayList<IMutation>(); for (ByteBuffer key: keys) { ThriftValidation.validateKey(cfm, key); - ColumnFamily cf = UnsortedColumns.factory.create(cfm); + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm); addUpdateForKey(cf, key, clusteringPrefix, params); - mutations.add(makeMutation(key, cf, cl, isBatch)); - RowMutation rm = new RowMutation(cfm.ksName, key, cf); - mutations.add(isCounter() ? new CounterMutation(rm, cl) : rm); ++ Mutation mut = new Mutation(cfm.ksName, key, cf); ++ mutations.add(isCounter() ? 
new CounterMutation(mut, cl) : mut); } return mutations; } - private IMutation makeMutation(ByteBuffer key, ColumnFamily cf, ConsistencyLevel cl, boolean isBatch) + public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys, - ColumnNameBuilder prefix, ++ Composite prefix, + List<ByteBuffer> variables, + boolean local, + ConsistencyLevel cl, + long now) + throws RequestExecutionException, RequestValidationException { - Mutation mutation; - if (isBatch) - { - // we might group other mutations together with this one later, so make it mutable - mutation = new Mutation(cfm.ksName, key); - mutation.add(cf); - } - else - { - mutation = new Mutation(cfm.ksName, key, cf); - } - return isCounter() ? new CounterMutation(mutation, cl) : mutation; + // Some lists operation requires reading - Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, prefix, local, cl); ++ Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, prefix, local, cl); + return new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows); } public static abstract class Parsed extends CFStatement http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b4084b6/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b4084b6/src/java/org/apache/cassandra/db/CounterMutation.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/CounterMutation.java index 5d96c70,fb363c2..41187ac --- a/src/java/org/apache/cassandra/db/CounterMutation.java +++ b/src/java/org/apache/cassandra/db/CounterMutation.java @@@ -65,12 -63,17 +65,17 @@@ public class CounterMutation implement public Collection<ColumnFamily> getColumnFamilies() { - return rowMutation.getColumnFamilies(); + return mutation.getColumnFamilies(); } - public ByteBuffer key() ++ public Mutation getMutation() 
+ { - return rowMutation.key(); ++ return mutation; + } + - public RowMutation rowMutation() + public ByteBuffer key() { - return rowMutation; + return mutation.key(); } public ConsistencyLevel consistency()