Make PartitionUpdate and Mutation immutable Patch by marcuse; reviewed by Aleksey Yeschenko for CASSANDRA-13867
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de7c24b3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de7c24b3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de7c24b3 Branch: refs/heads/trunk Commit: de7c24b395265ff619c622ed6be8d88453f158ac Parents: 45c7c45 Author: Marcus Eriksson <marc...@apache.org> Authored: Wed Sep 13 15:57:50 2017 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue Jan 9 08:10:23 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cql3/statements/BatchStatement.java | 4 +- .../cql3/statements/BatchUpdatesCollector.java | 231 ++++++++++ .../cql3/statements/CQL3CasRequest.java | 26 +- .../cql3/statements/DeleteStatement.java | 18 +- .../cql3/statements/ModificationStatement.java | 33 +- .../statements/SingleTableUpdatesCollector.java | 101 +++++ .../cql3/statements/UpdateStatement.java | 12 +- .../cql3/statements/UpdatesCollector.java | 127 +----- .../apache/cassandra/db/CounterMutation.java | 7 +- src/java/org/apache/cassandra/db/IMutation.java | 6 + src/java/org/apache/cassandra/db/Mutation.java | 137 +++--- .../org/apache/cassandra/db/SimpleBuilders.java | 25 +- .../org/apache/cassandra/db/SystemKeyspace.java | 9 +- .../db/commitlog/CommitLogReplayer.java | 20 +- .../db/partitions/PartitionUpdate.java | 448 +++++++++---------- .../apache/cassandra/db/view/TableViews.java | 16 +- .../cassandra/db/view/ViewUpdateGenerator.java | 20 +- .../io/sstable/AbstractSSTableSimpleWriter.java | 4 +- .../io/sstable/SSTableSimpleUnsortedWriter.java | 22 +- .../io/sstable/SSTableSimpleWriter.java | 10 +- .../apache/cassandra/schema/SchemaKeyspace.java | 14 +- .../apache/cassandra/service/DataResolver.java | 8 +- .../apache/cassandra/service/paxos/Commit.java | 4 +- .../cassandra/db/ColumnFamilyStoreTest.java | 4 +- .../cassandra/db/CounterMutationTest.java | 8 +- .../cassandra/db/DeletePartitionTest.java | 3 +- test/unit/org/apache/cassandra/db/RowTest.java | 8 +- .../apache/cassandra/db/RowUpdateBuilder.java | 28 +- .../db/commitlog/CommitLogReaderTest.java | 4 +- .../db/compaction/CompactionsPurgeTest.java | 11 +- .../db/partition/PartitionUpdateTest.java | 15 + .../rows/RowAndDeletionMergeIteratorTest.java | 6 +- .../cassandra/index/sasi/SASIIndexTest.java | 142 +++--- .../cassandra/net/WriteCallbackInfoTest.java | 6 +- .../cassandra/triggers/TriggerExecutorTest.java | 18 +- 36 files changed, 912 insertions(+), 644 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ba0f842..623bf8a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Make PartitionUpdate and Mutation immutable (CASSANDRA-13867) * Fix CommitLogReplayer exception for CDC data (CASSANDRA-14066) * Fix cassandra-stress startup failure (CASSANDRA-14106) * Remove initialDirectories from CFS (CASSANDRA-13928) http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 7497f47..b54e3a0 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -224,7 +224,7 @@ public class BatchStatement implements CQLStatement throws RequestExecutionException, RequestValidationException { Set<String> tablesWithZeroGcGs = null; - UpdatesCollector collector = new UpdatesCollector(updatedColumns, updatedRows()); + BatchUpdatesCollector collector = new BatchUpdatesCollector(updatedColumns, updatedRows()); for (int i = 0; i < statements.size(); i++) { ModificationStatement statement = statements.get(i); @@ -247,8 +247,6 @@ public class BatchStatement implements CQLStatement ClientWarn.instance.warn(MessageFormatter.arrayFormat(LOGGED_BATCH_LOW_GCGS_WARNING, new Object[] { suffix, tablesWithZeroGcGs }) .getMessage()); } - - collector.validateIndexedColumns(); return collector.toMutations(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java new file mode 100644 index 0000000..9671b02 --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableMap; + +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.PartitionUpdate; + +/** + * Utility class to collect updates. + * + * <p>In a batch statement we don't want to recreate mutations every time as this is particularly inefficient when + * applying multiple batch to the same partition (see #6737). </p> + * + */ +final class BatchUpdatesCollector implements UpdatesCollector +{ + /** + * The columns that will be updated for each table (keyed by the table ID). + */ + private final Map<TableId, RegularAndStaticColumns> updatedColumns; + + /** + * The estimated number of updated row. + */ + private final int updatedRows; + + /** + * The mutations per keyspace. + */ + private final Map<String, Map<ByteBuffer, IMutationBuilder>> mutationBuilders = new HashMap<>(); + + BatchUpdatesCollector(Map<TableId, RegularAndStaticColumns> updatedColumns, int updatedRows) + { + super(); + this.updatedColumns = updatedColumns; + this.updatedRows = updatedRows; + } + + /** + * Gets the <code>PartitionUpdate.Builder</code> for the specified column family and key. If the builder does not + * exist it will be created. + * + * @param metadata the column family meta data + * @param dk the partition key + * @param consistency the consistency level + * @return the <code>PartitionUpdate.Builder</code> for the specified column family and key + */ + public PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency) + { + IMutationBuilder mut = getMutationBuilder(metadata, dk, consistency); + PartitionUpdate.Builder upd = mut.get(metadata.id); + if (upd == null) + { + RegularAndStaticColumns columns = updatedColumns.get(metadata.id); + assert columns != null; + upd = new PartitionUpdate.Builder(metadata, dk, columns, updatedRows); + mut.add(upd); + } + return upd; + } + + private IMutationBuilder getMutationBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency) + { + String ksName = metadata.keyspace; + IMutationBuilder mutationBuilder = keyspaceMap(ksName).get(dk.getKey()); + if (mutationBuilder == null) + { + MutationBuilder builder = new MutationBuilder(ksName, dk); + mutationBuilder = metadata.isCounter() ? new CounterMutationBuilder(builder, consistency) : builder; + keyspaceMap(ksName).put(dk.getKey(), mutationBuilder); + } + return mutationBuilder; + } + + /** + * Returns a collection containing all the mutations. + * @return a collection containing all the mutations. + */ + public Collection<IMutation> toMutations() + { + //TODO: The case where all statement where on the same keyspace is pretty common, optimize for that? + List<IMutation> ms = new ArrayList<>(); + for (Map<ByteBuffer, IMutationBuilder> ksMap : mutationBuilders.values()) + { + for (IMutationBuilder builder : ksMap.values()) + { + IMutation mutation = builder.build(); + mutation.validateIndexedColumns(); + ms.add(mutation); + } + } + return ms; + } + + /** + * Returns the key-mutation mappings for the specified keyspace. + * + * @param ksName the keyspace name + * @return the key-mutation mappings for the specified keyspace. + */ + private Map<ByteBuffer, IMutationBuilder> keyspaceMap(String ksName) + { + return mutationBuilders.computeIfAbsent(ksName, k -> new HashMap<>()); + } + + private interface IMutationBuilder + { + /** + * Add a new PartitionUpdate builder to this mutation builder + * @param builder the builder to add + * @return this + */ + IMutationBuilder add(PartitionUpdate.Builder builder); + + /** + * Build the immutable mutation + */ + IMutation build(); + + /** + * Get the builder for the given tableId + */ + PartitionUpdate.Builder get(TableId tableId); + } + + private static class MutationBuilder implements IMutationBuilder + { + private final HashMap<TableId, PartitionUpdate.Builder> modifications = new HashMap<>(); + private final DecoratedKey key; + private final String keyspaceName; + private final long createdAt = System.currentTimeMillis(); + + private MutationBuilder(String keyspaceName, DecoratedKey key) + { + this.keyspaceName = keyspaceName; + this.key = key; + } + + public MutationBuilder add(PartitionUpdate.Builder updateBuilder) + { + assert updateBuilder != null; + assert updateBuilder.partitionKey().getPartitioner() == key.getPartitioner(); + PartitionUpdate.Builder prev = modifications.put(updateBuilder.metadata().id, updateBuilder); + if (prev != null) + // developer error + throw new IllegalArgumentException("Table " + updateBuilder.metadata().name + " already has modifications in this mutation: " + prev); + return this; + } + + public Mutation build() + { + ImmutableMap.Builder<TableId, PartitionUpdate> updates = new ImmutableMap.Builder<>(); + for (Map.Entry<TableId, PartitionUpdate.Builder> updateEntry : modifications.entrySet()) + { + PartitionUpdate update = updateEntry.getValue().build(); + updates.put(updateEntry.getKey(), update); + } + return new Mutation(keyspaceName, key, updates.build(), createdAt); + } + + public PartitionUpdate.Builder get(TableId tableId) + { + return modifications.get(tableId); + } + + public DecoratedKey key() + { + return key; + } + + public boolean isEmpty() + { + return modifications.isEmpty(); + } + + public String getKeyspaceName() + { + return keyspaceName; + } + } + + private static class CounterMutationBuilder implements IMutationBuilder + { + private final MutationBuilder mutationBuilder; + private final ConsistencyLevel cl; + + private CounterMutationBuilder(MutationBuilder mutationBuilder, ConsistencyLevel cl) + { + this.mutationBuilder = mutationBuilder; + this.cl = cl; + } + + public IMutationBuilder add(PartitionUpdate.Builder builder) + { + return mutationBuilder.add(builder); + } + + public IMutation build() + { + return new CounterMutation(mutationBuilder.build(), cl); + } + + public PartitionUpdate.Builder get(TableId id) + { + return mutationBuilder.get(id); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java index ec11df3..8619945 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java +++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java @@ -235,14 +235,16 @@ public class CQL3CasRequest implements CASRequest public PartitionUpdate makeUpdates(FilteredPartition current) throws InvalidRequestException { - PartitionUpdate update = new PartitionUpdate(metadata, key, updatedColumns(), conditions.size()); + PartitionUpdate.Builder updateBuilder = new PartitionUpdate.Builder(metadata, key, updatedColumns(), conditions.size()); for (RowUpdate upd : updates) - upd.applyUpdates(current, update); + upd.applyUpdates(current, updateBuilder); for (RangeDeletion upd : rangeDeletions) - upd.applyUpdates(current, update); + upd.applyUpdates(current, updateBuilder); - Keyspace.openAndGetStore(metadata).indexManager.validate(update); - return update; + PartitionUpdate partitionUpdate = updateBuilder.build(); + Keyspace.openAndGetStore(metadata).indexManager.validate(partitionUpdate); + + return partitionUpdate; } /** @@ -266,11 +268,11 @@ public class CQL3CasRequest implements CASRequest this.timestamp = timestamp; } - public void applyUpdates(FilteredPartition current, PartitionUpdate updates) throws InvalidRequestException + public void applyUpdates(FilteredPartition current, PartitionUpdate.Builder updateBuilder) throws InvalidRequestException { - Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.<DecoratedKey, Partition>singletonMap(key, current) : null; - UpdateParameters params = new UpdateParameters(metadata, updates.columns(), options, timestamp, stmt.getTimeToLive(options), map); - stmt.addUpdateForKey(updates, clustering, params); + Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.singletonMap(key, current) : null; + UpdateParameters params = new UpdateParameters(metadata, updateBuilder.columns(), options, timestamp, stmt.getTimeToLive(options), map); + stmt.addUpdateForKey(updateBuilder, clustering, params); } } @@ -289,12 +291,12 @@ public class CQL3CasRequest implements CASRequest this.timestamp = timestamp; } - public void applyUpdates(FilteredPartition current, PartitionUpdate updates) throws InvalidRequestException + public void applyUpdates(FilteredPartition current, PartitionUpdate.Builder updateBuilder) throws InvalidRequestException { // No slice statements currently require a read, but this maintains consistency with RowUpdate, and future proofs us Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.<DecoratedKey, Partition>singletonMap(key, current) : null; - UpdateParameters params = new UpdateParameters(metadata, updates.columns(), options, timestamp, stmt.getTimeToLive(options), map); - stmt.addUpdateForKey(updates, slice, params); + UpdateParameters params = new UpdateParameters(metadata, updateBuilder.columns(), options, timestamp, stmt.getTimeToLive(options), map); + stmt.addUpdateForKey(updateBuilder, slice, params); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java index e880bf8..8f3349e 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java @@ -50,7 +50,7 @@ public class DeleteStatement extends ModificationStatement } @Override - public void addUpdateForKey(PartitionUpdate update, Clustering clustering, UpdateParameters params) + public void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Clustering clustering, UpdateParameters params) throws InvalidRequestException { TableMetadata metadata = metadata(); @@ -63,19 +63,19 @@ public class DeleteStatement extends ModificationStatement // We're not deleting any specific columns so it's either a full partition deletion .... if (clustering.size() == 0) { - update.addPartitionDeletion(params.deletionTime()); + updateBuilder.addPartitionDeletion(params.deletionTime()); } // ... or a row deletion ... else if (clustering.size() == metadata.clusteringColumns().size()) { params.newRow(clustering); params.addRowDeletion(); - update.add(params.buildRow()); + updateBuilder.add(params.buildRow()); } // ... or a range of rows deletion. else { - update.add(params.makeRangeTombstone(metadata.comparator, clustering)); + updateBuilder.add(params.makeRangeTombstone(metadata.comparator, clustering)); } } else @@ -91,22 +91,22 @@ public class DeleteStatement extends ModificationStatement params.newRow(clustering); for (Operation op : regularDeletions) - op.execute(update.partitionKey(), params); - update.add(params.buildRow()); + op.execute(updateBuilder.partitionKey(), params); + updateBuilder.add(params.buildRow()); } if (!staticDeletions.isEmpty()) { params.newRow(Clustering.STATIC_CLUSTERING); for (Operation op : staticDeletions) - op.execute(update.partitionKey(), params); - update.add(params.buildRow()); + op.execute(updateBuilder.partitionKey(), params); + updateBuilder.add(params.buildRow()); } } } @Override - public void addUpdateForKey(PartitionUpdate update, Slice slice, UpdateParameters params) + public void addUpdateForKey(PartitionUpdate.Builder update, Slice slice, UpdateParameters params) { List<Operation> regularDeletions = getRegularOperations(); List<Operation> staticDeletions = getStaticOperations(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index decb99f..f0cfd0d 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -174,9 +174,9 @@ public abstract class ModificationStatement implements CQLStatement return restrictions; } - public abstract void addUpdateForKey(PartitionUpdate update, Clustering clustering, UpdateParameters params); + public abstract void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Clustering clustering, UpdateParameters params); - public abstract void addUpdateForKey(PartitionUpdate update, Slice slice, UpdateParameters params); + public abstract void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Slice slice, UpdateParameters params); public int getBoundTerms() { @@ -643,10 +643,8 @@ public abstract class ModificationStatement implements CQLStatement */ private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now, long queryStartNanoTime) { - UpdatesCollector collector = new UpdatesCollector(Collections.singletonMap(metadata.id, updatedColumns), 1); + UpdatesCollector collector = new SingleTableUpdatesCollector(metadata, updatedColumns, 1); addUpdates(collector, options, local, now, queryStartNanoTime); - collector.validateIndexedColumns(); - return collector.toMutations(); } @@ -678,10 +676,10 @@ public abstract class ModificationStatement implements CQLStatement Validation.validateKey(metadata(), key); DecoratedKey dk = metadata().partitioner.decorateKey(key); - PartitionUpdate upd = collector.getPartitionUpdate(metadata(), dk, options.getConsistency()); + PartitionUpdate.Builder updateBuilder = collector.getPartitionUpdateBuilder(metadata(), dk, options.getConsistency()); for (Slice slice : slices) - addUpdateForKey(upd, slice, params); + addUpdateForKey(updateBuilder, slice, params); } } else @@ -699,25 +697,24 @@ public abstract class ModificationStatement implements CQLStatement Validation.validateKey(metadata(), key); DecoratedKey dk = metadata().partitioner.decorateKey(key); - PartitionUpdate upd = collector.getPartitionUpdate(metadata, dk, options.getConsistency()); + PartitionUpdate.Builder updateBuilder = collector.getPartitionUpdateBuilder(metadata(), dk, options.getConsistency()); if (!restrictions.hasClusteringColumnsRestrictions()) { - addUpdateForKey(upd, Clustering.EMPTY, params); + addUpdateForKey(updateBuilder, Clustering.EMPTY, params); } else { for (Clustering clustering : clusterings) { - for (ByteBuffer c : clustering.getRawValues()) - { - if (c != null && c.remaining() > FBUtilities.MAX_UNSIGNED_SHORT) - throw new InvalidRequestException(String.format("Key length of %d is longer than maximum of %d", - clustering.dataSize(), - FBUtilities.MAX_UNSIGNED_SHORT)); - } - - addUpdateForKey(upd, clustering, params); + for (ByteBuffer c : clustering.getRawValues()) + { + if (c != null && c.remaining() > FBUtilities.MAX_UNSIGNED_SHORT) + throw new InvalidRequestException(String.format("Key length of %d is longer than maximum of %d", + clustering.dataSize(), + FBUtilities.MAX_UNSIGNED_SHORT)); + } + addUpdateForKey(updateBuilder, clustering, params); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java new file mode 100644 index 0000000..9eaf897 --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.CounterMutation; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.schema.TableMetadata; + +/** + * Utility class to collect updates. + */ +final class SingleTableUpdatesCollector implements UpdatesCollector +{ + /** + * the table to be updated + */ + private final TableMetadata metadata; + + /** + * the columns to update + */ + private final RegularAndStaticColumns updatedColumns; + + /** + * The estimated number of updated row. + */ + private final int updatedRows; + + /** + * the partition update builders per key + */ + private final Map<ByteBuffer, PartitionUpdate.Builder> puBuilders = new HashMap<>(); + + /** + * if it is a counter table, we will set this + */ + private ConsistencyLevel counterConsistencyLevel = null; + + SingleTableUpdatesCollector(TableMetadata metadata, RegularAndStaticColumns updatedColumns, int updatedRows) + { + this.metadata = metadata; + this.updatedColumns = updatedColumns; + this.updatedRows = updatedRows; + } + + public PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency) + { + if (metadata.isCounter()) + counterConsistencyLevel = consistency; + return puBuilders.computeIfAbsent(dk.getKey(), (k) -> new PartitionUpdate.Builder(metadata, dk, updatedColumns, updatedRows)); + } + + /** + * Returns a collection containing all the mutations. + * @return a collection containing all the mutations. + */ + public Collection<IMutation> toMutations() + { + List<IMutation> ms = new ArrayList<>(); + for (PartitionUpdate.Builder builder : puBuilders.values()) + { + IMutation mutation = null; + + if (metadata.isCounter()) + mutation = new CounterMutation(new Mutation(builder.build()), counterConsistencyLevel); + else + mutation = new Mutation(builder.build()); + mutation.validateIndexedColumns(); + ms.add(mutation); + } + + return ms; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index 7a2a1ba..a373ba1 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@ -63,7 +63,7 @@ public class UpdateStatement extends ModificationStatement } @Override - public void addUpdateForKey(PartitionUpdate update, Clustering clustering, UpdateParameters params) + public void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Clustering clustering, UpdateParameters params) { if (updatesRegularRows()) { @@ -90,22 +90,22 @@ public class UpdateStatement extends ModificationStatement } for (Operation op : updates) - op.execute(update.partitionKey(), params); + op.execute(updateBuilder.partitionKey(), params); - update.add(params.buildRow()); + updateBuilder.add(params.buildRow()); } if (updatesStaticRow()) { params.newRow(Clustering.STATIC_CLUSTERING); for (Operation op : getStaticOperations()) - op.execute(update.partitionKey(), params); - update.add(params.buildRow()); + op.execute(updateBuilder.partitionKey(), params); + updateBuilder.add(params.buildRow()); } } @Override - public void addUpdateForKey(PartitionUpdate update, Slice slice, UpdateParameters params) + public void addUpdateForKey(PartitionUpdate.Builder update, Slice slice, UpdateParameters params) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java index f16c619..30db7ca 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java @@ -15,127 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.cassandra.cql3.statements; -import java.nio.ByteBuffer; -import java.util.*; +import java.util.Collection; -import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.schema.TableMetadata; -/** - * Utility class to collect updates. - * - * <p>In a batch statement we don't want to recreate mutations every time as this is particularly inefficient when - * applying multiple batch to the same partition (see #6737). </p> - * - */ -final class UpdatesCollector +public interface UpdatesCollector { - /** - * The columns that will be updated for each table (keyed by the table ID). - */ - private final Map<TableId, RegularAndStaticColumns> updatedColumns; - - /** - * The estimated number of updated row. - */ - private final int updatedRows; - - /** - * The mutations per keyspace. - */ - private final Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>(); - - public UpdatesCollector(Map<TableId, RegularAndStaticColumns> updatedColumns, int updatedRows) - { - super(); - this.updatedColumns = updatedColumns; - this.updatedRows = updatedRows; - } - - /** - * Gets the <code>PartitionUpdate</code> for the specified column family and key. If the update does not - * exist it will be created. - * - * @param metadata the column family meta data - * @param dk the partition key - * @param consistency the consistency level - * @return the <code>PartitionUpdate</code> for the specified column family and key - */ - public PartitionUpdate getPartitionUpdate(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency) - { - Mutation mut = getMutation(metadata, dk, consistency); - PartitionUpdate upd = mut.get(metadata); - if (upd == null) - { - RegularAndStaticColumns columns = updatedColumns.get(metadata.id); - assert columns != null; - upd = new PartitionUpdate(metadata, dk, columns, updatedRows); - mut.add(upd); - } - return upd; - } - - /** - * Check all partition updates contain only valid values for any - * indexed columns. - */ - public void validateIndexedColumns() - { - for (Map<ByteBuffer, IMutation> perKsMutations : mutations.values()) - for (IMutation mutation : perKsMutations.values()) - for (PartitionUpdate update : mutation.getPartitionUpdates()) - Keyspace.openAndGetStore(update.metadata()).indexManager.validate(update); - } - - private Mutation getMutation(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency) - { - String ksName = metadata.keyspace; - IMutation mutation = keyspaceMap(ksName).get(dk.getKey()); - if (mutation == null) - { - Mutation mut = new Mutation(ksName, dk); - mutation = metadata.isCounter() ? new CounterMutation(mut, consistency) : mut; - keyspaceMap(ksName).put(dk.getKey(), mutation); - return mut; - } - return metadata.isCounter() ? ((CounterMutation) mutation).getMutation() : (Mutation) mutation; - } - - /** - * Returns a collection containing all the mutations. - * @return a collection containing all the mutations. - */ - public Collection<IMutation> toMutations() - { - // The case where all statement where on the same keyspace is pretty common - if (mutations.size() == 1) - return mutations.values().iterator().next().values(); - - List<IMutation> ms = new ArrayList<>(); - for (Map<ByteBuffer, IMutation> ksMap : mutations.values()) - ms.addAll(ksMap.values()); - - return ms; - } - - /** - * Returns the key-mutation mappings for the specified keyspace. - * - * @param ksName the keyspace name - * @return the key-mutation mappings for the specified keyspace. - */ - private Map<ByteBuffer, IMutation> keyspaceMap(String ksName) - { - Map<ByteBuffer, IMutation> ksMap = mutations.get(ksName); - if (ksMap == null) - { - ksMap = new HashMap<>(); - mutations.put(ksName, ksMap); - } - return ksMap; - } + PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency); + Collection<IMutation> toMutations(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/CounterMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java index 0f1ad06..d04ddd8 100644 --- a/src/java/org/apache/cassandra/db/CounterMutation.java +++ b/src/java/org/apache/cassandra/db/CounterMutation.java @@ -24,6 +24,7 @@ import java.util.concurrent.locks.Lock; import com.google.common.base.Function; import com.google.common.base.Objects; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; @@ -112,7 +113,7 @@ public class CounterMutation implements IMutation */ public Mutation applyCounterMutation() throws WriteTimeoutException { - Mutation result = new Mutation(getKeyspaceName(), key()); + Mutation.PartitionUpdateCollector resultBuilder = new Mutation.PartitionUpdateCollector(getKeyspaceName(), key()); Keyspace keyspace = Keyspace.open(getKeyspaceName()); List<Lock> locks = new ArrayList<>(); @@ -121,7 +122,9 @@ public class CounterMutation implements IMutation { grabCounterLocks(keyspace, locks); for (PartitionUpdate upd : getPartitionUpdates()) - result.add(processModifications(upd)); + resultBuilder.add(processModifications(upd)); + + Mutation result = resultBuilder.build(); result.apply(); return result; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/IMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java index 3d4b1b2..9eaf19b 100644 --- a/src/java/org/apache/cassandra/db/IMutation.java +++ b/src/java/org/apache/cassandra/db/IMutation.java @@ -32,6 +32,12 @@ public interface IMutation public String toString(boolean shallow); public Collection<PartitionUpdate> getPartitionUpdates(); + public default void validateIndexedColumns() + { + for (PartitionUpdate pu : getPartitionUpdates()) + pu.validateIndexedColumns(); + } + /** * Computes the total data size of the specified mutations. * @param mutations the mutations http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index 062e1fe..a6a920c 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -22,6 +22,8 @@ import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.StringUtils; import org.apache.cassandra.config.DatabaseDescriptor; @@ -37,8 +39,6 @@ import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; -// TODO convert this to a Builder pattern instead of encouraging M.add directly, -// which is less-efficient since we have to keep a mutable HashMap around public class Mutation implements IMutation { public static final MutationSerializer serializer = new MutationSerializer(); @@ -52,37 +52,31 @@ public class Mutation implements IMutation private final DecoratedKey key; // map of column family id to mutations for that column family. - private final Map<TableId, PartitionUpdate> modifications; + private final ImmutableMap<TableId, PartitionUpdate> modifications; - // Time at which this mutation was instantiated - public final long createdAt = System.currentTimeMillis(); + // Time at which this mutation or the builder that built it was instantiated + final long createdAt; // keep track of when mutation has started waiting for a MV partition lock - public final AtomicLong viewLockAcquireStart = new AtomicLong(0); + final AtomicLong viewLockAcquireStart = new AtomicLong(0); - private boolean cdcEnabled = false; - - public Mutation(String keyspaceName, DecoratedKey key) - { - this(keyspaceName, key, new HashMap<>()); - } + private final boolean cdcEnabled; public Mutation(PartitionUpdate update) { - this(update.metadata().keyspace, update.partitionKey(), Collections.singletonMap(update.metadata().id, update)); + this(update.metadata().keyspace, update.partitionKey(), ImmutableMap.of(update.metadata().id, update), System.currentTimeMillis()); } - protected Mutation(String keyspaceName, DecoratedKey key, Map<TableId, PartitionUpdate> modifications) + public Mutation(String keyspaceName, DecoratedKey key, ImmutableMap<TableId, PartitionUpdate> modifications, long createdAt) { this.keyspaceName = keyspaceName; this.key = key; this.modifications = modifications; - for (PartitionUpdate pu : modifications.values()) - cdcEnabled |= pu.metadata().params.cdc; - } - public Mutation copy() - { - return new Mutation(keyspaceName, key, new HashMap<>(modifications)); + boolean cdc = false; + for (PartitionUpdate pu : modifications.values()) + cdc |= pu.metadata().params.cdc; + this.cdcEnabled = cdc; + this.createdAt = createdAt; } public Mutation without(Set<TableId> tableIds) @@ -90,15 +84,16 @@ public class Mutation implements IMutation if (tableIds.isEmpty()) return this; - Mutation copy = copy(); - - copy.modifications.keySet().removeAll(tableIds); - - copy.cdcEnabled = false; - for (PartitionUpdate pu : modifications.values()) - copy.cdcEnabled |= pu.metadata().params.cdc; + ImmutableMap.Builder<TableId, PartitionUpdate> builder = new ImmutableMap.Builder<>(); + for (Map.Entry<TableId, PartitionUpdate> update : modifications.entrySet()) + { + if (!tableIds.contains(update.getKey())) + { + builder.put(update); + } + } - return copy; + return new Mutation(keyspaceName, key, builder.build(), createdAt); } public Mutation without(TableId tableId) @@ -121,7 +116,7 @@ public class Mutation implements IMutation return key; } - public Collection<PartitionUpdate> getPartitionUpdates() + public ImmutableCollection<PartitionUpdate> getPartitionUpdates() { return modifications.values(); } @@ -131,33 +126,6 @@ public class Mutation implements IMutation return table == null ? null : modifications.get(table.id); } - /** - * Adds PartitionUpdate to the local set of modifications. - * Assumes no updates for the Table this PartitionUpdate impacts. - * - * @param update PartitionUpdate to append to Modifications list - * @return Mutation this mutation - * @throws IllegalArgumentException If PartitionUpdate for duplicate table is passed as argument - */ - public Mutation add(PartitionUpdate update) - { - assert update != null; - assert update.partitionKey().getPartitioner() == key.getPartitioner(); - - cdcEnabled |= update.metadata().params.cdc; - - PartitionUpdate prev = modifications.put(update.metadata().id, update); - if (prev != null) - // developer error - throw new IllegalArgumentException("Table " + update.metadata().name + " already has modifications in this mutation: " + prev); - return this; - } - - public PartitionUpdate get(TableMetadata metadata) - { - return modifications.get(metadata.id); - } - public boolean isEmpty() { return modifications.isEmpty(); @@ -196,7 +164,7 @@ public class Mutation implements IMutation } List<PartitionUpdate> updates = new ArrayList<>(mutations.size()); - Map<TableId, PartitionUpdate> modifications = new HashMap<>(updatedTables.size()); + ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>(); for (TableId table : updatedTables) { for (Mutation mutation : mutations) @@ -212,7 +180,7 @@ public class Mutation implements IMutation modifications.put(table, updates.size() == 1 ? updates.get(0) : PartitionUpdate.merge(updates)); updates.clear(); } - return new Mutation(ks, key, modifications); + return new Mutation(ks, key, modifications.build(), System.currentTimeMillis()); } public CompletableFuture<?> applyFuture() @@ -389,7 +357,7 @@ public class Mutation implements IMutation if (size == 1) return new Mutation(update); - Map<TableId, PartitionUpdate> modifications = new HashMap<>(size); + ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>(); DecoratedKey dk = update.partitionKey(); modifications.put(update.metadata().id, update); @@ -398,8 +366,7 @@ public class Mutation implements IMutation update = PartitionUpdate.serializer.deserialize(in, version, flag); modifications.put(update.metadata().id, update); } - - return new Mutation(update.metadata().keyspace, dk, modifications); + return new Mutation(update.metadata().keyspace, dk, modifications.build(), System.currentTimeMillis()); } public Mutation deserialize(DataInputPlus in, int version) throws IOException @@ -416,4 +383,52 @@ public class Mutation implements IMutation return size; } } + + /** + * Collects finalized partition updates + */ + public static class PartitionUpdateCollector + { + private final ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>(); + private final String keyspaceName; + private final DecoratedKey key; + private final long createdAt = System.currentTimeMillis(); + private boolean empty = true; + + public PartitionUpdateCollector(String keyspaceName, DecoratedKey key) + { + this.keyspaceName = keyspaceName; + this.key = key; + } + + public PartitionUpdateCollector add(PartitionUpdate partitionUpdate) + { + assert partitionUpdate != null; + assert partitionUpdate.partitionKey().getPartitioner() == key.getPartitioner(); + // note that ImmutableMap.Builder only allows put:ing the same key once, it will fail during build() below otherwise + modifications.put(partitionUpdate.metadata().id, partitionUpdate); + empty = false; + return this; + } + + public DecoratedKey key() + { + return key; + } + + public String getKeyspaceName() + { + return keyspaceName; + } + + public boolean isEmpty() + { + return empty; + } + + public Mutation build() + { + return new Mutation(keyspaceName, key, modifications.build(), createdAt); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/SimpleBuilders.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SimpleBuilders.java b/src/java/org/apache/cassandra/db/SimpleBuilders.java index a212834..3520d97 100644 --- a/src/java/org/apache/cassandra/db/SimpleBuilders.java +++ b/src/java/org/apache/cassandra/db/SimpleBuilders.java @@ -146,10 +146,10 @@ public abstract class SimpleBuilders if (updateBuilders.size() == 1) return new Mutation(updateBuilders.values().iterator().next().build()); - Mutation mutation = new Mutation(keyspaceName, key); + Mutation.PartitionUpdateCollector mutationBuilder = new Mutation.PartitionUpdateCollector(keyspaceName, key); for (PartitionUpdateBuilder builder : updateBuilders.values()) - mutation.add(builder.build()); - return mutation; + mutationBuilder.add(builder.build()); + return mutationBuilder.build(); } } @@ -159,6 +159,7 @@ public abstract class SimpleBuilders private final DecoratedKey key; private final Map<Clustering, RowBuilder> rowBuilders = new HashMap<>(); private List<RTBuilder> rangeBuilders = null; // We use that rarely, so create lazily + private List<RangeTombstone> rangeTombstones = null; private DeletionTime partitionDeletion = DeletionTime.LIVE; @@ -204,6 +205,14 @@ public abstract class SimpleBuilders return builder; } + public PartitionUpdate.SimpleBuilder addRangeTombstone(RangeTombstone rt) + { + if (rangeTombstones == null) + rangeTombstones = new ArrayList<>(); + rangeTombstones.add(rt); + return this; + } + public PartitionUpdate build() { // Collect all updated columns @@ -213,7 +222,7 @@ public abstract class SimpleBuilders // Note that rowBuilders.size() could include the static column so could be 1 off the really need capacity // of the final PartitionUpdate, but as that's just a sizing hint, we'll live. - PartitionUpdate update = new PartitionUpdate(metadata, key, columns.build(), rowBuilders.size()); + PartitionUpdate.Builder update = new PartitionUpdate.Builder(metadata, key, columns.build(), rowBuilders.size()); update.addPartitionDeletion(partitionDeletion); if (rangeBuilders != null) @@ -222,10 +231,16 @@ public abstract class SimpleBuilders update.add(builder.build()); } + if (rangeTombstones != null) + { + for (RangeTombstone rt : rangeTombstones) + update.add(rt); + } + for (RowBuilder builder : rowBuilders.values()) update.add(builder.build()); - return update; + return update.build(); } public Mutation buildAsMutation() http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 9da0f6b..4469384 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -1025,7 +1025,7 @@ public final class SystemKeyspace UntypedResultSet.Row row = results.one(); Commit promised = row.has("in_progress_ballot") - ? new Commit(row.getUUID("in_progress_ballot"), new PartitionUpdate(metadata, key, metadata.regularAndStaticColumns(), 1)) + ? new Commit(row.getUUID("in_progress_ballot"), new PartitionUpdate.Builder(metadata, key, metadata.regularAndStaticColumns(), 1).build()) : Commit.emptyCommit(key, metadata); // either we have both a recently accepted ballot and update or we have neither Commit accepted = row.has("proposal_version") && row.has("proposal") @@ -1135,9 +1135,7 @@ public final class SystemKeyspace public static void updateSizeEstimates(String keyspace, String table, Map<Range<Token>, Pair<Long, Long>> estimates) { long timestamp = FBUtilities.timestampMicros(); - PartitionUpdate update = new PartitionUpdate(SizeEstimates, UTF8Type.instance.decompose(keyspace), SizeEstimates.regularAndStaticColumns(), estimates.size()); - Mutation mutation = new Mutation(update); - + PartitionUpdate.Builder update = new PartitionUpdate.Builder(SizeEstimates, UTF8Type.instance.decompose(keyspace), SizeEstimates.regularAndStaticColumns(), estimates.size()); // delete all previous values with a single range tombstone. int nowInSec = FBUtilities.nowInSeconds(); update.add(new RangeTombstone(Slice.make(SizeEstimates.comparator, table), new DeletionTime(timestamp - 1, nowInSec))); @@ -1153,8 +1151,7 @@ public final class SystemKeyspace .add("mean_partition_size", values.right) .build()); } - - mutation.apply(); + new Mutation(update.build()).apply(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index ce185b6..2947222 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -26,7 +26,11 @@ import java.util.concurrent.atomic.AtomicInteger; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; -import com.google.common.collect.*; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; +import com.google.common.collect.Ordering; + import org.apache.commons.lang3.StringUtils; import org.cliffc.high_scale_lib.NonBlockingHashSet; import org.slf4j.Logger; @@ -249,7 +253,7 @@ public class CommitLogReplayer implements CommitLogReadHandler // b) have already been flushed, // or c) are part of a cf that was dropped. // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead. - Mutation newMutation = null; + Mutation.PartitionUpdateCollector newPUCollector = null; for (PartitionUpdate update : commitLogReplayer.replayFilter.filter(mutation)) { if (Schema.instance.getTableMetadata(update.metadata().id) == null) @@ -259,17 +263,17 @@ public class CommitLogReplayer implements CommitLogReadHandler // if it is the last known segment, if we are after the commit log segment position if (commitLogReplayer.shouldReplay(update.metadata().id, new CommitLogPosition(segmentId, entryLocation))) { - if (newMutation == null) - newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key()); - newMutation.add(update); + if (newPUCollector == null) + newPUCollector = new Mutation.PartitionUpdateCollector(mutation.getKeyspaceName(), mutation.key()); + newPUCollector.add(update); commitLogReplayer.replayedCount.incrementAndGet(); } } - if (newMutation != null) + if (newPUCollector != null) { - assert !newMutation.isEmpty(); + assert !newPUCollector.isEmpty(); - Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false, true, false); + Keyspace.open(newPUCollector.getKeyspaceName()).apply(newPUCollector.build(), false, true, false); commitLogReplayer.keyspacesReplayed.add(keyspace); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index 7a0cefe..a549458 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db.partitions; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -60,37 +61,11 @@ public class PartitionUpdate extends AbstractBTreePartition public static final PartitionUpdateSerializer serializer = new PartitionUpdateSerializer(); - private final int createdAtInSec = FBUtilities.nowInSeconds(); - - // Records whether this update is "built", i.e. if the build() method has been called, which - // happens when the update is read. Further writing is then rejected though a manual call - // to allowNewUpdates() allow new writes. We could make that more implicit but only triggers - // really requires that so we keep it simple for now). - private volatile boolean isBuilt; - private boolean canReOpen = true; - - private Holder holder; - private BTree.Builder<Row> rowBuilder; - private MutableDeletionInfo deletionInfo; - - private final boolean canHaveShadowedData; - + private final Holder holder; + private final DeletionInfo deletionInfo; private final TableMetadata metadata; - private PartitionUpdate(TableMetadata metadata, - DecoratedKey key, - RegularAndStaticColumns columns, - MutableDeletionInfo deletionInfo, - int initialRowCapacity, - boolean canHaveShadowedData) - { - super(key); - this.metadata = metadata; - this.deletionInfo = deletionInfo; - this.holder = new Holder(columns, BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS); - this.canHaveShadowedData = canHaveShadowedData; - rowBuilder = builder(initialRowCapacity); - } + private final boolean canHaveShadowedData; private PartitionUpdate(TableMetadata metadata, DecoratedKey key, @@ -102,29 +77,9 @@ public class PartitionUpdate extends AbstractBTreePartition this.metadata = metadata; this.holder = holder; this.deletionInfo = deletionInfo; - this.isBuilt = true; this.canHaveShadowedData = canHaveShadowedData; } - public PartitionUpdate(TableMetadata metadata, - DecoratedKey key, - RegularAndStaticColumns columns, - int initialRowCapacity) - { - this(metadata, key, columns, MutableDeletionInfo.live(), initialRowCapacity, true); - } - - public PartitionUpdate(TableMetadata metadata, - ByteBuffer key, - RegularAndStaticColumns columns, - int initialRowCapacity) - { - this(metadata, - metadata.partitioner.decorateKey(key), - columns, - initialRowCapacity); - } - /** * Creates a empty immutable partition update. * @@ -329,30 +284,6 @@ public class PartitionUpdate extends AbstractBTreePartition } /** - * Modify this update to set every timestamp for live data to {@code newTimestamp} and - * every deletion timestamp to {@code newTimestamp - 1}. - * - * There is no reason to use that expect on the Paxos code path, where we need ensure that - * anything inserted use the ballot timestamp (to respect the order of update decided by - * the Paxos algorithm). We use {@code newTimestamp - 1} for deletions because tombstones - * always win on timestamp equality and we don't want to delete our own insertions - * (typically, when we overwrite a collection, we first set a complex deletion to delete the - * previous collection before adding new elements. If we were to set that complex deletion - * to the same timestamp that the new elements, it would delete those elements). And since - * tombstones always wins on timestamp equality, using -1 guarantees our deletion will still - * delete anything from a previous update. - */ - public void updateAllTimestamp(long newTimestamp) - { - Holder holder = holder(); - deletionInfo.updateAllTimestamp(newTimestamp - 1); - Object[] tree = BTree.<Row>transformAndFilter(holder.tree, (x) -> x.updateAllTimestamp(newTimestamp)); - Row staticRow = holder.staticRow.updateAllTimestamp(newTimestamp); - EncodingStats newStats = EncodingStats.Collector.collect(staticRow, BTree.<Row>iterator(tree), deletionInfo); - this.holder = new Holder(holder.columns, tree, deletionInfo, staticRow, newStats); - } - - /** * The number of "operations" contained in the update. * <p> * This is used by {@code Memtable} to approximate how much work this update does. In practice, this @@ -401,7 +332,6 @@ public class PartitionUpdate extends AbstractBTreePartition protected Holder holder() { - maybeBuild(); return holder; } @@ -411,50 +341,6 @@ public class PartitionUpdate extends AbstractBTreePartition } /** - * If a partition update has been read (and is thus unmodifiable), a call to this method - * makes the update modifiable again. - * <p> - * Please note that calling this method won't result in optimal behavior in the sense that - * even if very little is added to the update after this call, the whole update will be sorted - * again on read. This should thus be used sparingly (and if it turns that we end up using - * this often, we should consider optimizing the behavior). - */ - public synchronized void allowNewUpdates() - { - if (!canReOpen) - throw new IllegalStateException("You cannot do more updates on collectCounterMarks has been called"); - - // This is synchronized to make extra sure things work properly even if this is - // called concurrently with sort() (which should be avoided in the first place, but - // better safe than sorry). - isBuilt = false; - if (rowBuilder == null) - rowBuilder = builder(16); - } - - private BTree.Builder<Row> builder(int initialCapacity) - { - return BTree.<Row>builder(metadata().comparator, initialCapacity) - .setQuickResolver((a, b) -> - Rows.merge(a, b, createdAtInSec)); - } - - /** - * Returns an iterator that iterates over the rows of this update in clustering order. - * <p> - * Note that this might trigger a sorting of the update, and as such the update will not - * be modifiable anymore after this call. - * - * @return an iterator over the rows of this update. - */ - @Override - public Iterator<Row> iterator() - { - maybeBuild(); - return super.iterator(); - } - - /** * Validates the data contained in this update. * * @throws org.apache.cassandra.serializers.MarshalException if some of the data contained in this update is corrupted. @@ -476,8 +362,6 @@ public class PartitionUpdate extends AbstractBTreePartition */ public long maxTimestamp() { - maybeBuild(); - long maxTimestamp = deletionInfo.maxTimestamp(); for (Row row : this) { @@ -509,11 +393,8 @@ public class PartitionUpdate extends AbstractBTreePartition public List<CounterMark> collectCounterMarks() { assert metadata().isCounter(); - maybeBuild(); // We will take aliases on the rows of this update, and update them in-place. So we should be sure the // update is now immutable for all intent and purposes. - canReOpen = false; - List<CounterMark> marks = new ArrayList<>(); addMarksForRow(staticRow(), marks); for (Row row : this) @@ -521,7 +402,7 @@ public class PartitionUpdate extends AbstractBTreePartition return marks; } - private void addMarksForRow(Row row, List<CounterMark> marks) + private static void addMarksForRow(Row row, List<CounterMark> marks) { for (Cell cell : row.cells()) { @@ -530,109 +411,6 @@ public class PartitionUpdate extends AbstractBTreePartition } } - private void assertNotBuilt() - { - if (isBuilt) - throw new IllegalStateException("An update should not be written again once it has been read"); - } - - public void addPartitionDeletion(DeletionTime deletionTime) - { - assertNotBuilt(); - deletionInfo.add(deletionTime); - } - - public void add(RangeTombstone range) - { - assertNotBuilt(); - deletionInfo.add(range, metadata().comparator); - } - - /** - * Adds a row to this update. - * - * There is no particular assumption made on the order of row added to a partition update. It is further - * allowed to add the same row (more precisely, multiple row objects for the same clustering). - * - * Note however that the columns contained in the added row must be a subset of the columns used when - * creating this update. - * - * @param row the row to add. - */ - public void add(Row row) - { - if (row.isEmpty()) - return; - - assertNotBuilt(); - - if (row.isStatic()) - { - // this assert is expensive, and possibly of limited value; we should consider removing it - // or introducing a new class of assertions for test purposes - assert columns().statics.containsAll(row.columns()) : columns().statics + " is not superset of " + row.columns(); - Row staticRow = holder.staticRow.isEmpty() - ? row - : Rows.merge(holder.staticRow, row, createdAtInSec); - holder = new Holder(holder.columns, holder.tree, holder.deletionInfo, staticRow, holder.stats); - } - else - { - // this assert is expensive, and possibly of limited value; we should consider removing it - // or introducing a new class of assertions for test purposes - assert columns().regulars.containsAll(row.columns()) : columns().regulars + " is not superset of " + row.columns(); - rowBuilder.add(row); - } - } - - private void maybeBuild() - { - if (isBuilt) - return; - - build(); - } - - private synchronized void build() - { - if (isBuilt) - return; - - Holder holder = this.holder; - Object[] cur = holder.tree; - Object[] add = rowBuilder.build(); - Object[] merged = BTree.<Row>merge(cur, add, metadata().comparator, - UpdateFunction.Simple.of((a, b) -> Rows.merge(a, b, createdAtInSec))); - - assert deletionInfo == holder.deletionInfo; - EncodingStats newStats = EncodingStats.Collector.collect(holder.staticRow, BTree.<Row>iterator(merged), deletionInfo); - - this.holder = new Holder(holder.columns, merged, holder.deletionInfo, holder.staticRow, newStats); - rowBuilder = null; - isBuilt = true; - } - - @Override - public String toString() - { - if (isBuilt) - return super.toString(); - - // We intentionally override AbstractBTreePartition#toString() to avoid iterating over the rows in the - // partition, which can result in build() being triggered and lead to errors if the PartitionUpdate is later - // modified. - - StringBuilder sb = new StringBuilder(); - sb.append(String.format("[%s] key=%s columns=%s", - metadata.toString(), - metadata.partitionKeyType.getString(partitionKey().getKey()), - columns())); - - sb.append("\n deletionInfo=").append(deletionInfo); - sb.append(" (not built)"); - return sb.toString(); - } - /** * Creates a new simple partition update builder. * @@ -647,6 +425,11 @@ public class PartitionUpdate extends AbstractBTreePartition return new SimpleBuilders.PartitionUpdateBuilder(metadata, partitionKeyValues); } + public void validateIndexedColumns() + { + Keyspace.openAndGetStore(metadata()).indexManager.validate(this); + } + /** * Interface for building partition updates geared towards human. * <p> @@ -712,6 +495,13 @@ public class PartitionUpdate extends AbstractBTreePartition public RangeTombstoneBuilder addRangeTombstone(); /** + * Adds a new range tombstone to this update + * + * @return this builder + */ + public SimpleBuilder addRangeTombstone(RangeTombstone rt); + + /** * Build the update represented by this builder. * * @return the built update. @@ -892,4 +682,208 @@ public class PartitionUpdate extends AbstractBTreePartition ((BTreeRow)row).setValue(column, path, value); } } + + /** + * Builder for PartitionUpdates + * + * This class is not thread safe, but the PartitionUpdate it produces is (since it is immutable). + */ + public static class Builder + { + private final TableMetadata metadata; + private final DecoratedKey key; + private final MutableDeletionInfo deletionInfo; + private final boolean canHaveShadowedData; + private Object[] tree = BTree.empty(); + private final BTree.Builder<Row> rowBuilder; + private final int createdAtInSec = FBUtilities.nowInSeconds(); + private Row staticRow = Rows.EMPTY_STATIC_ROW; + private final RegularAndStaticColumns columns; + private boolean isBuilt = false; + + public Builder(TableMetadata metadata, + DecoratedKey key, + RegularAndStaticColumns columns, + int initialRowCapacity, + boolean canHaveShadowedData) + { + this(metadata, key, columns, initialRowCapacity, canHaveShadowedData, Rows.EMPTY_STATIC_ROW, MutableDeletionInfo.live(), BTree.empty()); + } + + private Builder(TableMetadata metadata, + DecoratedKey key, + RegularAndStaticColumns columns, + int initialRowCapacity, + boolean canHaveShadowedData, + Holder holder) + { + this(metadata, key, columns, initialRowCapacity, canHaveShadowedData, holder.staticRow, holder.deletionInfo, holder.tree); + } + + private Builder(TableMetadata metadata, + DecoratedKey key, + RegularAndStaticColumns columns, + int initialRowCapacity, + boolean canHaveShadowedData, + Row staticRow, + DeletionInfo deletionInfo, + Object[] tree) + { + this.metadata = metadata; + this.key = key; + this.columns = columns; + this.rowBuilder = rowBuilder(initialRowCapacity); + this.canHaveShadowedData = canHaveShadowedData; + this.deletionInfo = deletionInfo.mutableCopy(); + this.staticRow = staticRow; + this.tree = tree; + } + + public Builder(TableMetadata metadata, DecoratedKey key, RegularAndStaticColumns columnDefinitions, int size) + { + this(metadata, key, columnDefinitions, size, true); + } + + public Builder(PartitionUpdate base, int initialRowCapacity) + { + this(base.metadata, base.partitionKey, base.columns(), initialRowCapacity, base.canHaveShadowedData, base.holder); + } + + public Builder(TableMetadata metadata, + ByteBuffer key, + RegularAndStaticColumns columns, + int initialRowCapacity) + { + this(metadata, metadata.partitioner.decorateKey(key), columns, initialRowCapacity, true); + } + + /** + * Adds a row to this update. + * + * There is no particular assumption made on the order of row added to a partition update. It is further + * allowed to add the same row (more precisely, multiple row objects for the same clustering). + * + * Note however that the columns contained in the added row must be a subset of the columns used when + * creating this update. + * + * @param row the row to add. + */ + public void add(Row row) + { + if (row.isEmpty()) + return; + + if (row.isStatic()) + { + // this assert is expensive, and possibly of limited value; we should consider removing it + // or introducing a new class of assertions for test purposes + assert columns().statics.containsAll(row.columns()) : columns().statics + " is not superset of " + row.columns(); + staticRow = staticRow.isEmpty() + ? row + : Rows.merge(staticRow, row, createdAtInSec); + } + else + { + // this assert is expensive, and possibly of limited value; we should consider removing it + // or introducing a new class of assertions for test purposes + assert columns().regulars.containsAll(row.columns()) : columns().regulars + " is not superset of " + row.columns(); + rowBuilder.add(row); + } + } + + public void addPartitionDeletion(DeletionTime deletionTime) + { + deletionInfo.add(deletionTime); + } + + public void add(RangeTombstone range) + { + deletionInfo.add(range, metadata.comparator); + } + + public DecoratedKey partitionKey() + { + return key; + } + + public TableMetadata metadata() + { + return metadata; + } + + public PartitionUpdate build() + { + // assert that we are not calling build() several times + assert !isBuilt : "A PartitionUpdate.Builder should only get built once"; + Object[] add = rowBuilder.build(); + Object[] merged = BTree.<Row>merge(tree, add, metadata.comparator, + UpdateFunction.Simple.of((a, b) -> Rows.merge(a, b, createdAtInSec))); + + EncodingStats newStats = EncodingStats.Collector.collect(staticRow, BTree.iterator(merged), deletionInfo); + + isBuilt = true; + return new PartitionUpdate(metadata, + partitionKey(), + new Holder(columns, + merged, + deletionInfo, + staticRow, + newStats), + deletionInfo, + canHaveShadowedData); + } + + public RegularAndStaticColumns columns() + { + return columns; + } + + public DeletionTime partitionLevelDeletion() + { + return deletionInfo.getPartitionDeletion(); + } + + private BTree.Builder<Row> rowBuilder(int initialCapacity) + { + return BTree.<Row>builder(metadata.comparator, initialCapacity) + .setQuickResolver((a, b) -> + Rows.merge(a, b, createdAtInSec)); + } + /** + * Modify this update to set every timestamp for live data to {@code newTimestamp} and + * every deletion timestamp to {@code newTimestamp - 1}. + * + * There is no reason to use that expect on the Paxos code path, where we need ensure that + * anything inserted use the ballot timestamp (to respect the order of update decided by + * the Paxos algorithm). We use {@code newTimestamp - 1} for deletions because tombstones + * always win on timestamp equality and we don't want to delete our own insertions + * (typically, when we overwrite a collection, we first set a complex deletion to delete the + * previous collection before adding new elements. If we were to set that complex deletion + * to the same timestamp that the new elements, it would delete those elements). And since + * tombstones always wins on timestamp equality, using -1 guarantees our deletion will still + * delete anything from a previous update. + */ + public Builder updateAllTimestamp(long newTimestamp) + { + deletionInfo.updateAllTimestamp(newTimestamp - 1); + tree = BTree.<Row>transformAndFilter(tree, (x) -> x.updateAllTimestamp(newTimestamp)); + staticRow = this.staticRow.updateAllTimestamp(newTimestamp); + return this; + } + + @Override + public String toString() + { + return "Builder{" + + "metadata=" + metadata + + ", key=" + key + + ", deletionInfo=" + deletionInfo + + ", canHaveShadowedData=" + canHaveShadowedData + + ", createdAtInSec=" + createdAtInSec + + ", staticRow=" + staticRow + + ", columns=" + columns + + ", isBuilt=" + isBuilt + + '}'; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/view/TableViews.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java index 7a1373c..298fcfd 100644 --- a/src/java/org/apache/cassandra/db/view/TableViews.java +++ b/src/java/org/apache/cassandra/db/view/TableViews.java @@ -21,7 +21,9 @@ import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; @@ -510,23 +512,23 @@ public class TableViews extends AbstractCollection<View> return mutations; } - Map<DecoratedKey, Mutation> mutations = new HashMap<>(); + Map<DecoratedKey, Mutation.PartitionUpdateCollector> mutations = new HashMap<>(); for (ViewUpdateGenerator generator : generators) { for (PartitionUpdate update : generator.generateViewUpdates()) { DecoratedKey key = update.partitionKey(); - Mutation mutation = mutations.get(key); - if (mutation == null) + Mutation.PartitionUpdateCollector collector = mutations.get(key); + if (collector == null) { - mutation = new Mutation(baseTableMetadata.keyspace, key); - mutations.put(key, mutation); + collector = new Mutation.PartitionUpdateCollector(baseTableMetadata.keyspace, key); + mutations.put(key, collector); } - mutation.add(update); + collector.add(update); } generator.clear(); } - return mutations.values(); + return mutations.values().stream().map(Mutation.PartitionUpdateCollector::build).collect(Collectors.toList()); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java index 794a6b7..73ca240 100644 --- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java +++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.view; import java.nio.ByteBuffer; import java.util.*; +import java.util.stream.Collectors; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; @@ -53,7 +54,7 @@ public class ViewUpdateGenerator private final TableMetadata viewMetadata; private final boolean baseEnforceStrictLiveness; - private final Map<DecoratedKey, PartitionUpdate> updates = new HashMap<>(); + private final Map<DecoratedKey, PartitionUpdate.Builder> updates = new HashMap<>(); // Reused internally to build a new entry private final ByteBuffer[] currentViewEntryPartitionKey; @@ -143,7 +144,7 @@ public class ViewUpdateGenerator */ public Collection<PartitionUpdate> generateViewUpdates() { - return updates.values(); + return updates.values().stream().map(PartitionUpdate.Builder::build).collect(Collectors.toList()); } /** @@ -566,14 +567,13 @@ public class ViewUpdateGenerator return; DecoratedKey partitionKey = makeCurrentPartitionKey(); - PartitionUpdate update = updates.get(partitionKey); - if (update == null) - { - // We can't really know which columns of the view will be updated nor how many row will be updated for this key - // so we rely on hopefully sane defaults. - update = new PartitionUpdate(viewMetadata, partitionKey, viewMetadata.regularAndStaticColumns(), 4); - updates.put(partitionKey, update); - } + // We can't really know which columns of the view will be updated nor how many row will be updated for this key + // so we rely on hopefully sane defaults. + PartitionUpdate.Builder update = updates.computeIfAbsent(partitionKey, + k -> new PartitionUpdate.Builder(viewMetadata, + partitionKey, + viewMetadata.regularAndStaticColumns(), + 4)); update.add(row); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index 1fa5d8e..044a00b 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -115,7 +115,7 @@ abstract class AbstractSSTableSimpleWriter implements Closeable return maxGen; } - PartitionUpdate getUpdateFor(ByteBuffer key) throws IOException + PartitionUpdate.Builder getUpdateFor(ByteBuffer key) throws IOException { return getUpdateFor(metadata.get().partitioner.decorateKey(key)); } @@ -126,6 +126,6 @@ abstract class AbstractSSTableSimpleWriter implements Closeable * @param key they partition key for which the returned update will be. * @return an update on partition {@code key} that is tied to this writer. */ - abstract PartitionUpdate getUpdateFor(DecoratedKey key) throws IOException; + abstract PartitionUpdate.Builder getUpdateFor(DecoratedKey key) throws IOException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index afb4461..369be12 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@ -68,16 +68,16 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter diskWriter.start(); } - PartitionUpdate getUpdateFor(DecoratedKey key) + PartitionUpdate.Builder getUpdateFor(DecoratedKey key) { assert key != null; - - PartitionUpdate previous = buffer.get(key); + PartitionUpdate.Builder previous = buffer.get(key); if (previous == null) { - previous = createPartitionUpdate(key); - currentSize += PartitionUpdate.serializer.serializedSize(previous, formatType.info.getLatestVersion().correspondingMessagingVersion()); - previous.allowNewUpdates(); + // todo: inefficient - we create and serialize a PU just to get its size, then recreate it + // todo: either allow PartitionUpdateBuilder to have .build() called several times or pre-calculate the size + currentSize += PartitionUpdate.serializer.serializedSize(createPartitionUpdateBuilder(key).build(), formatType.info.getLatestVersion().correspondingMessagingVersion()); + previous = createPartitionUpdateBuilder(key); buffer.put(key, previous); } return previous; @@ -108,9 +108,9 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter } } - private PartitionUpdate createPartitionUpdate(DecoratedKey key) + private PartitionUpdate.Builder createPartitionUpdateBuilder(DecoratedKey key) { - return new PartitionUpdate(metadata.get(), key, columns, 4) + return new PartitionUpdate.Builder(metadata.get(), key, columns, 4) { @Override public void add(Row row) @@ -188,7 +188,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter } //// typedef - static class Buffer extends TreeMap<DecoratedKey, PartitionUpdate> {} + static class Buffer extends TreeMap<DecoratedKey, PartitionUpdate.Builder> {} private class DiskWriter extends FastThreadLocalThread { @@ -206,8 +206,8 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter try (SSTableTxnWriter writer = createWriter()) { - for (Map.Entry<DecoratedKey, PartitionUpdate> entry : b.entrySet()) - writer.append(entry.getValue().unfilteredIterator()); + for (Map.Entry<DecoratedKey, PartitionUpdate.Builder> entry : b.entrySet()) + writer.append(entry.getValue().build().unfilteredIterator()); writer.finish(false); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org