This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 8ae2b094654aac6a2f2d8a79a1aa08a7215c8621
Merge: 7f1659c 8333d0b
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Fri Jan 22 10:56:20 2021 +0100

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 .../PartitionKeySingleRestrictionSet.java          |   7 +-
 .../cql3/restrictions/RestrictionSet.java          |  66 +++++----
 .../cassandra/cql3/statements/BatchStatement.java  |  48 ++++---
 .../cql3/statements/BatchUpdatesCollector.java     |  50 +++++--
 .../cql3/statements/ModificationStatement.java     |  17 ++-
 .../statements/SingleTableUpdatesCollector.java    |  26 ++--
 .../cassandra/cql3/statements/UpdateStatement.java |   9 +-
 src/java/org/apache/cassandra/db/Mutation.java     |  17 ++-
 .../test/microbench/BatchStatementBench.java       | 149 +++++++++++++++++++++
 10 files changed, 302 insertions(+), 88 deletions(-)

diff --cc CHANGES.txt
index cb51088,29d6b9f..a946f1d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,9 +1,17 @@@
 -3.11.10
 +4.0-beta5
 + * Upgrade netty and chronicle-queue dependencies to get Auditing and native 
library loading working on arm64 architectures (CASSANDRA-16384,CASSANDRA-16392)
 + * Release StreamingTombstoneHistogramBuilder spool when switching writers 
(CASSANDRA-14834)
 + * Correct memtable on-heap size calculations to match actual use 
(CASSANDRA-16318)
 + * Fix client notifications in CQL protocol v5 (CASSANDRA-16353)
 + * Too defensive check when picking sstables for preview repair 
(CASSANDRA-16284)
 + * Ensure pre-negotiation native protocol responses have correct stream id 
(CASSANDRA-16376)
 + * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279)
 + * SSLFactory should initialize SSLContext before setting protocols 
(CASSANDRA-16362)
 + * Restore sasi dependencies jflex, snowball-stemmer, and concurrent-trees, 
in the cassandra-all pom (CASSANDRA-16303)
 + * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925)
 +Merged from 3.11:
+  * Reduce amount of allocations during batch statement execution 
(CASSANDRA-16201)
   * Update jflex-1.6.0.jar to match upstream (CASSANDRA-16393)
 - * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925)
 - * Rate limit validation compactions using compaction_throughput_mb_per_sec 
(CASSANDRA-16161)
 - * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to 
default of 1GB (CASSANDRA-16071)
  Merged from 3.0:
   * Prevent unbounded number of pending flushing tasks (CASSANDRA-16261)
   * Improve empty hint file handling during startup (CASSANDRA-16162)
diff --cc src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
index 427c396,2bbda38..7a5d5b9
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
@@@ -51,6 -49,8 +51,8 @@@ final class RestrictionSet implements R
          }
      };
  
 -    private static final TreeMap<ColumnDefinition, SingleRestriction> EMPTY = 
new TreeMap<>(COLUMN_DEFINITION_COMPARATOR);
++    private static final TreeMap<ColumnMetadata, SingleRestriction> EMPTY = 
new TreeMap<>(COLUMN_DEFINITION_COMPARATOR);
+ 
      /**
       * The restrictions per column.
       */
@@@ -61,13 -61,26 +63,26 @@@
       */
      private final boolean hasMultiColumnRestrictions;
  
+     private final boolean hasIn;
+     private final boolean hasContains;
+     private final boolean hasSlice;
+     private final boolean hasOnlyEqualityRestrictions;
+ 
      public RestrictionSet()
      {
-         this(new TreeMap<ColumnMetadata, 
SingleRestriction>(COLUMN_DEFINITION_COMPARATOR), false);
+         this(EMPTY, false,
+              false,
+              false,
+              false,
+              true);
      }
  
 -    private RestrictionSet(TreeMap<ColumnDefinition, SingleRestriction> 
restrictions,
 +    private RestrictionSet(TreeMap<ColumnMetadata, SingleRestriction> 
restrictions,
-                            boolean hasMultiColumnRestrictions)
+                            boolean hasMultiColumnRestrictions,
+                            boolean hasIn,
+                            boolean hasContains,
+                            boolean hasSlice,
+                            boolean hasOnlyEqualityRestrictions)
      {
          this.restrictions = restrictions;
          this.hasMultiColumnRestrictions = hasMultiColumnRestrictions;
@@@ -129,14 -146,25 +148,25 @@@
      public RestrictionSet addRestriction(SingleRestriction restriction)
      {
          // RestrictionSet is immutable so we need to clone the restrictions 
map.
-         TreeMap<ColumnMetadata, SingleRestriction> newRestrictions = new 
TreeMap<>(this.restrictions);
-         return new RestrictionSet(mergeRestrictions(newRestrictions, 
restriction), hasMultiColumnRestrictions || restriction.isMultiColumn());
 -        TreeMap<ColumnDefinition, SingleRestriction> newRestricitons = new 
TreeMap<>(this.restrictions);
++        TreeMap<ColumnMetadata, SingleRestriction> newRestricitons = new 
TreeMap<>(this.restrictions);
+ 
+         boolean newHasIn = hasIn || restriction.isIN();
+         boolean newHasContains = hasContains || restriction.isContains();
+         boolean newHasSlice = hasSlice || restriction.isSlice();
+         boolean newHasOnlyEqualityRestrictions = hasOnlyEqualityRestrictions 
&& (restriction.isEQ() || restriction.isIN());
+ 
+         return new RestrictionSet(mergeRestrictions(newRestricitons, 
restriction),
+                                   hasMultiColumnRestrictions || 
restriction.isMultiColumn(),
+                                   newHasIn,
+                                   newHasContains,
+                                   newHasSlice,
+                                   newHasOnlyEqualityRestrictions);
      }
  
 -    private TreeMap<ColumnDefinition, SingleRestriction> 
mergeRestrictions(TreeMap<ColumnDefinition, SingleRestriction> restrictions,
 -                                                                           
SingleRestriction restriction)
 +    private TreeMap<ColumnMetadata, SingleRestriction> 
mergeRestrictions(TreeMap<ColumnMetadata, SingleRestriction> restrictions,
 +                                                                         
SingleRestriction restriction)
      {
 -        Collection<ColumnDefinition> columnDefs = restriction.getColumnDefs();
 +        Collection<ColumnMetadata> columnDefs = restriction.getColumnDefs();
          Set<SingleRestriction> existingRestrictions = 
getRestrictions(columnDefs);
  
          if (existingRestrictions.isEmpty())
diff --cc src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 2cf2660,caf8c97..80bd437
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@@ -261,26 -219,36 +263,49 @@@ public class BatchStatement implements 
          return statements;
      }
  
-     private List<? extends IMutation> getMutations(BatchQueryOptions options,
-                                                          boolean local,
-                                                          long batchTimestamp,
-                                                          int nowInSeconds,
-                                                          long 
queryStartNanoTime)
+     @VisibleForTesting
 -    public Collection<? extends IMutation> getMutations(BatchQueryOptions 
options, boolean local, long now, long queryStartNanoTime)
 -    throws RequestExecutionException, RequestValidationException
++    public List<? extends IMutation> getMutations(BatchQueryOptions options,
++                                                  boolean local,
++                                                  long batchTimestamp,
++                                                  int nowInSeconds,
++                                                  long queryStartNanoTime)
      {
 -        Set<String> tablesWithZeroGcGs = null;
++        if (statements.isEmpty())
++            return Collections.emptyList();
+         List<List<ByteBuffer>> partitionKeys = new 
ArrayList<>(statements.size());
 -        Map<UUID, HashMultiset<ByteBuffer>> partitionCounts = 
Maps.newHashMapWithExpectedSize(updatedColumns.size());
++        Map<TableId, HashMultiset<ByteBuffer>> partitionCounts = new 
HashMap<>(updatedColumns.size());
++        TableMetadata metadata = statements.get(0).metadata;
+         for (int i = 0, isize = statements.size(); i < isize; i++)
+         {
+             ModificationStatement stmt = statements.get(i);
++            if (metadata != null && !stmt.metadata.id.equals(metadata.id))
++                metadata = null;
+             List<ByteBuffer> stmtPartitionKeys = 
stmt.buildPartitionKeyNames(options.forStatement(i));
+             partitionKeys.add(stmtPartitionKeys);
 -            HashMultiset<ByteBuffer> perKeyCountsForTable = 
partitionCounts.computeIfAbsent(stmt.cfm.cfId, k -> HashMultiset.create());
++            HashMultiset<ByteBuffer> perKeyCountsForTable = 
partitionCounts.computeIfAbsent(stmt.metadata.id, k -> HashMultiset.create());
+             for (int stmtIdx = 0, stmtSize = stmtPartitionKeys.size(); 
stmtIdx < stmtSize; stmtIdx++)
+                 perKeyCountsForTable.add(stmtPartitionKeys.get(stmtIdx));
+         }
+ 
 -        UpdatesCollector collector = new UpdatesCollector(updatedColumns, 
partitionCounts);
 -        for (int i = 0; i < statements.size(); i++)
 +        Set<String> tablesWithZeroGcGs = null;
-         BatchUpdatesCollector collector = new 
BatchUpdatesCollector(updatedColumns, updatedRows());
-         for (int i = 0; i < statements.size(); i++)
++        UpdatesCollector collector;
++        if (metadata != null)
++            collector = new SingleTableUpdatesCollector(metadata, 
updatedColumns.get(metadata.id), partitionCounts.get(metadata.id));
++        else
++            collector = new BatchUpdatesCollector(updatedColumns, 
partitionCounts);
++
++        for (int i = 0, isize = statements.size(); i < isize; i++)
          {
              ModificationStatement statement = statements.get(i);
 -            if (isLogged() && statement.cfm.params.gcGraceSeconds == 0)
 +            if (isLogged() && statement.metadata().params.gcGraceSeconds == 0)
              {
                  if (tablesWithZeroGcGs == null)
                      tablesWithZeroGcGs = new HashSet<>();
 -                tablesWithZeroGcGs.add(String.format("%s.%s", 
statement.cfm.ksName, statement.cfm.cfName));
 +                tablesWithZeroGcGs.add(statement.metadata.toString());
              }
              QueryOptions statementOptions = options.forStatement(i);
 -            long timestamp = attrs.getTimestamp(now, statementOptions);
 -            statement.addUpdates(collector, partitionKeys.get(i), 
statementOptions, local, timestamp, queryStartNanoTime);
 +            long timestamp = attrs.getTimestamp(batchTimestamp, 
statementOptions);
-             statement.addUpdates(collector, statementOptions, local, 
timestamp, nowInSeconds, queryStartNanoTime);
++            statement.addUpdates(collector, partitionKeys.get(i), 
statementOptions, local, timestamp, nowInSeconds, queryStartNanoTime);
          }
  
          if (tablesWithZeroGcGs != null)
diff --cc 
src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
index 8f70ffc,0000000..cb88bdd
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
@@@ -1,273 -1,0 +1,297 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.statements;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
++import com.google.common.collect.HashMultiset;
 +import com.google.common.collect.ImmutableMap;
++import com.google.common.collect.Maps;
 +
 +import org.apache.cassandra.db.virtual.VirtualMutation;
 +import org.apache.cassandra.schema.TableId;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +
++import static org.apache.cassandra.utils.MonotonicClock.approxTime;
++
 +/**
 + * Utility class to collect updates.
 + *
 + * <p>In a batch statement we don't want to recreate mutations every time as 
this is particularly inefficient when
 + * applying multiple batch to the same partition (see #6737). </p>
 + *
 + */
 +final class BatchUpdatesCollector implements UpdatesCollector
 +{
 +    /**
 +     * The columns that will be updated for each table (keyed by the table 
ID).
 +     */
 +    private final Map<TableId, RegularAndStaticColumns> updatedColumns;
 +
 +    /**
-      * The estimated number of updated row.
++     * The number of updated rows per table and key.
 +     */
-     private final int updatedRows;
++    private final Map<TableId, HashMultiset<ByteBuffer>> 
perPartitionKeyCounts;
 +
 +    /**
 +     * The mutations per keyspace.
++     *
++     * optimised for the common single-keyspace case
++     *
++     * Key is keyspace name, then we have an IMutationBuilder for each 
touched partition key in that keyspace
++     *
++     * MutationBuilder holds a PartitionUpdate.Builder
 +     */
-     private final Map<String, Map<ByteBuffer, IMutationBuilder>> 
mutationBuilders = new HashMap<>();
++    private final Map<String, Map<ByteBuffer, IMutationBuilder>> 
mutationBuilders = Maps.newHashMapWithExpectedSize(1);
++
 +
-     BatchUpdatesCollector(Map<TableId, RegularAndStaticColumns> 
updatedColumns, int updatedRows)
++    BatchUpdatesCollector(Map<TableId, RegularAndStaticColumns> 
updatedColumns, Map<TableId, HashMultiset<ByteBuffer>> perPartitionKeyCounts)
 +    {
 +        super();
 +        this.updatedColumns = updatedColumns;
-         this.updatedRows = updatedRows;
++        this.perPartitionKeyCounts = perPartitionKeyCounts;
 +    }
 +
 +    /**
 +     * Gets the <code>PartitionUpdate.Builder</code> for the specified column 
family and key. If the builder does not
 +     * exist it will be created.
 +     *
 +     * @param metadata the column family meta data
 +     * @param dk the partition key
 +     * @param consistency the consistency level
 +     * @return the <code>PartitionUpdate.Builder</code> for the specified 
column family and key
 +     */
 +    public PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata 
metadata, DecoratedKey dk, ConsistencyLevel consistency)
 +    {
 +        IMutationBuilder mut = getMutationBuilder(metadata, dk, consistency);
 +        PartitionUpdate.Builder upd = mut.get(metadata.id);
 +        if (upd == null)
 +        {
 +            RegularAndStaticColumns columns = updatedColumns.get(metadata.id);
 +            assert columns != null;
-             upd = new PartitionUpdate.Builder(metadata, dk, columns, 
updatedRows);
++            upd = new PartitionUpdate.Builder(metadata, dk, columns, 
perPartitionKeyCounts.get(metadata.id).count(dk.getKey()));
 +            mut.add(upd);
 +        }
 +        return upd;
 +    }
 +
 +    private IMutationBuilder getMutationBuilder(TableMetadata metadata, 
DecoratedKey dk, ConsistencyLevel consistency)
 +    {
-         return keyspaceMap(metadata.keyspace).computeIfAbsent(dk.getKey(), k 
-> makeMutationBuilder(metadata, dk, consistency));
++        Map<ByteBuffer, IMutationBuilder> ksMap = 
keyspaceMap(metadata.keyspace);
++        IMutationBuilder mutationBuilder = ksMap.get(dk.getKey());
++        if (mutationBuilder == null)
++        {
++            mutationBuilder = makeMutationBuilder(metadata, dk, consistency);
++            ksMap.put(dk.getKey(), mutationBuilder);
++        }
++        return mutationBuilder;
 +    }
 +
 +    private IMutationBuilder makeMutationBuilder(TableMetadata metadata, 
DecoratedKey partitionKey, ConsistencyLevel cl)
 +    {
 +        if (metadata.isVirtual())
 +        {
 +            return new VirtualMutationBuilder(metadata.keyspace, 
partitionKey);
 +        }
 +        else
 +        {
-             MutationBuilder builder = new MutationBuilder(metadata.keyspace, 
partitionKey);
++            MutationBuilder builder = new MutationBuilder(metadata.keyspace, 
partitionKey, 1);
 +            return metadata.isCounter() ? new CounterMutationBuilder(builder, 
cl) : builder;
 +        }
 +    }
 +
 +    /**
 +     * Returns a collection containing all the mutations.
 +     * @return a collection containing all the mutations.
 +     */
 +    public List<IMutation> toMutations()
 +    {
-         //TODO: The case where all statement where on the same keyspace is 
pretty common, optimize for that?
 +        List<IMutation> ms = new ArrayList<>();
 +        for (Map<ByteBuffer, IMutationBuilder> ksMap : 
mutationBuilders.values())
 +        {
 +            for (IMutationBuilder builder : ksMap.values())
 +            {
 +                IMutation mutation = builder.build();
 +                mutation.validateIndexedColumns();
 +                ms.add(mutation);
 +            }
 +        }
 +        return ms;
 +    }
 +
 +    /**
 +     * Returns the key-mutation mappings for the specified keyspace.
 +     *
 +     * @param ksName the keyspace name
 +     * @return the key-mutation mappings for the specified keyspace.
 +     */
 +    private Map<ByteBuffer, IMutationBuilder> keyspaceMap(String ksName)
 +    {
-         return mutationBuilders.computeIfAbsent(ksName, k -> new HashMap<>());
++        Map<ByteBuffer, IMutationBuilder> ksMap = 
mutationBuilders.get(ksName);
++        if (ksMap == null)
++        {
++            ksMap = Maps.newHashMapWithExpectedSize(1);
++            mutationBuilders.put(ksName, ksMap);
++        }
++        return ksMap;
 +    }
 +
 +    private interface IMutationBuilder
 +    {
 +        /**
 +         * Add a new PartitionUpdate builder to this mutation builder
 +         * @param builder the builder to add
 +         * @return this
 +         */
 +        IMutationBuilder add(PartitionUpdate.Builder builder);
 +
 +        /**
 +         * Build the immutable mutation
 +         */
 +        IMutation build();
 +
 +        /**
 +         * Get the builder for the given tableId
 +         */
 +        PartitionUpdate.Builder get(TableId tableId);
 +    }
 +
 +    private static class MutationBuilder implements IMutationBuilder
 +    {
-         private final HashMap<TableId, PartitionUpdate.Builder> modifications 
= new HashMap<>();
++        private final Map<TableId, PartitionUpdate.Builder> modifications;
 +        private final DecoratedKey key;
 +        private final String keyspaceName;
-         private final long createdAt = System.currentTimeMillis();
++        private final long createdAt = approxTime.now();
 +
-         private MutationBuilder(String keyspaceName, DecoratedKey key)
++        private MutationBuilder(String keyspaceName, DecoratedKey key, int 
initialSize)
 +        {
 +            this.keyspaceName = keyspaceName;
 +            this.key = key;
++            this.modifications = Maps.newHashMapWithExpectedSize(initialSize);
 +        }
 +
 +        public MutationBuilder add(PartitionUpdate.Builder updateBuilder)
 +        {
 +            assert updateBuilder != null;
 +            assert updateBuilder.partitionKey().getPartitioner() == 
key.getPartitioner();
 +            PartitionUpdate.Builder prev = 
modifications.put(updateBuilder.metadata().id, updateBuilder);
 +            if (prev != null)
 +                // developer error
 +                throw new IllegalArgumentException("Table " + 
updateBuilder.metadata().name + " already has modifications in this mutation: " 
+ prev);
 +            return this;
 +        }
 +
 +        public Mutation build()
 +        {
 +            ImmutableMap.Builder<TableId, PartitionUpdate> updates = new 
ImmutableMap.Builder<>();
 +            for (Map.Entry<TableId, PartitionUpdate.Builder> updateEntry : 
modifications.entrySet())
 +            {
 +                PartitionUpdate update = updateEntry.getValue().build();
 +                updates.put(updateEntry.getKey(), update);
 +            }
 +            return new Mutation(keyspaceName, key, updates.build(), 
createdAt);
 +        }
 +
 +        public PartitionUpdate.Builder get(TableId tableId)
 +        {
 +            return modifications.get(tableId);
 +        }
 +
 +        public DecoratedKey key()
 +        {
 +            return key;
 +        }
 +
 +        public boolean isEmpty()
 +        {
 +            return modifications.isEmpty();
 +        }
 +
 +        public String getKeyspaceName()
 +        {
 +            return keyspaceName;
 +        }
 +    }
 +
 +    private static class CounterMutationBuilder implements IMutationBuilder
 +    {
 +        private final MutationBuilder mutationBuilder;
 +        private final ConsistencyLevel cl;
 +
 +        private CounterMutationBuilder(MutationBuilder mutationBuilder, 
ConsistencyLevel cl)
 +        {
 +            this.mutationBuilder = mutationBuilder;
 +            this.cl = cl;
 +        }
 +
 +        public IMutationBuilder add(PartitionUpdate.Builder builder)
 +        {
 +            return mutationBuilder.add(builder);
 +        }
 +
 +        public IMutation build()
 +        {
 +            return new CounterMutation(mutationBuilder.build(), cl);
 +        }
 +
 +        public PartitionUpdate.Builder get(TableId id)
 +        {
 +            return mutationBuilder.get(id);
 +        }
 +    }
 +
 +    private static class VirtualMutationBuilder implements IMutationBuilder
 +    {
 +        private final String keyspaceName;
 +        private final DecoratedKey partitionKey;
 +
 +        private final HashMap<TableId, PartitionUpdate.Builder> modifications 
= new HashMap<>();
 +
 +        private VirtualMutationBuilder(String keyspaceName, DecoratedKey 
partitionKey)
 +        {
 +            this.keyspaceName = keyspaceName;
 +            this.partitionKey = partitionKey;
 +        }
 +
 +        @Override
 +        public VirtualMutationBuilder add(PartitionUpdate.Builder builder)
 +        {
 +            PartitionUpdate.Builder prev = 
modifications.put(builder.metadata().id, builder);
 +            if (null != prev)
 +                throw new IllegalStateException();
 +            return this;
 +        }
 +
 +        @Override
 +        public VirtualMutation build()
 +        {
 +            ImmutableMap.Builder<TableId, PartitionUpdate> updates = new 
ImmutableMap.Builder<>();
 +            modifications.forEach((tableId, updateBuilder) -> 
updates.put(tableId, updateBuilder.build()));
 +            return new VirtualMutation(keyspaceName, partitionKey, 
updates.build());
 +        }
 +
 +        @Override
 +        public PartitionUpdate.Builder get(TableId tableId)
 +        {
 +            return modifications.get(tableId);
 +        }
 +    }
 +}
diff --cc 
src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 49a3f3c,d2e693a..0ba105c
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@@ -20,8 -20,8 +20,10 @@@ package org.apache.cassandra.cql3.state
  import java.nio.ByteBuffer;
  import java.util.*;
  
+ import com.google.common.collect.HashMultiset;
 +import com.google.common.collect.ImmutableList;
  import com.google.common.collect.Iterables;
++import com.google.common.collect.Maps;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -364,12 -335,7 +366,8 @@@ public abstract class ModificationState
  
      public boolean requiresRead()
      {
 +        // Lists SET operation incurs a read.
-         for (Operation op : allOperations())
-             if (op.requiresRead())
-                 return true;
- 
-         return false;
+         return !requiresRead.isEmpty();
      }
  
      private Map<DecoratedKey, Partition> 
readRequiredLists(Collection<ByteBuffer> partitionKeys,
@@@ -678,26 -625,28 +676,27 @@@
       *
       * @return list of the mutations
       */
 -    private Collection<? extends IMutation> getMutations(QueryOptions 
options, boolean local, long now, long queryStartNanoTime)
 +    private List<? extends IMutation> getMutations(QueryOptions options,
 +                                                         boolean local,
 +                                                         long timestamp,
 +                                                         int nowInSeconds,
 +                                                         long 
queryStartNanoTime)
      {
-         UpdatesCollector collector = new 
SingleTableUpdatesCollector(metadata, updatedColumns, 1);
-         addUpdates(collector, options, local, timestamp, nowInSeconds, 
queryStartNanoTime);
+         List<ByteBuffer> keys = buildPartitionKeyNames(options);
 -        HashMultiset<ByteBuffer> perPartitionKeyCounts = 
HashMultiset.create();
 -        for (int i = 0; i < keys.size(); i++)
 -            perPartitionKeyCounts.add(keys.get(i)); // avoid .addAll since 
that allocates an iterator
 -
 -        UpdatesCollector collector = new 
UpdatesCollector(Collections.singletonMap(cfm.cfId, updatedColumns), 
Collections.singletonMap(cfm.cfId, perPartitionKeyCounts));
 -        addUpdates(collector, keys, options, local, now, queryStartNanoTime);
 -        collector.validateIndexedColumns();
 -
++        HashMultiset<ByteBuffer> perPartitionKeyCounts = 
HashMultiset.create(keys);
++        SingleTableUpdatesCollector collector = new 
SingleTableUpdatesCollector(metadata, updatedColumns, perPartitionKeyCounts);
++        addUpdates(collector, keys, options, local, timestamp, nowInSeconds, 
queryStartNanoTime);
          return collector.toMutations();
      }
  
      final void addUpdates(UpdatesCollector collector,
+                           List<ByteBuffer> keys,
                            QueryOptions options,
                            boolean local,
 -                          long now,
 +                          long timestamp,
 +                          int nowInSeconds,
                            long queryStartNanoTime)
      {
-         List<ByteBuffer> keys = buildPartitionKeyNames(options);
--
          if (hasSlices())
          {
              Slices slices = createSlices(options);
diff --cc 
src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
index 6ef551d,0000000..6dc2d41
mode 100644,000000..100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
@@@ -1,105 -1,0 +1,113 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.statements;
 +
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
- import java.util.Collection;
- import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +
++import com.google.common.collect.HashMultiset;
++import com.google.common.collect.Maps;
++
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.CounterMutation;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.IMutation;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.RegularAndStaticColumns;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.virtual.VirtualMutation;
 +import org.apache.cassandra.schema.TableMetadata;
 +
 +/**
 + * Utility class to collect updates.
 + */
 +final class SingleTableUpdatesCollector implements UpdatesCollector
 +{
 +    /**
 +     * the table to be updated
 +     */
 +    private final TableMetadata metadata;
 +
 +    /**
 +     * the columns to update
 +     */
 +    private final RegularAndStaticColumns updatedColumns;
 +
 +    /**
-      * The estimated number of updated row.
++     * The number of updated rows per key.
 +     */
-     private final int updatedRows;
++    private final HashMultiset<ByteBuffer> perPartitionKeyCounts;
 +
 +    /**
 +     * the partition update builders per key
 +     */
-     private final Map<ByteBuffer, PartitionUpdate.Builder> puBuilders = new 
HashMap<>();
++    private final Map<ByteBuffer, PartitionUpdate.Builder> puBuilders;
 +
 +    /**
 +     * if it is a counter table, we will set this
 +     */
 +    private ConsistencyLevel counterConsistencyLevel = null;
 +
-     SingleTableUpdatesCollector(TableMetadata metadata, 
RegularAndStaticColumns updatedColumns, int updatedRows)
++    SingleTableUpdatesCollector(TableMetadata metadata, 
RegularAndStaticColumns updatedColumns, HashMultiset<ByteBuffer> 
perPartitionKeyCounts)
 +    {
 +        this.metadata = metadata;
 +        this.updatedColumns = updatedColumns;
-         this.updatedRows = updatedRows;
++        this.perPartitionKeyCounts = perPartitionKeyCounts;
++        this.puBuilders = 
Maps.newHashMapWithExpectedSize(perPartitionKeyCounts.size());
 +    }
 +
 +    public PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata 
metadata, DecoratedKey dk, ConsistencyLevel consistency)
 +    {
 +        if (metadata.isCounter())
 +            counterConsistencyLevel = consistency;
-         return puBuilders.computeIfAbsent(dk.getKey(), (k) -> new 
PartitionUpdate.Builder(metadata, dk, updatedColumns, updatedRows));
++        PartitionUpdate.Builder builder = puBuilders.get(dk.getKey());
++        if (builder == null)
++        {
++            builder = new PartitionUpdate.Builder(metadata, dk, 
updatedColumns, perPartitionKeyCounts.count(dk.getKey()));
++            puBuilders.put(dk.getKey(), builder);
++        }
++        return builder;
 +    }
 +
 +    /**
 +     * Returns a collection containing all the mutations.
 +     * @return a collection containing all the mutations.
 +     */
 +    public List<IMutation> toMutations()
 +    {
-         List<IMutation> ms = new ArrayList<>();
++        List<IMutation> ms = new ArrayList<>(puBuilders.size());
 +        for (PartitionUpdate.Builder builder : puBuilders.values())
 +        {
 +            IMutation mutation;
 +
 +            if (metadata.isVirtual())
 +                mutation = new VirtualMutation(builder.build());
 +            else if (metadata.isCounter())
 +                mutation = new CounterMutation(new Mutation(builder.build()), 
counterConsistencyLevel);
 +            else
 +                mutation = new Mutation(builder.build());
 +
 +            mutation.validateIndexedColumns();
 +            ms.add(mutation);
 +        }
 +
 +        return ms;
 +    }
 +}
diff --cc src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 9a87e7c,86fe990..f67db14
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@@ -74,31 -75,34 +74,32 @@@ public class UpdateStatement extends Mo
  
              List<Operation> updates = getRegularOperations();
  
 -            // For compact table, when we translate it to thrift, we don't 
have a row marker. So we don't accept an insert/update
 -            // that only sets the PK unless the is no declared non-PK columns 
(in the latter we just set the value empty).
 -
 -            // For a dense layout, when we translate it to thrift, we don't 
have a row marker. So we don't accept an insert/update
 -            // that only sets the PK unless the is no declared non-PK columns 
(which we recognize because in that case the compact
 -            // value is of type "EmptyType").
 -            if ((cfm.isCompactTable() && !cfm.isSuper()) && updates.isEmpty())
 +            // For compact table, we don't accept an insert/update that only 
sets the PK unless the is no
 +            // declared non-PK columns (which we recognize because in that 
case
 +            // the compact value is of type "EmptyType").
 +            if (metadata().isCompactTable() && updates.isEmpty())
              {
 -                checkTrue(CompactTables.hasEmptyCompactValue(cfm),
 -                          "Column %s is mandatory for this COMPACT STORAGE 
table",
 -                          cfm.compactValueColumn().name);
 +                TableMetadata.CompactTableMetadata metadata = 
(TableMetadata.CompactTableMetadata) metadata();
 +                RequestValidations.checkTrue(metadata.hasEmptyCompactValue(),
 +                                             "Column %s is mandatory for this 
COMPACT STORAGE table",
 +                                             metadata.compactValueColumn);
  
 -                updates = Collections.<Operation>singletonList(new 
Constants.Setter(cfm.compactValueColumn(), EMPTY));
 +                updates = Collections.singletonList(new 
Constants.Setter(metadata.compactValueColumn, EMPTY));
              }
  
-             for (Operation op : updates)
-                 op.execute(updateBuilder.partitionKey(), params);
+             for (int i = 0, isize = updates.size(); i < isize; i++)
 -                updates.get(i).execute(update.partitionKey(), params);
++                updates.get(i).execute(updateBuilder.partitionKey(), params);
  
 -            update.add(params.buildRow());
 +            updateBuilder.add(params.buildRow());
          }
  
          if (updatesStaticRow())
          {
              params.newRow(Clustering.STATIC_CLUSTERING);
-             for (Operation op : getStaticOperations())
-                 op.execute(updateBuilder.partitionKey(), params);
+             List<Operation> staticOps = getStaticOperations();
+             for (int i = 0, isize = staticOps.size(); i < isize; i++)
 -                staticOps.get(i).execute(update.partitionKey(), params);
 -            update.add(params.buildRow());
++                staticOps.get(i).execute(updateBuilder.partitionKey(), 
params);
 +            updateBuilder.add(params.buildRow());
          }
      }
  
diff --cc src/java/org/apache/cassandra/db/Mutation.java
index 0b64620,7f19073..8a1ffc1
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@@ -65,42 -75,59 +65,51 @@@ public class Mutation implements IMutat
  
      public Mutation(PartitionUpdate update)
      {
-         this(update.metadata().keyspace, update.partitionKey(), 
ImmutableMap.of(update.metadata().id, update), approxTime.now());
 -        this(update.metadata().ksName, update.partitionKey(), 
Collections.singletonMap(update.metadata().cfId, update));
++        this(update.metadata().keyspace, update.partitionKey(), 
ImmutableMap.of(update.metadata().id, update), approxTime.now(), 
update.metadata().params.cdc);
      }
  
 -    protected Mutation(String keyspaceName, DecoratedKey key, Map<UUID, 
PartitionUpdate> modifications)
 +    public Mutation(String keyspaceName, DecoratedKey key, 
ImmutableMap<TableId, PartitionUpdate> modifications, long approxCreatedAtNanos)
      {
 -        this(keyspaceName, key, modifications, System.currentTimeMillis());
++        this(keyspaceName, key, modifications, approxCreatedAtNanos, 
cdcEnabled(modifications.values()));
+     }
+ 
 -    private Mutation(String keyspaceName, DecoratedKey key, Map<UUID, 
PartitionUpdate> modifications, long createdAt)
 -    {
 -        this(keyspaceName, key, modifications, createdAt, 
cdcEnabled(modifications));
 -    }
 -
 -    private Mutation(String keyspaceName, DecoratedKey key, Map<UUID, 
PartitionUpdate> modifications, long createdAt, boolean cdcEnabled)
++    public Mutation(String keyspaceName, DecoratedKey key, 
ImmutableMap<TableId, PartitionUpdate> modifications, long 
approxCreatedAtNanos, boolean cdcEnabled)
+     {
          this.keyspaceName = keyspaceName;
          this.key = key;
          this.modifications = modifications;
+         this.cdcEnabled = cdcEnabled;
 -        this.createdAt = createdAt;
 -    }
 -
 -    private static boolean cdcEnabled(Map<UUID, PartitionUpdate> 
modifications)
 -    {
 -        boolean cdcEnabled = false;
 -        for (PartitionUpdate pu : modifications.values())
 -            cdcEnabled |= pu.metadata().params.cdc;
 -        return cdcEnabled;
++        this.approxCreatedAtNanos = approxCreatedAtNanos;
+     }
  
 -    public Mutation copy()
++    private static boolean cdcEnabled(Iterable<PartitionUpdate> modifications)
+     {
 -        return new Mutation(keyspaceName, key, new HashMap<>(modifications));
 +        boolean cdc = false;
-         for (PartitionUpdate pu : modifications.values())
++        for (PartitionUpdate pu : modifications)
 +            cdc |= pu.metadata().params.cdc;
-         this.cdcEnabled = cdc;
-         this.approxCreatedAtNanos = approxCreatedAtNanos;
++        return cdc;
      }
  
 -    public Mutation without(Set<UUID> cfIds)
 +    public Mutation without(Set<TableId> tableIds)
      {
 -        if (cfIds.isEmpty())
 +        if (tableIds.isEmpty())
              return this;
  
 -        Mutation copy = copy();
 -        copy.modifications.keySet().removeAll(cfIds);
 -
 -        copy.cdcEnabled = false;
 -        for (PartitionUpdate pu : modifications.values())
 -            copy.cdcEnabled |= pu.metadata().params.cdc;
 +        ImmutableMap.Builder<TableId, PartitionUpdate> builder = new 
ImmutableMap.Builder<>();
 +        for (Map.Entry<TableId, PartitionUpdate> update : 
modifications.entrySet())
 +        {
 +            if (!tableIds.contains(update.getKey()))
 +            {
 +                builder.put(update);
 +            }
 +        }
  
 -        return copy;
 +        return new Mutation(keyspaceName, key, builder.build(), 
approxCreatedAtNanos);
      }
  
 -    public Mutation without(UUID cfId)
 +    public Mutation without(TableId tableId)
      {
 -        return without(Collections.singleton(cfId));
 +        return without(Collections.singleton(tableId));
      }
  
      public String getKeyspaceName()
diff --cc 
test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
index 0000000,2a4e1fb..8638006
mode 000000,100644..100644
--- 
a/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
+++ 
b/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
@@@ -1,0 -1,147 +1,149 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.test.microbench;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.List;
+ import java.util.concurrent.TimeUnit;
+ 
+ import com.google.common.collect.Lists;
+ 
 -import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.Schema;
+ import org.apache.cassandra.cql3.Attributes;
+ import org.apache.cassandra.cql3.BatchQueryOptions;
++import org.apache.cassandra.cql3.QueryHandler;
+ import org.apache.cassandra.cql3.QueryOptions;
+ import org.apache.cassandra.cql3.QueryProcessor;
++import org.apache.cassandra.cql3.VariableSpecifications;
+ import org.apache.cassandra.cql3.statements.BatchStatement;
+ import org.apache.cassandra.cql3.statements.ModificationStatement;
 -import org.apache.cassandra.cql3.statements.ParsedStatement;
++import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
+ import org.apache.cassandra.schema.KeyspaceMetadata;
+ import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.schema.Schema;
++import org.apache.cassandra.schema.TableMetadata;
+ import org.apache.cassandra.service.QueryState;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.openjdk.jmh.annotations.Benchmark;
+ import org.openjdk.jmh.annotations.BenchmarkMode;
+ import org.openjdk.jmh.annotations.Fork;
+ import org.openjdk.jmh.annotations.Measurement;
+ import org.openjdk.jmh.annotations.Mode;
+ import org.openjdk.jmh.annotations.OutputTimeUnit;
+ import org.openjdk.jmh.annotations.Param;
+ import org.openjdk.jmh.annotations.Scope;
+ import org.openjdk.jmh.annotations.Setup;
+ import org.openjdk.jmh.annotations.State;
+ import org.openjdk.jmh.annotations.Threads;
+ import org.openjdk.jmh.annotations.Warmup;
+ import org.openjdk.jmh.profile.GCProfiler;
+ import org.openjdk.jmh.results.Result;
+ import org.openjdk.jmh.results.RunResult;
+ import org.openjdk.jmh.runner.Runner;
+ import org.openjdk.jmh.runner.options.Options;
+ import org.openjdk.jmh.runner.options.OptionsBuilder;
+ 
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+ 
+ 
+ @BenchmarkMode(Mode.Throughput)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ @Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+ @Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS)
+ @Fork(value = 1,jvmArgsAppend = "-Xmx512M")
+ @Threads(1)
+ @State(Scope.Benchmark)
+ public class BatchStatementBench
+ {
+     static
+     {
 -        DatabaseDescriptor.clientInitialization();
++
++        DatabaseDescriptor.toolInitialization();
+         // Partitioner is not set in client mode.
+         if (DatabaseDescriptor.getPartitioner() == null)
+             
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+     }
+ 
+     static String keyspace = "keyspace1";
+     String table = "tbl";
+ 
+     int nowInSec = FBUtilities.nowInSeconds();
+     long queryStartTime = System.nanoTime();
+     BatchStatement bs;
+     BatchQueryOptions bqo;
+ 
+     @Param({"true", "false"})
+     boolean uniquePartition;
+ 
+     @Param({"10000"})
+     int batchSize;
+ 
+     @Setup
+     public void setup() throws Throwable
+     {
+         Schema.instance.load(KeyspaceMetadata.create(keyspace, 
KeyspaceParams.simple(1)));
 -        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
 -        CFMetaData metadata = CFMetaData.compile(String.format("CREATE TABLE 
%s (id int, ck int, v int, primary key (id, ck))", table), keyspace);
++        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspace);
++        TableMetadata metadata = 
CreateTableStatement.parse(String.format("CREATE TABLE %s (id int, ck int, v 
int, primary key (id, ck))", table), keyspace).build();
+ 
 -        Schema.instance.load(metadata);
 -        
Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(metadata)));
++        Schema.instance.load(ksm.withSwapped(ksm.tables.with(metadata)));
+ 
+         List<ModificationStatement> modifications = new 
ArrayList<>(batchSize);
+         List<List<ByteBuffer>> parameters = new ArrayList<>(batchSize);
+         List<Object> queryOrIdList = new ArrayList<>(batchSize);
 -        ParsedStatement.Prepared prepared = 
QueryProcessor.parseStatement(String.format("INSERT INTO %s.%s (id, ck, v) 
VALUES (?,?,?)", keyspace, table), QueryState.forInternalCalls());
++        QueryHandler.Prepared prepared = 
QueryProcessor.prepareInternal(String.format("INSERT INTO %s.%s (id, ck, v) 
VALUES (?,?,?)", keyspace, table));
+ 
+         for (int i = 0; i < batchSize; i++)
+         {
+             modifications.add((ModificationStatement) prepared.statement);
+             parameters.add(Lists.newArrayList(bytes(uniquePartition ? i : 1), 
bytes(i), bytes(i)));
+             queryOrIdList.add(prepared.rawCQLStatement);
+         }
 -        bs = new BatchStatement(3, BatchStatement.Type.UNLOGGED, 
modifications, Attributes.none());
++        bs = new BatchStatement(BatchStatement.Type.UNLOGGED, 
VariableSpecifications.empty(), modifications, Attributes.none());
+         bqo = 
BatchQueryOptions.withPerStatementVariables(QueryOptions.DEFAULT, parameters, 
queryOrIdList);
+     }
+ 
+     @Benchmark
+     public void bench()
+     {
 -        bs.getMutations(bqo, false, nowInSec, queryStartTime);
++        bs.getMutations(bqo, false, nowInSec, nowInSec, queryStartTime);
+     }
+ 
+ 
+     public static void main(String... args) throws Exception {
+         Options opts = new OptionsBuilder()
+                        
.include(".*"+BatchStatementBench.class.getSimpleName()+".*")
+                        .jvmArgs("-server")
+                        .forks(1)
+                        .mode(Mode.Throughput)
+                        .addProfiler(GCProfiler.class)
+                        .build();
+ 
+         Collection<RunResult> records = new Runner(opts).run();
+         for ( RunResult result : records) {
+             Result r = result.getPrimaryResult();
+             System.out.println("API replied benchmark score: "
+                                + r.getScore() + " "
+                                + r.getScoreUnit() + " over "
+                                + r.getStatistics().getN() + " iterations");
+         }
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to