Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 13b753b01 -> 3b4084b6c
Optimize single partition batch statements patch by slebresne; reviewed by benedict for CASSANDRA-6737 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/54a7e003 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/54a7e003 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/54a7e003 Branch: refs/heads/cassandra-2.1 Commit: 54a7e0034148f451ff493f9f5363c26f10a21f20 Parents: edf16c9 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Wed Feb 19 19:10:09 2014 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Feb 21 10:18:02 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cql3/statements/BatchStatement.java | 57 ++++++++++++++------ .../cql3/statements/DeleteStatement.java | 8 --- .../cql3/statements/ModificationStatement.java | 40 +++++++------- .../cql3/statements/UpdateStatement.java | 8 --- .../org/apache/cassandra/db/RowMutation.java | 7 ++- 6 files changed, 67 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bbacc4d..a5e1016 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -19,6 +19,7 @@ * Improve repair tasks(snapshot, differencing) concurrency (CASSANDRA-6566) * Fix replaying pre-2.0 commit logs (CASSANDRA-6714) * Add static columns to CQL3 (CASSANDRA-6561) + * Optimize single partition batch statements (CASSANDRA-6737) Merged from 1.2: * Catch memtable flush exceptions during shutdown (CASSANDRA-6735) * Fix broken streams when replacing with same IP (CASSANDRA-6622) http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index d4acbae..b1dbb31 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -30,7 +30,6 @@ import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.transport.messages.ResultMessage; -import org.apache.cassandra.utils.Pair; /** * A <code>BATCH</code> statement parsed from a CQL query. @@ -113,14 +112,26 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache private Collection<? extends IMutation> getMutations(BatchVariables variables, boolean local, ConsistencyLevel cl, long now) throws RequestExecutionException, RequestValidationException { - Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>(); + Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>(); for (int i = 0; i < statements.size(); i++) { ModificationStatement statement = statements.get(i); List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i); addStatementMutations(statement, statementVariables, local, cl, now, mutations); } - return mutations.values(); + return unzipMutations(mutations); + } + + private Collection<? 
extends IMutation> unzipMutations(Map<String, Map<ByteBuffer, IMutation>> mutations) + { + // The case where all statements were on the same keyspace is pretty common + if (mutations.size() == 1) + return mutations.values().iterator().next().values(); + + List<IMutation> ms = new ArrayList<>(); + for (Map<ByteBuffer, IMutation> ksMap : mutations.values()) + ms.addAll(ksMap.values()); + return ms; } private void addStatementMutations(ModificationStatement statement, @@ -128,23 +139,40 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache boolean local, ConsistencyLevel cl, long now, - Map<Pair<String, ByteBuffer>, IMutation> mutations) + Map<String, Map<ByteBuffer, IMutation>> mutations) throws RequestExecutionException, RequestValidationException { - // Group mutation together, otherwise they won't get applied atomically - for (IMutation m : statement.getMutations(variables, local, cl, attrs.getTimestamp(now, variables), true)) + String ksName = statement.keyspace(); + Map<ByteBuffer, IMutation> ksMap = mutations.get(ksName); + if (ksMap == null) { - Pair<String, ByteBuffer> key = Pair.create(m.getKeyspaceName(), m.key()); - IMutation existing = mutations.get(key); + ksMap = new HashMap<>(); + mutations.put(ksName, ksMap); + } - if (existing == null) + // The following does the same as statement.getMutations(), but we inline it here because + // we don't want to recreate mutations every time as this is particularly inefficient when applying + // multiple batches to the same partition (see #6737).
+ List<ByteBuffer> keys = statement.buildPartitionKeyNames(variables); + ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(variables); + UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now); + + for (ByteBuffer key : keys) + { + IMutation mutation = ksMap.get(key); + RowMutation rm; + if (mutation == null) { - mutations.put(key, m); + rm = new RowMutation(ksName, key); + mutation = type == Type.COUNTER ? new CounterMutation(rm, cl) : rm; + ksMap.put(key, mutation); } else { - existing.addAll(m); + rm = type == Type.COUNTER ? ((CounterMutation)mutation).rowMutation() : (RowMutation)mutation; } + + statement.addUpdateForKey(rm.addOrGet(statement.cfm, UnsortedColumns.factory), key, clusteringPrefix, params); } } @@ -213,9 +241,9 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache throw new InvalidRequestException("Batch with conditions cannot span multiple partitions"); } + ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(statementVariables); if (statement.hasConditions()) { - ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(statementVariables); statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementVariables, timestamp); // As soon as we have a ifNotExists, we set columnsWithConditions to null so that everything is in the resultSet if (statement.hasIfNotExistCondition()) @@ -225,9 +253,8 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache } else { - // getPartitionKey will already have thrown if there is more than one key involved - IMutation mut = statement.getMutations(statementVariables, false, cl, timestamp, true).iterator().next(); - updates.resolve(mut.getColumnFamilies().iterator().next()); + UpdateParameters params = statement.makeUpdateParameters(Collections.singleton(key), clusteringPrefix, statementVariables, false, cl, now); + 
statement.addUpdateForKey(updates, key, clusteringPrefix, params); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java index cd5f2a2..6efe100 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java @@ -41,14 +41,6 @@ public class DeleteStatement extends ModificationStatement return false; } - public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) - throws InvalidRequestException - { - ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm); - addUpdateForKey(cf, key, builder, params); - return cf; - } - public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) throws InvalidRequestException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index ac8d2e1..ecefcb9 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -415,7 +415,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF return null; } - protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl) + protected Map<ByteBuffer, ColumnGroupMap> 
readRequiredRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException { // Lists SET operation incurs a read. @@ -433,7 +433,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, (CompositeType)cfm.comparator, local, cl); } - private Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl) + private Map<ByteBuffer, ColumnGroupMap> readRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException { try @@ -516,7 +516,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF else cl.validateForWrite(cfm.ksName); - Collection<? extends IMutation> mutations = getMutations(options.getValues(), false, cl, queryState.getTimestamp(), false); + Collection<? 
extends IMutation> mutations = getMutations(options.getValues(), false, cl, queryState.getTimestamp()); if (!mutations.isEmpty()) StorageProxy.mutateWithTriggers(mutations, cl, false); @@ -651,7 +651,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF if (hasConditions()) throw new UnsupportedOperationException(); - for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp(), false)) + for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp())) mutation.apply(); return null; } @@ -667,15 +667,13 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF * @return list of the mutations * @throws InvalidRequestException on invalid requests */ - public Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now, boolean isBatch) + private Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now) throws RequestExecutionException, RequestValidationException { List<ByteBuffer> keys = buildPartitionKeyNames(variables); ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables); - // Some lists operation requires reading - Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, clusteringPrefix, local, cl); - UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows); + UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now); Collection<IMutation> mutations = new ArrayList<IMutation>(); for (ByteBuffer key: keys) @@ -683,25 +681,23 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF ThriftValidation.validateKey(cfm, key); ColumnFamily cf = UnsortedColumns.factory.create(cfm); addUpdateForKey(cf, key, clusteringPrefix, params); - 
mutations.add(makeMutation(key, cf, cl, isBatch)); + RowMutation rm = new RowMutation(cfm.ksName, key, cf); + mutations.add(isCounter() ? new CounterMutation(rm, cl) : rm); } return mutations; } - private IMutation makeMutation(ByteBuffer key, ColumnFamily cf, ConsistencyLevel cl, boolean isBatch) + public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys, + ColumnNameBuilder prefix, + List<ByteBuffer> variables, + boolean local, + ConsistencyLevel cl, + long now) + throws RequestExecutionException, RequestValidationException { - RowMutation rm; - if (isBatch) - { - // we might group other mutations together with this one later, so make it mutable - rm = new RowMutation(cfm.ksName, key); - rm.add(cf); - } - else - { - rm = new RowMutation(cfm.ksName, key, cf); - } - return isCounter() ? new CounterMutation(rm, cl) : rm; + // Some list operations require reading + Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, prefix, local, cl); + return new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows); } public static abstract class Parsed extends CFStatement http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index 0e6481b..dcf22ef 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@ -98,14 +98,6 @@ public class UpdateStatement extends ModificationStatement } } - public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) - throws InvalidRequestException - { - ColumnFamily cf = UnsortedColumns.factory.create(cfm); - addUpdateForKey(cf, key, builder, params); -
return cf; - } - public static class ParsedInsert extends ModificationStatement.Parsed { private final List<ColumnIdentifier> columnNames; http://git-wip-us.apache.org/repos/asf/cassandra/blob/54a7e003/src/java/org/apache/cassandra/db/RowMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java index e9d177b..49ee2c5 100644 --- a/src/java/org/apache/cassandra/db/RowMutation.java +++ b/src/java/org/apache/cassandra/db/RowMutation.java @@ -125,10 +125,15 @@ public class RowMutation implements IMutation public ColumnFamily addOrGet(CFMetaData cfm) { + return addOrGet(cfm, TreeMapBackedSortedColumns.factory); + } + + public ColumnFamily addOrGet(CFMetaData cfm, ColumnFamily.Factory factory) + { ColumnFamily cf = modifications.get(cfm.cfId); if (cf == null) { - cf = TreeMapBackedSortedColumns.factory.create(cfm); + cf = factory.create(cfm); modifications.put(cfm.cfId, cf); } return cf;