This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 8ae2b094654aac6a2f2d8a79a1aa08a7215c8621 Merge: 7f1659c 8333d0b Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Fri Jan 22 10:56:20 2021 +0100 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 1 + .../PartitionKeySingleRestrictionSet.java | 7 +- .../cql3/restrictions/RestrictionSet.java | 66 +++++---- .../cassandra/cql3/statements/BatchStatement.java | 48 ++++--- .../cql3/statements/BatchUpdatesCollector.java | 50 +++++-- .../cql3/statements/ModificationStatement.java | 17 ++- .../statements/SingleTableUpdatesCollector.java | 26 ++-- .../cassandra/cql3/statements/UpdateStatement.java | 9 +- src/java/org/apache/cassandra/db/Mutation.java | 17 ++- .../test/microbench/BatchStatementBench.java | 149 +++++++++++++++++++++ 10 files changed, 302 insertions(+), 88 deletions(-) diff --cc CHANGES.txt index cb51088,29d6b9f..a946f1d --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,16 -1,9 +1,17 @@@ -3.11.10 +4.0-beta5 + * Upgrade netty and chronicle-queue dependencies to get Auditing and native library loading working on arm64 architectures (CASSANDRA-16384,CASSANDRA-16392) + * Release StreamingTombstoneHistogramBuilder spool when switching writers (CASSANDRA-14834) + * Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318) + * Fix client notifications in CQL protocol v5 (CASSANDRA-16353) + * Too defensive check when picking sstables for preview repair (CASSANDRA-16284) + * Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376) + * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279) + * SSLFactory should initialize SSLContext before setting protocols (CASSANDRA-16362) + * Restore sasi dependencies jflex, snowball-stemmer, and concurrent-trees, in the cassandra-all pom (CASSANDRA-16303) + * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925) +Merged from 3.11: + * Reduce amount of allocations during batch statement execution (CASSANDRA-16201) * Update jflex-1.6.0.jar to match upstream (CASSANDRA-16393) - * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925) - * Rate limit validation compactions using compaction_throughput_mb_per_sec (CASSANDRA-16161) - * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071) Merged from 3.0: * Prevent unbounded number of pending flushing tasks (CASSANDRA-16261) * Improve empty hint file handling during startup (CASSANDRA-16162) diff --cc src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java index 427c396,2bbda38..7a5d5b9 --- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java @@@ -51,6 -49,8 +51,8 @@@ final class RestrictionSet implements R } }; - private static final TreeMap<ColumnDefinition, SingleRestriction> EMPTY = new TreeMap<>(COLUMN_DEFINITION_COMPARATOR); ++ private static final TreeMap<ColumnMetadata, SingleRestriction> EMPTY = new TreeMap<>(COLUMN_DEFINITION_COMPARATOR); + /** * The restrictions per column. */ @@@ -61,13 -61,26 +63,26 @@@ */ private final boolean hasMultiColumnRestrictions; + private final boolean hasIn; + private final boolean hasContains; + private final boolean hasSlice; + private final boolean hasOnlyEqualityRestrictions; + public RestrictionSet() { - this(new TreeMap<ColumnMetadata, SingleRestriction>(COLUMN_DEFINITION_COMPARATOR), false); + this(EMPTY, false, + false, + false, + false, + true); } - private RestrictionSet(TreeMap<ColumnDefinition, SingleRestriction> restrictions, + private RestrictionSet(TreeMap<ColumnMetadata, SingleRestriction> restrictions, - boolean hasMultiColumnRestrictions) + boolean hasMultiColumnRestrictions, + boolean hasIn, + boolean hasContains, + boolean hasSlice, + boolean hasOnlyEqualityRestrictions) { this.restrictions = restrictions; this.hasMultiColumnRestrictions = hasMultiColumnRestrictions; @@@ -129,14 -146,25 +148,25 @@@ public RestrictionSet addRestriction(SingleRestriction restriction) { // RestrictionSet is immutable so we need to clone the restrictions map. - TreeMap<ColumnMetadata, SingleRestriction> newRestrictions = new TreeMap<>(this.restrictions); - return new RestrictionSet(mergeRestrictions(newRestrictions, restriction), hasMultiColumnRestrictions || restriction.isMultiColumn()); - TreeMap<ColumnDefinition, SingleRestriction> newRestricitons = new TreeMap<>(this.restrictions); ++ TreeMap<ColumnMetadata, SingleRestriction> newRestricitons = new TreeMap<>(this.restrictions); + + boolean newHasIn = hasIn || restriction.isIN(); + boolean newHasContains = hasContains || restriction.isContains(); + boolean newHasSlice = hasSlice || restriction.isSlice(); + boolean newHasOnlyEqualityRestrictions = hasOnlyEqualityRestrictions && (restriction.isEQ() || restriction.isIN()); + + return new RestrictionSet(mergeRestrictions(newRestricitons, restriction), + hasMultiColumnRestrictions || restriction.isMultiColumn(), + newHasIn, + newHasContains, + newHasSlice, + newHasOnlyEqualityRestrictions); } - private TreeMap<ColumnDefinition, SingleRestriction> mergeRestrictions(TreeMap<ColumnDefinition, SingleRestriction> restrictions, - SingleRestriction restriction) + private TreeMap<ColumnMetadata, SingleRestriction> mergeRestrictions(TreeMap<ColumnMetadata, SingleRestriction> restrictions, + SingleRestriction restriction) { - Collection<ColumnDefinition> columnDefs = restriction.getColumnDefs(); + Collection<ColumnMetadata> columnDefs = restriction.getColumnDefs(); Set<SingleRestriction> existingRestrictions = getRestrictions(columnDefs); if (existingRestrictions.isEmpty()) diff --cc src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 2cf2660,caf8c97..80bd437 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@@ -261,26 -219,36 +263,49 @@@ public class BatchStatement implements return statements; } - private List<? extends IMutation> getMutations(BatchQueryOptions options, - boolean local, - long batchTimestamp, - int nowInSeconds, - long queryStartNanoTime) + @VisibleForTesting - public Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now, long queryStartNanoTime) - throws RequestExecutionException, RequestValidationException ++ public List<? extends IMutation> getMutations(BatchQueryOptions options, ++ boolean local, ++ long batchTimestamp, ++ int nowInSeconds, ++ long queryStartNanoTime) { - Set<String> tablesWithZeroGcGs = null; ++ if (statements.isEmpty()) ++ return Collections.emptyList(); + List<List<ByteBuffer>> partitionKeys = new ArrayList<>(statements.size()); - Map<UUID, HashMultiset<ByteBuffer>> partitionCounts = Maps.newHashMapWithExpectedSize(updatedColumns.size()); ++ Map<TableId, HashMultiset<ByteBuffer>> partitionCounts = new HashMap<>(updatedColumns.size()); ++ TableMetadata metadata = statements.get(0).metadata; + for (int i = 0, isize = statements.size(); i < isize; i++) + { + ModificationStatement stmt = statements.get(i); ++ if (metadata != null && !stmt.metadata.id.equals(metadata.id)) ++ metadata = null; + List<ByteBuffer> stmtPartitionKeys = stmt.buildPartitionKeyNames(options.forStatement(i)); + partitionKeys.add(stmtPartitionKeys); - HashMultiset<ByteBuffer> perKeyCountsForTable = partitionCounts.computeIfAbsent(stmt.cfm.cfId, k -> HashMultiset.create()); ++ HashMultiset<ByteBuffer> perKeyCountsForTable = partitionCounts.computeIfAbsent(stmt.metadata.id, k -> HashMultiset.create()); + for (int stmtIdx = 0, stmtSize = stmtPartitionKeys.size(); stmtIdx < stmtSize; stmtIdx++) + perKeyCountsForTable.add(stmtPartitionKeys.get(stmtIdx)); + } + - UpdatesCollector collector = new UpdatesCollector(updatedColumns, partitionCounts); - for (int i = 0; i < statements.size(); i++) + Set<String> tablesWithZeroGcGs = null; - BatchUpdatesCollector collector = new BatchUpdatesCollector(updatedColumns, updatedRows()); - for (int i = 0; i < statements.size(); i++) ++ UpdatesCollector collector; ++ if (metadata != null) ++ collector = new SingleTableUpdatesCollector(metadata, updatedColumns.get(metadata.id), partitionCounts.get(metadata.id)); ++ else ++ collector = new BatchUpdatesCollector(updatedColumns, partitionCounts); ++ ++ for (int i = 0, isize = statements.size(); i < isize; i++) { ModificationStatement statement = statements.get(i); - if (isLogged() && statement.cfm.params.gcGraceSeconds == 0) + if (isLogged() && statement.metadata().params.gcGraceSeconds == 0) { if (tablesWithZeroGcGs == null) tablesWithZeroGcGs = new HashSet<>(); - tablesWithZeroGcGs.add(String.format("%s.%s", statement.cfm.ksName, statement.cfm.cfName)); + tablesWithZeroGcGs.add(statement.metadata.toString()); } QueryOptions statementOptions = options.forStatement(i); - long timestamp = attrs.getTimestamp(now, statementOptions); - statement.addUpdates(collector, partitionKeys.get(i), statementOptions, local, timestamp, queryStartNanoTime); + long timestamp = attrs.getTimestamp(batchTimestamp, statementOptions); - statement.addUpdates(collector, statementOptions, local, timestamp, nowInSeconds, queryStartNanoTime); ++ statement.addUpdates(collector, partitionKeys.get(i), statementOptions, local, timestamp, nowInSeconds, queryStartNanoTime); } if (tablesWithZeroGcGs != null) diff --cc src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java index 8f70ffc,0000000..cb88bdd mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java @@@ -1,273 -1,0 +1,297 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements; + +import java.nio.ByteBuffer; +import java.util.*; + ++import com.google.common.collect.HashMultiset; +import com.google.common.collect.ImmutableMap; ++import com.google.common.collect.Maps; + +import org.apache.cassandra.db.virtual.VirtualMutation; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.PartitionUpdate; + ++import static org.apache.cassandra.utils.MonotonicClock.approxTime; ++ +/** + * Utility class to collect updates. + * + * <p>In a batch statement we don't want to recreate mutations every time as this is particularly inefficient when + * applying multiple batch to the same partition (see #6737). </p> + * + */ +final class BatchUpdatesCollector implements UpdatesCollector +{ + /** + * The columns that will be updated for each table (keyed by the table ID). + */ + private final Map<TableId, RegularAndStaticColumns> updatedColumns; + + /** - * The estimated number of updated row. ++ * The number of updated rows per table and key. + */ - private final int updatedRows; ++ private final Map<TableId, HashMultiset<ByteBuffer>> perPartitionKeyCounts; + + /** + * The mutations per keyspace. ++ * ++ * optimised for the common single-keyspace case ++ * ++ * Key is keyspace name, then we have an IMutationBuilder for each touched partition key in that keyspace ++ * ++ * MutationBuilder holds a PartitionUpdate.Builder + */ - private final Map<String, Map<ByteBuffer, IMutationBuilder>> mutationBuilders = new HashMap<>(); ++ private final Map<String, Map<ByteBuffer, IMutationBuilder>> mutationBuilders = Maps.newHashMapWithExpectedSize(1); ++ + - BatchUpdatesCollector(Map<TableId, RegularAndStaticColumns> updatedColumns, int updatedRows) ++ BatchUpdatesCollector(Map<TableId, RegularAndStaticColumns> updatedColumns, Map<TableId, HashMultiset<ByteBuffer>> perPartitionKeyCounts) + { + super(); + this.updatedColumns = updatedColumns; - this.updatedRows = updatedRows; ++ this.perPartitionKeyCounts = perPartitionKeyCounts; + } + + /** + * Gets the <code>PartitionUpdate.Builder</code> for the specified column family and key. If the builder does not + * exist it will be created. + * + * @param metadata the column family meta data + * @param dk the partition key + * @param consistency the consistency level + * @return the <code>PartitionUpdate.Builder</code> for the specified column family and key + */ + public PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency) + { + IMutationBuilder mut = getMutationBuilder(metadata, dk, consistency); + PartitionUpdate.Builder upd = mut.get(metadata.id); + if (upd == null) + { + RegularAndStaticColumns columns = updatedColumns.get(metadata.id); + assert columns != null; - upd = new PartitionUpdate.Builder(metadata, dk, columns, updatedRows); ++ upd = new PartitionUpdate.Builder(metadata, dk, columns, perPartitionKeyCounts.get(metadata.id).count(dk.getKey())); + mut.add(upd); + } + return upd; + } + + private IMutationBuilder getMutationBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency) + { - return keyspaceMap(metadata.keyspace).computeIfAbsent(dk.getKey(), k -> makeMutationBuilder(metadata, dk, consistency)); ++ Map<ByteBuffer, IMutationBuilder> ksMap = keyspaceMap(metadata.keyspace); ++ IMutationBuilder mutationBuilder = ksMap.get(dk.getKey()); ++ if (mutationBuilder == null) ++ { ++ mutationBuilder = makeMutationBuilder(metadata, dk, consistency); ++ ksMap.put(dk.getKey(), mutationBuilder); ++ } ++ return mutationBuilder; + } + + private IMutationBuilder makeMutationBuilder(TableMetadata metadata, DecoratedKey partitionKey, ConsistencyLevel cl) + { + if (metadata.isVirtual()) + { + return new VirtualMutationBuilder(metadata.keyspace, partitionKey); + } + else + { - MutationBuilder builder = new MutationBuilder(metadata.keyspace, partitionKey); ++ MutationBuilder builder = new MutationBuilder(metadata.keyspace, partitionKey, 1); + return metadata.isCounter() ? new CounterMutationBuilder(builder, cl) : builder; + } + } + + /** + * Returns a collection containing all the mutations. + * @return a collection containing all the mutations. + */ + public List<IMutation> toMutations() + { - //TODO: The case where all statement where on the same keyspace is pretty common, optimize for that? + List<IMutation> ms = new ArrayList<>(); + for (Map<ByteBuffer, IMutationBuilder> ksMap : mutationBuilders.values()) + { + for (IMutationBuilder builder : ksMap.values()) + { + IMutation mutation = builder.build(); + mutation.validateIndexedColumns(); + ms.add(mutation); + } + } + return ms; + } + + /** + * Returns the key-mutation mappings for the specified keyspace. + * + * @param ksName the keyspace name + * @return the key-mutation mappings for the specified keyspace. + */ + private Map<ByteBuffer, IMutationBuilder> keyspaceMap(String ksName) + { - return mutationBuilders.computeIfAbsent(ksName, k -> new HashMap<>()); ++ Map<ByteBuffer, IMutationBuilder> ksMap = mutationBuilders.get(ksName); ++ if (ksMap == null) ++ { ++ ksMap = Maps.newHashMapWithExpectedSize(1); ++ mutationBuilders.put(ksName, ksMap); ++ } ++ return ksMap; + } + + private interface IMutationBuilder + { + /** + * Add a new PartitionUpdate builder to this mutation builder + * @param builder the builder to add + * @return this + */ + IMutationBuilder add(PartitionUpdate.Builder builder); + + /** + * Build the immutable mutation + */ + IMutation build(); + + /** + * Get the builder for the given tableId + */ + PartitionUpdate.Builder get(TableId tableId); + } + + private static class MutationBuilder implements IMutationBuilder + { - private final HashMap<TableId, PartitionUpdate.Builder> modifications = new HashMap<>(); ++ private final Map<TableId, PartitionUpdate.Builder> modifications; + private final DecoratedKey key; + private final String keyspaceName; - private final long createdAt = System.currentTimeMillis(); ++ private final long createdAt = approxTime.now(); + - private MutationBuilder(String keyspaceName, DecoratedKey key) ++ private MutationBuilder(String keyspaceName, DecoratedKey key, int initialSize) + { + this.keyspaceName = keyspaceName; + this.key = key; ++ this.modifications = Maps.newHashMapWithExpectedSize(initialSize); + } + + public MutationBuilder add(PartitionUpdate.Builder updateBuilder) + { + assert updateBuilder != null; + assert updateBuilder.partitionKey().getPartitioner() == key.getPartitioner(); + PartitionUpdate.Builder prev = modifications.put(updateBuilder.metadata().id, updateBuilder); + if (prev != null) + // developer error + throw new IllegalArgumentException("Table " + updateBuilder.metadata().name + " already has modifications in this mutation: " + prev); + return this; + } + + public Mutation build() + { + ImmutableMap.Builder<TableId, PartitionUpdate> updates = new ImmutableMap.Builder<>(); + for (Map.Entry<TableId, PartitionUpdate.Builder> updateEntry : modifications.entrySet()) + { + PartitionUpdate update = updateEntry.getValue().build(); + updates.put(updateEntry.getKey(), update); + } + return new Mutation(keyspaceName, key, updates.build(), createdAt); + } + + public PartitionUpdate.Builder get(TableId tableId) + { + return modifications.get(tableId); + } + + public DecoratedKey key() + { + return key; + } + + public boolean isEmpty() + { + return modifications.isEmpty(); + } + + public String getKeyspaceName() + { + return keyspaceName; + } + } + + private static class CounterMutationBuilder implements IMutationBuilder + { + private final MutationBuilder mutationBuilder; + private final ConsistencyLevel cl; + + private CounterMutationBuilder(MutationBuilder mutationBuilder, ConsistencyLevel cl) + { + this.mutationBuilder = mutationBuilder; + this.cl = cl; + } + + public IMutationBuilder add(PartitionUpdate.Builder builder) + { + return mutationBuilder.add(builder); + } + + public IMutation build() + { + return new CounterMutation(mutationBuilder.build(), cl); + } + + public PartitionUpdate.Builder get(TableId id) + { + return mutationBuilder.get(id); + } + } + + private static class VirtualMutationBuilder implements IMutationBuilder + { + private final String keyspaceName; + private final DecoratedKey partitionKey; + + private final HashMap<TableId, PartitionUpdate.Builder> modifications = new HashMap<>(); + + private VirtualMutationBuilder(String keyspaceName, DecoratedKey partitionKey) + { + this.keyspaceName = keyspaceName; + this.partitionKey = partitionKey; + } + + @Override + public VirtualMutationBuilder add(PartitionUpdate.Builder builder) + { + PartitionUpdate.Builder prev = modifications.put(builder.metadata().id, builder); + if (null != prev) + throw new IllegalStateException(); + return this; + } + + @Override + public VirtualMutation build() + { + ImmutableMap.Builder<TableId, PartitionUpdate> updates = new ImmutableMap.Builder<>(); + modifications.forEach((tableId, updateBuilder) -> updates.put(tableId, updateBuilder.build())); + return new VirtualMutation(keyspaceName, partitionKey, updates.build()); + } + + @Override + public PartitionUpdate.Builder get(TableId tableId) + { + return modifications.get(tableId); + } + } +} diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 49a3f3c,d2e693a..0ba105c --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@@ -20,8 -20,8 +20,10 @@@ package org.apache.cassandra.cql3.state import java.nio.ByteBuffer; import java.util.*; + import com.google.common.collect.HashMultiset; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; ++import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -364,12 -335,7 +366,8 @@@ public abstract class ModificationState public boolean requiresRead() { + // Lists SET operation incurs a read. - for (Operation op : allOperations()) - if (op.requiresRead()) - return true; - - return false; + return !requiresRead.isEmpty(); } private Map<DecoratedKey, Partition> readRequiredLists(Collection<ByteBuffer> partitionKeys, @@@ -678,26 -625,28 +676,27 @@@ * * @return list of the mutations */ - private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now, long queryStartNanoTime) + private List<? extends IMutation> getMutations(QueryOptions options, + boolean local, + long timestamp, + int nowInSeconds, + long queryStartNanoTime) { - UpdatesCollector collector = new SingleTableUpdatesCollector(metadata, updatedColumns, 1); - addUpdates(collector, options, local, timestamp, nowInSeconds, queryStartNanoTime); + List<ByteBuffer> keys = buildPartitionKeyNames(options); - HashMultiset<ByteBuffer> perPartitionKeyCounts = HashMultiset.create(); - for (int i = 0; i < keys.size(); i++) - perPartitionKeyCounts.add(keys.get(i)); // avoid .addAll since that allocates an iterator - - UpdatesCollector collector = new UpdatesCollector(Collections.singletonMap(cfm.cfId, updatedColumns), Collections.singletonMap(cfm.cfId, perPartitionKeyCounts)); - addUpdates(collector, keys, options, local, now, queryStartNanoTime); - collector.validateIndexedColumns(); - ++ HashMultiset<ByteBuffer> perPartitionKeyCounts = HashMultiset.create(keys); ++ SingleTableUpdatesCollector collector = new SingleTableUpdatesCollector(metadata, updatedColumns, perPartitionKeyCounts); ++ addUpdates(collector, keys, options, local, timestamp, nowInSeconds, queryStartNanoTime); return collector.toMutations(); } final void addUpdates(UpdatesCollector collector, + List<ByteBuffer> keys, QueryOptions options, boolean local, - long now, + long timestamp, + int nowInSeconds, long queryStartNanoTime) { - List<ByteBuffer> keys = buildPartitionKeyNames(options); -- if (hasSlices()) { Slices slices = createSlices(options); diff --cc src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java index 6ef551d,0000000..6dc2d41 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java +++ b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java @@@ -1,105 -1,0 +1,113 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements; + +import java.nio.ByteBuffer; +import java.util.ArrayList; - import java.util.Collection; - import java.util.HashMap; +import java.util.List; +import java.util.Map; + ++import com.google.common.collect.HashMultiset; ++import com.google.common.collect.Maps; ++ +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.CounterMutation; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.virtual.VirtualMutation; +import org.apache.cassandra.schema.TableMetadata; + +/** + * Utility class to collect updates. + */ +final class SingleTableUpdatesCollector implements UpdatesCollector +{ + /** + * the table to be updated + */ + private final TableMetadata metadata; + + /** + * the columns to update + */ + private final RegularAndStaticColumns updatedColumns; + + /** - * The estimated number of updated row. ++ * The number of updated rows per key. + */ - private final int updatedRows; ++ private final HashMultiset<ByteBuffer> perPartitionKeyCounts; + + /** + * the partition update builders per key + */ - private final Map<ByteBuffer, PartitionUpdate.Builder> puBuilders = new HashMap<>(); ++ private final Map<ByteBuffer, PartitionUpdate.Builder> puBuilders; + + /** + * if it is a counter table, we will set this + */ + private ConsistencyLevel counterConsistencyLevel = null; + - SingleTableUpdatesCollector(TableMetadata metadata, RegularAndStaticColumns updatedColumns, int updatedRows) ++ SingleTableUpdatesCollector(TableMetadata metadata, RegularAndStaticColumns updatedColumns, HashMultiset<ByteBuffer> perPartitionKeyCounts) + { + this.metadata = metadata; + this.updatedColumns = updatedColumns; - this.updatedRows = updatedRows; ++ this.perPartitionKeyCounts = perPartitionKeyCounts; ++ this.puBuilders = Maps.newHashMapWithExpectedSize(perPartitionKeyCounts.size()); + } + + public PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency) + { + if (metadata.isCounter()) + counterConsistencyLevel = consistency; - return puBuilders.computeIfAbsent(dk.getKey(), (k) -> new PartitionUpdate.Builder(metadata, dk, updatedColumns, updatedRows)); ++ PartitionUpdate.Builder builder = puBuilders.get(dk.getKey()); ++ if (builder == null) ++ { ++ builder = new PartitionUpdate.Builder(metadata, dk, updatedColumns, perPartitionKeyCounts.count(dk.getKey())); ++ puBuilders.put(dk.getKey(), builder); ++ } ++ return builder; + } + + /** + * Returns a collection containing all the mutations. + * @return a collection containing all the mutations. + */ + public List<IMutation> toMutations() + { - List<IMutation> ms = new ArrayList<>(); ++ List<IMutation> ms = new ArrayList<>(puBuilders.size()); + for (PartitionUpdate.Builder builder : puBuilders.values()) + { + IMutation mutation; + + if (metadata.isVirtual()) + mutation = new VirtualMutation(builder.build()); + else if (metadata.isCounter()) + mutation = new CounterMutation(new Mutation(builder.build()), counterConsistencyLevel); + else + mutation = new Mutation(builder.build()); + + mutation.validateIndexedColumns(); + ms.add(mutation); + } + + return ms; + } +} diff --cc src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index 9a87e7c,86fe990..f67db14 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@@ -74,31 -75,34 +74,32 @@@ public class UpdateStatement extends Mo List<Operation> updates = getRegularOperations(); - // For compact table, when we translate it to thrift, we don't have a row marker. So we don't accept an insert/update - // that only sets the PK unless the is no declared non-PK columns (in the latter we just set the value empty). - - // For a dense layout, when we translate it to thrift, we don't have a row marker. So we don't accept an insert/update - // that only sets the PK unless the is no declared non-PK columns (which we recognize because in that case the compact - // value is of type "EmptyType"). - if ((cfm.isCompactTable() && !cfm.isSuper()) && updates.isEmpty()) + // For compact table, we don't accept an insert/update that only sets the PK unless the is no + // declared non-PK columns (which we recognize because in that case + // the compact value is of type "EmptyType"). + if (metadata().isCompactTable() && updates.isEmpty()) { - checkTrue(CompactTables.hasEmptyCompactValue(cfm), - "Column %s is mandatory for this COMPACT STORAGE table", - cfm.compactValueColumn().name); + TableMetadata.CompactTableMetadata metadata = (TableMetadata.CompactTableMetadata) metadata(); + RequestValidations.checkTrue(metadata.hasEmptyCompactValue(), + "Column %s is mandatory for this COMPACT STORAGE table", + metadata.compactValueColumn); - updates = Collections.<Operation>singletonList(new Constants.Setter(cfm.compactValueColumn(), EMPTY)); + updates = Collections.singletonList(new Constants.Setter(metadata.compactValueColumn, EMPTY)); } - for (Operation op : updates) - op.execute(updateBuilder.partitionKey(), params); + for (int i = 0, isize = updates.size(); i < isize; i++) - updates.get(i).execute(update.partitionKey(), params); ++ updates.get(i).execute(updateBuilder.partitionKey(), params); - update.add(params.buildRow()); + updateBuilder.add(params.buildRow()); } if (updatesStaticRow()) { params.newRow(Clustering.STATIC_CLUSTERING); - for (Operation op : getStaticOperations()) - op.execute(updateBuilder.partitionKey(), params); + List<Operation> staticOps = getStaticOperations(); + for (int i = 0, isize = staticOps.size(); i < isize; i++) - staticOps.get(i).execute(update.partitionKey(), params); - update.add(params.buildRow()); ++ staticOps.get(i).execute(updateBuilder.partitionKey(), params); + updateBuilder.add(params.buildRow()); } } diff --cc src/java/org/apache/cassandra/db/Mutation.java index 0b64620,7f19073..8a1ffc1 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@@ -65,42 -75,59 +65,51 @@@ public class Mutation implements IMutat public Mutation(PartitionUpdate update) { - this(update.metadata().keyspace, update.partitionKey(), ImmutableMap.of(update.metadata().id, update), approxTime.now()); - this(update.metadata().ksName, update.partitionKey(), Collections.singletonMap(update.metadata().cfId, update)); ++ this(update.metadata().keyspace, update.partitionKey(), ImmutableMap.of(update.metadata().id, update), approxTime.now(), update.metadata().params.cdc); } - protected Mutation(String keyspaceName, DecoratedKey key, Map<UUID, PartitionUpdate> modifications) + public Mutation(String keyspaceName, DecoratedKey key, ImmutableMap<TableId, PartitionUpdate> modifications, long approxCreatedAtNanos) { - this(keyspaceName, key, modifications, System.currentTimeMillis()); ++ this(keyspaceName, key, modifications, approxCreatedAtNanos, cdcEnabled(modifications.values())); + } + - private Mutation(String keyspaceName, DecoratedKey key, Map<UUID, PartitionUpdate> modifications, long createdAt) - { - this(keyspaceName, key, modifications, createdAt, cdcEnabled(modifications)); - } - - private Mutation(String keyspaceName, DecoratedKey key, Map<UUID, PartitionUpdate> modifications, long createdAt, boolean cdcEnabled) ++ public Mutation(String keyspaceName, DecoratedKey key, ImmutableMap<TableId, PartitionUpdate> modifications, long approxCreatedAtNanos, boolean cdcEnabled) + { this.keyspaceName = keyspaceName; this.key = key; this.modifications = modifications; + this.cdcEnabled = cdcEnabled; - this.createdAt = createdAt; - } - - private static boolean cdcEnabled(Map<UUID, PartitionUpdate> modifications) - { - boolean cdcEnabled = false; - for (PartitionUpdate pu : modifications.values()) - cdcEnabled |= pu.metadata().params.cdc; - return cdcEnabled; ++ this.approxCreatedAtNanos = approxCreatedAtNanos; + } - public Mutation copy() ++ private static boolean cdcEnabled(Iterable<PartitionUpdate> modifications) + { - return new Mutation(keyspaceName, key, new HashMap<>(modifications)); + boolean cdc = false; - for (PartitionUpdate pu : modifications.values()) ++ for (PartitionUpdate pu : modifications) + cdc |= pu.metadata().params.cdc; - this.cdcEnabled = cdc; - this.approxCreatedAtNanos = approxCreatedAtNanos; ++ return cdc; } - public Mutation without(Set<UUID> cfIds) + public Mutation without(Set<TableId> tableIds) { - if (cfIds.isEmpty()) + if (tableIds.isEmpty()) return this; - Mutation copy = copy(); - copy.modifications.keySet().removeAll(cfIds); - - copy.cdcEnabled = false; - for (PartitionUpdate pu : modifications.values()) - copy.cdcEnabled |= pu.metadata().params.cdc; + ImmutableMap.Builder<TableId, PartitionUpdate> builder = new ImmutableMap.Builder<>(); + for (Map.Entry<TableId, PartitionUpdate> update : modifications.entrySet()) + { + if (!tableIds.contains(update.getKey())) + { + builder.put(update); + } + } - return copy; + return new Mutation(keyspaceName, key, builder.build(), approxCreatedAtNanos); } - public Mutation without(UUID cfId) + public Mutation without(TableId tableId) { - return without(Collections.singleton(cfId)); + return without(Collections.singleton(tableId)); } public String getKeyspaceName() diff --cc test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java index 0000000,2a4e1fb..8638006 mode 000000,100644..100644 --- a/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java +++ b/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java @@@ -1,0 -1,147 +1,149 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.test.microbench; + + import java.nio.ByteBuffer; + import java.util.ArrayList; + import java.util.Collection; + import java.util.List; + import java.util.concurrent.TimeUnit; + + import com.google.common.collect.Lists; + -import org.apache.cassandra.config.CFMetaData; + import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; + import org.apache.cassandra.cql3.Attributes; + import org.apache.cassandra.cql3.BatchQueryOptions; ++import org.apache.cassandra.cql3.QueryHandler; + import org.apache.cassandra.cql3.QueryOptions; + import org.apache.cassandra.cql3.QueryProcessor; ++import org.apache.cassandra.cql3.VariableSpecifications; + import org.apache.cassandra.cql3.statements.BatchStatement; + import org.apache.cassandra.cql3.statements.ModificationStatement; -import org.apache.cassandra.cql3.statements.ParsedStatement; ++import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; + import org.apache.cassandra.dht.Murmur3Partitioner; + import org.apache.cassandra.schema.KeyspaceMetadata; + import org.apache.cassandra.schema.KeyspaceParams; ++import org.apache.cassandra.schema.Schema; ++import org.apache.cassandra.schema.TableMetadata; + import org.apache.cassandra.service.QueryState; + import org.apache.cassandra.utils.FBUtilities; + import org.openjdk.jmh.annotations.Benchmark; + import org.openjdk.jmh.annotations.BenchmarkMode; + import org.openjdk.jmh.annotations.Fork; + import org.openjdk.jmh.annotations.Measurement; + import org.openjdk.jmh.annotations.Mode; + import org.openjdk.jmh.annotations.OutputTimeUnit; + import org.openjdk.jmh.annotations.Param; + import org.openjdk.jmh.annotations.Scope; + import org.openjdk.jmh.annotations.Setup; + import org.openjdk.jmh.annotations.State; + import org.openjdk.jmh.annotations.Threads; + import org.openjdk.jmh.annotations.Warmup; + import org.openjdk.jmh.profile.GCProfiler; + import org.openjdk.jmh.results.Result; + import org.openjdk.jmh.results.RunResult; + import org.openjdk.jmh.runner.Runner; + import org.openjdk.jmh.runner.options.Options; + import org.openjdk.jmh.runner.options.OptionsBuilder; + + import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + + + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) + @Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS) + @Fork(value = 1,jvmArgsAppend = "-Xmx512M") + @Threads(1) + @State(Scope.Benchmark) + public class BatchStatementBench + { + static + { - DatabaseDescriptor.clientInitialization(); ++ ++ DatabaseDescriptor.toolInitialization(); + // Partitioner is not set in client mode. + if (DatabaseDescriptor.getPartitioner() == null) + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + } + + static String keyspace = "keyspace1"; + String table = "tbl"; + + int nowInSec = FBUtilities.nowInSeconds(); + long queryStartTime = System.nanoTime(); + BatchStatement bs; + BatchQueryOptions bqo; + + @Param({"true", "false"}) + boolean uniquePartition; + + @Param({"10000"}) + int batchSize; + + @Setup + public void setup() throws Throwable + { + Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1))); - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace); - CFMetaData metadata = CFMetaData.compile(String.format("CREATE TABLE %s (id int, ck int, v int, primary key (id, ck))", table), keyspace); ++ KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspace); ++ TableMetadata metadata = CreateTableStatement.parse(String.format("CREATE TABLE %s (id int, ck int, v int, primary key (id, ck))", table), keyspace).build(); + - Schema.instance.load(metadata); - Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(metadata))); ++ Schema.instance.load(ksm.withSwapped(ksm.tables.with(metadata))); + + List<ModificationStatement> modifications = new ArrayList<>(batchSize); + List<List<ByteBuffer>> parameters = new ArrayList<>(batchSize); + List<Object> queryOrIdList = new ArrayList<>(batchSize); - ParsedStatement.Prepared prepared = QueryProcessor.parseStatement(String.format("INSERT INTO %s.%s (id, ck, v) VALUES (?,?,?)", keyspace, table), QueryState.forInternalCalls()); ++ QueryHandler.Prepared prepared = QueryProcessor.prepareInternal(String.format("INSERT INTO %s.%s (id, ck, v) VALUES (?,?,?)", keyspace, table)); + + for (int i = 0; i < batchSize; i++) + { + modifications.add((ModificationStatement) prepared.statement); + parameters.add(Lists.newArrayList(bytes(uniquePartition ? i : 1), bytes(i), bytes(i))); + queryOrIdList.add(prepared.rawCQLStatement); + } - bs = new BatchStatement(3, BatchStatement.Type.UNLOGGED, modifications, Attributes.none()); ++ bs = new BatchStatement(BatchStatement.Type.UNLOGGED, VariableSpecifications.empty(), modifications, Attributes.none()); + bqo = BatchQueryOptions.withPerStatementVariables(QueryOptions.DEFAULT, parameters, queryOrIdList); + } + + @Benchmark + public void bench() + { - bs.getMutations(bqo, false, nowInSec, queryStartTime); ++ bs.getMutations(bqo, false, nowInSec, nowInSec, queryStartTime); + } + + + public static void main(String... args) throws Exception { + Options opts = new OptionsBuilder() + .include(".*"+BatchStatementBench.class.getSimpleName()+".*") + .jvmArgs("-server") + .forks(1) + .mode(Mode.Throughput) + .addProfiler(GCProfiler.class) + .build(); + + Collection<RunResult> records = new Runner(opts).run(); + for ( RunResult result : records) { + Result r = result.getPrimaryResult(); + System.out.println("API replied benchmark score: " + + r.getScore() + " " + + r.getScoreUnit() + " over " + + r.getStatistics().getN() + " iterations"); + } + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org