This is an automated email from the ASF dual-hosted git repository. bereng pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 438346aaa6aa8b57ae6ee31fd6ab18e6369292f3 Merge: a0c6191238 a83de9bcd2 Author: Bereng <berenguerbl...@gmail.com> AuthorDate: Thu Jan 12 08:19:22 2023 +0100 Merge branch 'cassandra-4.0' into cassandra-4.1 .../cassandra/db/SinglePartitionReadCommand.java | 3 +- .../db/compaction/CompactionController.java | 14 +- .../cassandra/db/memtable/AbstractMemtable.java | 19 ++- .../org/apache/cassandra/db/memtable/Memtable.java | 2 + .../db/memtable/ShardedSkipListMemtable.java | 9 +- .../apache/cassandra/db/ColumnFamilyStoreTest.java | 152 +++++++++++++++++++++ 6 files changed, 192 insertions(+), 7 deletions(-) diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 64136b6099,8ac26e8513..963b9fee1c --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@@ -661,19 -605,20 +661,20 @@@ public class SinglePartitionReadComman InputCollector<UnfilteredRowIterator> inputCollector = iteratorsForPartition(view, controller); try { + SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector(); + for (Memtable memtable : view.memtables) { - Partition partition = memtable.getPartition(partitionKey()); - if (partition == null) + @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator + UnfilteredRowIterator iter = memtable.rowIterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), metricsCollector); + if (iter == null) continue; - minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp()); + if (memtable.getMinTimestamp() != Memtable.NO_MIN_TIMESTAMP) + minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp()); - @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator - UnfilteredRowIterator iter = 
filter.getUnfilteredRowIterator(columnFilter(), partition); - // Memtable data is always considered unrepaired - controller.updateMinOldestUnrepairedTombstone(partition.stats().minLocalDeletionTime); + controller.updateMinOldestUnrepairedTombstone(memtable.getMinLocalDeletionTime()); inputCollector.addMemtableIterator(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false)); mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java index 26dcdd39a6,bb2094f931..0e79fc1b59 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@@ -263,10 -270,14 +266,13 @@@ public class CompactionController exten for (Memtable memtable : memtables) { - if (memtable.rowIterator(key) != null) + if (memtable.getMinTimestamp() != Memtable.NO_MIN_TIMESTAMP) { - minTimestampSeen = Math.min(minTimestampSeen, memtable.getMinTimestamp()); - hasTimestamp = true; - Partition partition = memtable.getPartition(key); - if (partition != null) ++ if (memtable.rowIterator(key) != null) + { - minTimestampSeen = Math.min(minTimestampSeen, partition.stats().minTimestamp); ++ minTimestampSeen = Math.min(minTimestampSeen, memtable.getMinTimestamp()); + hasTimestamp = true; + } } } diff --cc src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java index 0ac7482e4a,0000000000..ca6dbf6577 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java +++ b/src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java @@@ -1,221 -1,0 +1,238 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.memtable; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + ++import com.google.common.annotations.VisibleForTesting; ++ +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; + +public abstract class AbstractMemtable implements Memtable +{ + protected final AtomicLong currentOperations = new AtomicLong(0); + protected final ColumnsCollector columnsCollector; + protected final StatsCollector statsCollector = new StatsCollector(); + // The smallest timestamp for all partitions stored in this memtable + protected AtomicLong minTimestamp = new AtomicLong(Long.MAX_VALUE); + // The smallest local deletion time for all partitions in this memtable + protected AtomicInteger minLocalDeletionTime = new AtomicInteger(Integer.MAX_VALUE); + // Note: statsCollector has corresponding 
statistics to the two above, but starts with an epoch value which is not + // correct for their usage. + + protected TableMetadataRef metadata; + + public AbstractMemtable(TableMetadataRef metadataRef) + { + this.metadata = metadataRef; + this.columnsCollector = new ColumnsCollector(metadata.get().regularAndStaticColumns()); + } + ++ @VisibleForTesting ++ public AbstractMemtable(TableMetadataRef metadataRef, long minTimestamp) ++ { ++ this.metadata = metadataRef; ++ this.columnsCollector = new ColumnsCollector(metadata.get().regularAndStaticColumns()); ++ this.minTimestamp = new AtomicLong(minTimestamp); ++ } ++ + public TableMetadata metadata() + { + return metadata.get(); + } + + public long operationCount() + { + return currentOperations.get(); + } + ++ /** ++ * Returns the minTS if one available, otherwise NO_MIN_TIMESTAMP. ++ * ++ * EncodingStats uses a synthetic epoch TS at 2015. We don't want to leak that (CASSANDRA-18118) so we return NO_MIN_TIMESTAMP instead. ++ * ++ * @return The minTS or NO_MIN_TIMESTAMP if none available ++ */ + public long getMinTimestamp() + { - return minTimestamp.get(); ++ return minTimestamp.get() != EncodingStats.NO_STATS.minTimestamp ? 
minTimestamp.get() : NO_MIN_TIMESTAMP; + } + + public int getMinLocalDeletionTime() + { + return minLocalDeletionTime.get(); + } + + protected static void updateMin(AtomicLong minTracker, long newValue) + { + while (true) + { + long existing = minTracker.get(); + if (existing <= newValue) + break; + if (minTracker.compareAndSet(existing, newValue)) + break; + } + } + + protected static void updateMin(AtomicInteger minTracker, int newValue) + { + while (true) + { + int existing = minTracker.get(); + if (existing <= newValue) + break; + if (minTracker.compareAndSet(existing, newValue)) + break; + } + } + + RegularAndStaticColumns columns() + { + return columnsCollector.get(); + } + + EncodingStats encodingStats() + { + return statsCollector.get(); + } + + protected static class ColumnsCollector + { + private final HashMap<ColumnMetadata, AtomicBoolean> predefined = new HashMap<>(); + private final ConcurrentSkipListSet<ColumnMetadata> extra = new ConcurrentSkipListSet<>(); + + ColumnsCollector(RegularAndStaticColumns columns) + { + for (ColumnMetadata def : columns.statics) + predefined.put(def, new AtomicBoolean()); + for (ColumnMetadata def : columns.regulars) + predefined.put(def, new AtomicBoolean()); + } + + public void update(RegularAndStaticColumns columns) + { + for (ColumnMetadata s : columns.statics) + update(s); + for (ColumnMetadata r : columns.regulars) + update(r); + } + + public void update(ColumnsCollector other) + { + for (Map.Entry<ColumnMetadata, AtomicBoolean> v : other.predefined.entrySet()) + if (v.getValue().get()) + update(v.getKey()); + + extra.addAll(other.extra); + } + + private void update(ColumnMetadata definition) + { + AtomicBoolean present = predefined.get(definition); + if (present != null) + { + if (!present.get()) + present.set(true); + } + else + { + extra.add(definition); + } + } + + /** + * Get the current state of the columns set. + * + * Note: If this is executed while mutations are still being performed on the table (e.g. 
to prepare + * an sstable for streaming when Memtable.Factory.streamFromMemtable() is true), the resulting view may be + * in a somewhat inconsistent state (it may include partial updates, as well as miss updates older than + * ones it does include). + */ + public RegularAndStaticColumns get() + { + RegularAndStaticColumns.Builder builder = RegularAndStaticColumns.builder(); + for (Map.Entry<ColumnMetadata, AtomicBoolean> e : predefined.entrySet()) + if (e.getValue().get()) + builder.add(e.getKey()); + return builder.addAll(extra).build(); + } + } + + protected static class StatsCollector + { + private final AtomicReference<EncodingStats> stats = new AtomicReference<>(EncodingStats.NO_STATS); + + public void update(EncodingStats newStats) + { + while (true) + { + EncodingStats current = stats.get(); + EncodingStats updated = current.mergeWith(newStats); + if (stats.compareAndSet(current, updated)) + return; + } + } + + public EncodingStats get() + { + return stats.get(); + } + } + + protected abstract class AbstractFlushablePartitionSet<P extends Partition> implements FlushablePartitionSet<P> + { + public long dataSize() + { + return getLiveDataSize(); + } + + public CommitLogPosition commitLogLowerBound() + { + return AbstractMemtable.this.getCommitLogLowerBound(); + } + + public LastCommitLogPosition commitLogUpperBound() + { + return AbstractMemtable.this.getFinalCommitLogUpperBound(); + } + + public EncodingStats encodingStats() + { + return AbstractMemtable.this.encodingStats(); + } + + public RegularAndStaticColumns columns() + { + return AbstractMemtable.this.columns(); + } + } +} diff --cc src/java/org/apache/cassandra/db/memtable/Memtable.java index 8db28533f8,0000000000..3a18a8c7d7 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/memtable/Memtable.java +++ b/src/java/org/apache/cassandra/db/memtable/Memtable.java @@@ -1,431 -1,0 +1,433 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.memtable; + +import java.util.concurrent.atomic.AtomicReference; + +import javax.annotation.concurrent.NotThreadSafe; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.UnfilteredSource; +import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.OpOrder; + +/** + * Memtable interface. This defines the operations the ColumnFamilyStore can perform with memtables. 
+ * They are of several types: + * - construction factory interface + * - write and read operations: put, rowIterator and partitionIterator + * - statistics and features, including partition counts, data size, encoding stats, written columns + * - memory usage tracking, including methods of retrieval and of adding extra allocated space (used non-CFS secondary + * indexes) + * - flush functionality, preparing the set of partitions to flush for given ranges + * - lifecycle management, i.e. operations that prepare and execute switch to a different memtable, together + * with ways of tracking the affected commit log spans + * + * See Memtable_API.md for details on implementing and using alternative memtable implementations. + */ +public interface Memtable extends Comparable<Memtable>, UnfilteredSource +{ ++ public static final long NO_MIN_TIMESTAMP = -1; ++ + // Construction + + /** + * Factory interface for constructing memtables, and querying write durability features. + * + * The factory is chosen using the MemtableParams class (passed as argument to + * {@code CREATE TABLE ... WITH memtable = '<configuration_name>'} where the configuration definition is a map given + * under {@code memtable_configurations} in cassandra.yaml). To make that possible, implementations must provide + * either a static {@code FACTORY} field (if they accept no further option) or a static + * {@code factory(Map<String, String>)} method. In the latter case, the method should avoid creating + * multiple instances of the factory for the same parameters, or factories should at least implement hashCode and + * equals. + */ + interface Factory + { + /** + * Create a memtable. + * + * @param commitLogLowerBound A commit log lower bound for the new memtable. This will be equal to the previous + * memtable's upper bound and defines the span of positions that any flushed sstable + * will cover. + * @param metadaRef Pointer to the up-to-date table metadata. 
+ * @param owner Owning objects that will receive flush requests triggered by the memtable (e.g. on expiration). + */ + Memtable create(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadaRef, Owner owner); + + /** + * If the memtable can achieve write durability directly (i.e. using some feature other than the commitlog, e.g. + * persistent memory), it can return true here, in which case the commit log will not store mutations in this + * table. + * Note that doing so will prevent point-in-time restores and changed data capture, thus a durable memtable must + * allow the option of turning commit log writing on even if it does not need it. + */ + default boolean writesShouldSkipCommitLog() + { + return false; + } + + /** + * This should be true if the memtable can achieve write durability for crash recovery directly (i.e. using some + * feature other than the commitlog, e.g. persistent memory). + * Setting this flag to true means that the commitlog should not replay mutations for this table on restart, + * and that it should not try to preserve segments that contain relevant data. + * Unless writesShouldSkipCommitLog() is also true, writes will be recorded in the commit log as they may be + * needed for changed data capture or point-in-time restore. + */ + default boolean writesAreDurable() + { + return false; + } + + /** + * Normally we can receive streamed sstables directly, skipping the memtable stage (zero-copy-streaming). When + * the memtable is the primary data store (e.g. persistent memtables), it will usually prefer to receive the + * data instead. + * + * If this returns true, all streamed sstables's content will be read and replayed as mutations, disabling + * zero-copy streaming. + */ + default boolean streamToMemtable() + { + return false; + } + + /** + * When we need to stream data, we usually flush and stream the resulting sstables. This will not work correctly + * if the memtable does not want to flush for streaming (e.g. 
persistent memtables acting as primary data + * store), because data (not just recent) will be missing from the streamed view. Such memtables must present + * their data separately for streaming. + * In other words if the memtable returns false on shouldSwitch(STREAMING/REPAIR), its factory must return true + * here. + * + * If this flag returns true, streaming will write the relevant content that resides in the memtable to + * temporary sstables, stream these sstables and then delete them. + */ + default boolean streamFromMemtable() + { + return false; + } + + /** + * Override this method to include implementation-specific memtable metrics in the table metrics. + * + * Memtable metrics lifecycle matches table lifecycle. It is the table that owns the metrics and + * decides when to release them. + */ + default TableMetrics.ReleasableMetric createMemtableMetrics(TableMetadataRef metadataRef) + { + return null; + } + } + + /** + * Interface for providing signals back and requesting information from the owner, i.e. the object that controls the + * memtable. This is usually the ColumnFamilyStore; the interface is used to limit the dependency of memtables on + * the details of its implementation. + */ + interface Owner + { + /** Signal to the owner that a flush is required (e.g. in response to hitting space limits) */ + Future<CommitLogPosition> signalFlushRequired(Memtable memtable, ColumnFamilyStore.FlushReason reason); + + /** Get the current memtable for this owner. Used to avoid capturing memtable in scheduled flush tasks. */ + Memtable getCurrentMemtable(); + + /** + * Collect the index memtables flushed together with this. Used to accurately calculate memory that would be + * freed by a flush. + */ + Iterable<Memtable> getIndexMemtables(); + + /** + * Construct a list of boundaries that split the locally-owned ranges into the given number of shards, + * splitting the owned space evenly. It is up to the memtable to use this information. 
+ * Any changes in the ring structure (e.g. added or removed nodes) will invalidate the splits; in such a case + * the memtable will be sent a {@link #shouldSwitch}(OWNED_RANGES_CHANGE) and, should that return false, a + * {@link #localRangesUpdated()} call. + */ + ShardBoundaries localRangeSplits(int shardCount); + } + + // Main write and read operations + + /** + * Put new data in the memtable. This operation may block until enough memory is available in the memory pool. + * + * @param update the partition update, may be a new partition or an update to an existing one + * @param indexer receives information about the update's effect + * @param opGroup write operation group, used to permit the operation to complete if it is needed to complete a + * flush to free space. + * + * @return the smallest timestamp delta between corresponding rows from existing and update. A + * timestamp delta being computed as the difference between the cells and DeletionTimes from any existing partition + * and those in {@code update}. See CASSANDRA-7979. + */ + long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup); + + // Read operations are provided by the UnfilteredSource interface. + + // Statistics + + /** Number of partitions stored in the memtable */ + long partitionCount(); + + /** Size of the data not accounting for any metadata / mapping overheads */ + long getLiveDataSize(); + + /** + * Number of "operations" (in the sense defined in {@link PartitionUpdate#operationCount()}) the memtable has + * executed. + */ + long operationCount(); + + /** + * The table's definition metadata. + * + * Note that this tracks the current state of the table and is not necessarily the same as what was used to create + * the memtable. + */ + TableMetadata metadata(); + + + // Memory usage tracking + + /** + * Add this memtable's used memory to the given usage object. 
This can be used to retrieve a single memtable's usage + * as well as to combine the ones of related sstables (e.g. a table and its table-based secondary indexes). + */ + void addMemoryUsageTo(MemoryUsage usage); + + + /** + * Creates a holder for memory usage collection. + * + * This is used to track on- and off-heap memory, as well as the ratio to the total permitted memtable memory. + */ + static MemoryUsage newMemoryUsage() + { + return new MemoryUsage(); + } + + /** + * Shorthand for getting a given memtable's memory usage. + * Implemented as a static to prevent implementations altering expectations by e.g. returning a cached object. + */ + static MemoryUsage getMemoryUsage(Memtable memtable) + { + MemoryUsage usage = newMemoryUsage(); + memtable.addMemoryUsageTo(usage); + return usage; + } + + @NotThreadSafe + class MemoryUsage + { + /** On-heap memory used in bytes */ + public long ownsOnHeap = 0; + /** Off-heap memory used in bytes */ + public long ownsOffHeap = 0; + /** On-heap memory as ratio to permitted memtable space */ + public float ownershipRatioOnHeap = 0.0f; + /** Off-heap memory as ratio to permitted memtable space */ + public float ownershipRatioOffHeap = 0.0f; + + @Override + public String toString() + { + return String.format("%s (%.0f%%) on-heap, %s (%.0f%%) off-heap", + FBUtilities.prettyPrintMemory(ownsOnHeap), + ownershipRatioOnHeap * 100, + FBUtilities.prettyPrintMemory(ownsOffHeap), + ownershipRatioOffHeap * 100); + } + } + + /** + * Adjust the used on-heap space by the given size (e.g. to reflect memory used by a non-table-based index). + * This operation may block until enough memory is available in the memory pool. + * + * @param additionalSpace the number of allocated bytes + * @param opGroup write operation group, used to permit the operation to complete if it is needed to complete a + * flush to free space. 
+ */ + void markExtraOnHeapUsed(long additionalSpace, OpOrder.Group opGroup); + + /** + * Adjust the used off-heap space by the given size (e.g. to reflect memory used by a non-table-based index). + * This operation may block until enough memory is available in the memory pool. + * + * @param additionalSpace the number of allocated bytes + * @param opGroup write operation group, used to permit the operation to complete if it is needed to complete a + * flush to free space. + */ + void markExtraOffHeapUsed(long additionalSpace, OpOrder.Group opGroup); + + + // Flushing + + /** + * Get the collection of data between the given partition boundaries in a form suitable for flushing. + */ + FlushablePartitionSet<?> getFlushSet(PartitionPosition from, PartitionPosition to); + + /** + * A collection of partitions for flushing plus some information required for writing an sstable. + * + * Note that the listed entries must conform with the specified metadata. In particular, if the memtable is still + * being written to, care must be taken to not list newer items as they may violate the bounds collected by the + * encoding stats or refer to columns that don't exist in the collected columns set. 
+ */ + interface FlushablePartitionSet<P extends Partition> extends Iterable<P>, SSTableWriter.SSTableSizeParameters + { + Memtable memtable(); + + PartitionPosition from(); + PartitionPosition to(); + + /** The commit log position at the time that this memtable was created */ + CommitLogPosition commitLogLowerBound(); + /** The commit log position at the time that this memtable was switched out */ + CommitLogPosition commitLogUpperBound(); + + /** The set of all columns that have been written */ + RegularAndStaticColumns columns(); + /** Statistics required for writing an sstable efficiently */ + EncodingStats encodingStats(); + + default TableMetadata metadata() + { + return memtable().metadata(); + } + + default boolean isEmpty() + { + return partitionCount() > 0; + } + } + + + // Lifecycle management + + /** + * Called to tell the memtable that it is being switched out and will be flushed (or dropped) and discarded. + * Will be followed by a {@link #getFlushSet} call (if the table is not truncated or dropped), and a + * {@link #discard}. + * + * @param writeBarrier The barrier that will signal that all writes to this memtable have completed. That is, the + * point after which writes cannot be accepted by this memtable (it is permitted for writes + * before this barrier to go into the next; see {@link #accepts}). + * @param commitLogUpperBound The upper commit log position for this memtable. The value may be modified after this + * call and will match the next memtable's lower commit log bound. + */ + void switchOut(OpOrder.Barrier writeBarrier, AtomicReference<CommitLogPosition> commitLogUpperBound); + + /** + * This memtable is no longer in use or required for outstanding flushes or operations. + * All held memory must be released. + */ + void discard(); + + /** + * Decide if this memtable should take a write with the given parameters, or if the write should go to the next + * memtable. 
This enforces that no writes after the barrier set by {@link #switchOut} can be accepted, and + * is also used to define a shared commit log bound as the upper for this memtable and lower for the next. + */ + boolean accepts(OpOrder.Group opGroup, CommitLogPosition commitLogPosition); + + /** Approximate commit log lower bound, <= getCommitLogLowerBound, used as a time stamp for ordering */ + CommitLogPosition getApproximateCommitLogLowerBound(); + + /** The commit log position at the time that this memtable was created */ + CommitLogPosition getCommitLogLowerBound(); + + /** The commit log position at the time that this memtable was switched out */ + LastCommitLogPosition getFinalCommitLogUpperBound(); + + /** True if the memtable can contain any data that was written before the given commit log position */ + boolean mayContainDataBefore(CommitLogPosition position); + + /** True if the memtable contains no data */ + boolean isClean(); + + /** Order memtables by time as reflected in the commit log position at time of construction */ + default int compareTo(Memtable that) + { + return this.getApproximateCommitLogLowerBound().compareTo(that.getApproximateCommitLogLowerBound()); + } + + /** + * Decides whether the memtable should be switched/flushed for the passed reason. + * Normally this will return true, but e.g. persistent memtables may choose not to flush. Returning false will + * trigger further action for some reasons: + * - SCHEMA_CHANGE will be followed by metadataUpdated(). + * - OWNED_RANGES_CHANGE will be followed by localRangesUpdated(). + * - SNAPSHOT will be followed by performSnapshot(). + * - STREAMING/REPAIR will be followed by creating a FlushSet for the streamed/repaired ranges. This data will be + * used to create sstables, which will be streamed and then deleted. 
+ * This will not be called to perform truncation or drop (in that case the memtable is unconditionally dropped), + * but a flush may nevertheless be requested in that case to prepare a snapshot. + */ + boolean shouldSwitch(ColumnFamilyStore.FlushReason reason); + + /** + * Called when the table's metadata is updated. The memtable's metadata reference now points to the new version. + * This will not be called if {@link #shouldSwitch}(SCHEMA_CHANGE) returns true, as the memtable will be swapped out + * instead. + */ + void metadataUpdated(); + + /** + * Called when the known ranges have been updated and owner.localRangeSplits() may return different values. + * This will not be called if {@link #shouldSwitch}(OWNED_RANGES_CHANGE) returns true, as the memtable will be + * swapped out instead. + */ + void localRangesUpdated(); + + /** + * If the memtable needs to do some special action for snapshots (e.g. because it is persistent and does not want + * to flush), it should return false on the above with reason SNAPSHOT and implement this method. + */ + void performSnapshot(String snapshotName); + + /** + * Special commit log position marker used in the upper bound marker setting process + * (see {@link org.apache.cassandra.db.ColumnFamilyStore#setCommitLogUpperBound} and {@link AbstractMemtable#accepts}) + */ + public static final class LastCommitLogPosition extends CommitLogPosition + { + public LastCommitLogPosition(CommitLogPosition copy) + { + super(copy.segmentId, copy.position); + } + } +} diff --cc src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java index e1a192ed70,0000000000..51cd5f2a14 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java +++ b/src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java @@@ -1,562 -1,0 +1,569 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.memtable; + +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.Slices; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.AtomicBTreePartition; +import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; 
+import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.IncludingExcludingBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.io.sstable.format.SSTableReadsListener; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.Cloner; +import org.apache.cassandra.utils.memory.MemtableAllocator; +import org.github.jamm.Unmetered; + +/** + * A proof-of-concept sharded memtable implementation. This implementation splits the partition skip-list into several + * independent skip-lists each covering a roughly equal part of the token space served by this node. This reduces + * congestion of the skip-list from concurrent writes and can lead to improved write throughput. + * + * The implementation takes two parameters: + * - shards: the number of shards to split into. + * - serialize_writes: if false, each shard may serve multiple writes in parallel; if true, writes to each shard are + * synchronized. + * + * Also see Memtable_API.md. + */ +public class ShardedSkipListMemtable extends AbstractAllocatorMemtable +{ + private static final Logger logger = LoggerFactory.getLogger(ShardedSkipListMemtable.class); + + public static final String SHARDS_OPTION = "shards"; + public static final String LOCKING_OPTION = "serialize_writes"; + + // The boundaries for the keyspace as they were calculated when the memtable is created. + // The boundaries will be NONE for system keyspaces or if StorageService is not yet initialized. 
+ // The fact this is fixed for the duration of the memtable lifetime, guarantees we'll always pick the same shard + // for a given key, even if we race with the StorageService initialization or with topology changes. + @Unmetered + final ShardBoundaries boundaries; + + /** + * Core-specific memtable regions. All writes must go through the specific core. The data structures used + * are concurrent-read safe, thus reads can be carried out from any thread. + */ + final MemtableShard[] shards; + + @VisibleForTesting + public static final String SHARD_COUNT_PROPERTY = "cassandra.memtable.shard.count"; + + // default shard count, used when a specific number of shards is not specified in the parameters + private static final int SHARD_COUNT = Integer.getInteger(SHARD_COUNT_PROPERTY, FBUtilities.getAvailableProcessors()); + + private final Factory factory; + + // only to be used by init(), to setup the very first memtable for the cfs + ShardedSkipListMemtable(AtomicReference<CommitLogPosition> commitLogLowerBound, + TableMetadataRef metadataRef, + Owner owner, + Integer shardCountOption, + Factory factory) + { + super(commitLogLowerBound, metadataRef, owner); + int shardCount = shardCountOption != null ? 
shardCountOption : SHARD_COUNT; + this.boundaries = owner.localRangeSplits(shardCount); + this.shards = generatePartitionShards(boundaries.shardCount(), allocator, metadataRef); + this.factory = factory; + } + + private static MemtableShard[] generatePartitionShards(int splits, + MemtableAllocator allocator, + TableMetadataRef metadata) + { + MemtableShard[] partitionMapContainer = new MemtableShard[splits]; + for (int i = 0; i < splits; i++) + partitionMapContainer[i] = new MemtableShard(metadata, allocator); + + return partitionMapContainer; + } + + public boolean isClean() + { + for (MemtableShard shard : shards) + if (!shard.isEmpty()) + return false; + return true; + } + + @Override + protected Memtable.Factory factory() + { + return factory; + } + + /** + * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the appropriate + * OpOrdering. + * + * commitLogSegmentPosition should only be null if this is a secondary index, in which case it is *expected* to be null + */ + public long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup) + { + DecoratedKey key = update.partitionKey(); + MemtableShard shard = shards[boundaries.getShardForKey(key)]; + return shard.put(key, update, indexer, opGroup); + } + + /** + * Technically we should scatter gather on all the core threads because the size in following calls are not + * using volatile variables, but for metrics purpose this should be good enough. 
+ */ + @Override + public long getLiveDataSize() + { + long total = 0L; + for (MemtableShard shard : shards) + total += shard.liveDataSize(); + return total; + } + + @Override + public long operationCount() + { + long total = 0L; + for (MemtableShard shard : shards) + total += shard.currentOperations(); + return total; + } + + @Override + public long partitionCount() + { + int total = 0; + for (MemtableShard shard : shards) + total += shard.size(); + return total; + } + ++ /** ++ * Returns the minTS if one available, otherwise NO_MIN_TIMESTAMP. ++ * ++ * EncodingStats uses a synthetic epoch TS at 2015. We don't want to leak that (CASSANDRA-18118) so we return NO_MIN_TIMESTAMP instead. ++ * ++ * @return The minTS or NO_MIN_TIMESTAMP if none available ++ */ + @Override + public long getMinTimestamp() + { + long min = Long.MAX_VALUE; + for (MemtableShard shard : shards) + min = Long.min(min, shard.minTimestamp()); - return min; ++ return min != EncodingStats.NO_STATS.minTimestamp ? min : NO_MIN_TIMESTAMP; + } + + @Override + public int getMinLocalDeletionTime() + { + int min = Integer.MAX_VALUE; + for (MemtableShard shard : shards) + min = Integer.min(min, shard.minLocalDeletionTime()); + return min; + } + + @Override + RegularAndStaticColumns columns() + { + for (MemtableShard shard : shards) + columnsCollector.update(shard.columnsCollector); + return columnsCollector.get(); + } + + @Override + EncodingStats encodingStats() + { + for (MemtableShard shard : shards) + statsCollector.update(shard.statsCollector.get()); + return statsCollector.get(); + } + + @Override + public MemtableUnfilteredPartitionIterator partitionIterator(final ColumnFilter columnFilter, + final DataRange dataRange, + SSTableReadsListener readsListener) + { + AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange(); + + PartitionPosition left = keyRange.left; + PartitionPosition right = keyRange.right; + + boolean isBound = keyRange instanceof Bounds; + boolean includeStart = isBound || 
keyRange instanceof IncludingExcludingBounds; + boolean includeStop = isBound || keyRange instanceof Range; + + Iterator<AtomicBTreePartition> iterator = getPartitionIterator(left, includeStart, right, includeStop); + + return new MemtableUnfilteredPartitionIterator(metadata(), iterator, columnFilter, dataRange); + // readsListener is ignored as it only accepts sstable signals + } + + private Iterator<AtomicBTreePartition> getPartitionIterator(PartitionPosition left, boolean includeStart, PartitionPosition right, boolean includeStop) + { + int leftShard = left != null && !left.isMinimum() ? boundaries.getShardForKey(left) : 0; + int rightShard = right != null && !right.isMinimum() ? boundaries.getShardForKey(right) : boundaries.shardCount() - 1; + Iterator<AtomicBTreePartition> iterator; + if (leftShard == rightShard) + iterator = shards[leftShard].getPartitionsSubMap(left, includeStart, right, includeStop).values().iterator(); + else + { + Iterator<AtomicBTreePartition>[] iters = new Iterator[rightShard - leftShard + 1]; + int i = leftShard; + iters[0] = shards[leftShard].getPartitionsSubMap(left, includeStart, null, true).values().iterator(); + for (++i; i < rightShard; ++i) + iters[i - leftShard] = shards[i].partitions.values().iterator(); + iters[i - leftShard] = shards[i].getPartitionsSubMap(null, true, right, includeStop).values().iterator(); + iterator = Iterators.concat(iters); + } + return iterator; + } + + private Partition getPartition(DecoratedKey key) + { + int shardIndex = boundaries.getShardForKey(key); + return shards[shardIndex].partitions.get(key); + } + + @Override + public UnfilteredRowIterator rowIterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, SSTableReadsListener listener) + { + Partition p = getPartition(key); + if (p == null) + return null; + else + return p.unfilteredIterator(selectedColumns, slices, reversed); + } + + @Override + public UnfilteredRowIterator rowIterator(DecoratedKey key) + { + 
Partition p = getPartition(key); + return p != null ? p.unfilteredIterator() : null; + } + + public FlushablePartitionSet<AtomicBTreePartition> getFlushSet(PartitionPosition from, PartitionPosition to) + { + long keySize = 0; + int keyCount = 0; + + for (Iterator<AtomicBTreePartition> it = getPartitionIterator(from, true, to,false); it.hasNext();) + { + AtomicBTreePartition en = it.next(); + keySize += en.partitionKey().getKey().remaining(); + keyCount++; + } + long partitionKeySize = keySize; + int partitionCount = keyCount; + Iterator<AtomicBTreePartition> toFlush = getPartitionIterator(from, true, to,false); + + return new AbstractFlushablePartitionSet<AtomicBTreePartition>() + { + public Memtable memtable() + { + return ShardedSkipListMemtable.this; + } + + public PartitionPosition from() + { + return from; + } + + public PartitionPosition to() + { + return to; + } + + public long partitionCount() + { + return partitionCount; + } + + public Iterator<AtomicBTreePartition> iterator() + { + return toFlush; + } + + public long partitionKeysSize() + { + return partitionKeySize; + } + }; + } + + static class MemtableShard + { + // The following fields are volatile as we have to make sure that when we + // collect results from all sub-ranges, the thread accessing the value + // is guaranteed to see the changes to the values. + + // The smallest timestamp for all partitions stored in this shard + private final AtomicLong minTimestamp = new AtomicLong(Long.MAX_VALUE); + private final AtomicInteger minLocalDeletionTime = new AtomicInteger(Integer.MAX_VALUE); + + private final AtomicLong liveDataSize = new AtomicLong(0); + + private final AtomicLong currentOperations = new AtomicLong(0); + + // We index the memtable by PartitionPosition only for the purpose of being able + // to select key range using Token.KeyBound. However put() ensures that we + // actually only store DecoratedKey. 
+ private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> partitions = new ConcurrentSkipListMap<>(); + + private final ColumnsCollector columnsCollector; + + private final StatsCollector statsCollector; + + @Unmetered // total pool size should not be included in memtable's deep size + private final MemtableAllocator allocator; + + private final TableMetadataRef metadata; + + @VisibleForTesting + MemtableShard(TableMetadataRef metadata, MemtableAllocator allocator) + { + this.columnsCollector = new ColumnsCollector(metadata.get().regularAndStaticColumns()); + this.statsCollector = new StatsCollector(); + this.allocator = allocator; + this.metadata = metadata; + } + + public long put(DecoratedKey key, PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup) + { + Cloner cloner = allocator.cloner(opGroup); + AtomicBTreePartition previous = partitions.get(key); + + long initialSize = 0; + if (previous == null) + { + final DecoratedKey cloneKey = cloner.clone(key); + AtomicBTreePartition empty = new AtomicBTreePartition(metadata, cloneKey, allocator); + // We'll add the columns later. This avoids wasting works if we get beaten in the putIfAbsent + previous = partitions.putIfAbsent(cloneKey, empty); + if (previous == null) + { + previous = empty; + // allocate the row overhead after the fact; this saves over allocating and having to free after, but + // means we can overshoot our declared limit. 
+ int overhead = (int) (cloneKey.getToken().getHeapSize() + SkipListMemtable.ROW_OVERHEAD_HEAP_SIZE); + allocator.onHeap().allocate(overhead, opGroup); + initialSize = 8; + } + } + + long[] pair = previous.addAllWithSizeDelta(update, cloner, opGroup, indexer); + updateMin(minTimestamp, update.stats().minTimestamp); + updateMin(minLocalDeletionTime, update.stats().minLocalDeletionTime); + liveDataSize.addAndGet(initialSize + pair[0]); + columnsCollector.update(update.columns()); + statsCollector.update(update.stats()); + currentOperations.addAndGet(update.operationCount()); + return pair[1]; + } + + private Map<PartitionPosition, AtomicBTreePartition> getPartitionsSubMap(PartitionPosition left, + boolean includeLeft, + PartitionPosition right, + boolean includeRight) + { + if (left != null && left.isMinimum()) + left = null; + if (right != null && right.isMinimum()) + right = null; + + try + { + if (left == null) + return right == null ? partitions : partitions.headMap(right, includeRight); + else + return right == null + ? 
partitions.tailMap(left, includeLeft) + : partitions.subMap(left, includeLeft, right, includeRight); + } + catch (IllegalArgumentException e) + { + logger.error("Invalid range requested {} - {}", left, right); + throw e; + } + } + + public boolean isEmpty() + { + return partitions.isEmpty(); + } + + public int size() + { + return partitions.size(); + } + + long minTimestamp() + { + return minTimestamp.get(); + } + + long liveDataSize() + { + return liveDataSize.get(); + } + + long currentOperations() + { + return currentOperations.get(); + } + + public int minLocalDeletionTime() + { + return minLocalDeletionTime.get(); + } + } + + public static class MemtableUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator implements UnfilteredPartitionIterator + { + private final TableMetadata metadata; + private final Iterator<AtomicBTreePartition> iter; + private final ColumnFilter columnFilter; + private final DataRange dataRange; + + public MemtableUnfilteredPartitionIterator(TableMetadata metadata, Iterator<AtomicBTreePartition> iterator, ColumnFilter columnFilter, DataRange dataRange) + { + this.metadata = metadata; + this.iter = iterator; + this.columnFilter = columnFilter; + this.dataRange = dataRange; + } + + public TableMetadata metadata() + { + return metadata; + } + + public boolean hasNext() + { + return iter.hasNext(); + } + + public UnfilteredRowIterator next() + { + AtomicBTreePartition entry = iter.next(); + DecoratedKey key = entry.partitionKey(); + ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(key); + + return filter.getUnfilteredRowIterator(columnFilter, entry); + } + } + + static class Locking extends ShardedSkipListMemtable + { + Locking(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadataRef, Owner owner, Integer shardCountOption, Factory factory) + { + super(commitLogLowerBound, metadataRef, owner, shardCountOption, factory); + } + + /** + * Should only be called by ColumnFamilyStore.apply 
via Keyspace.apply, which supplies the appropriate + * OpOrdering. + * + * commitLogSegmentPosition should only be null if this is a secondary index, in which case it is *expected* to be null + */ + public long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup) + { + DecoratedKey key = update.partitionKey(); + MemtableShard shard = shards[boundaries.getShardForKey(key)]; + synchronized (shard) + { + return shard.put(key, update, indexer, opGroup); + } + } + + } + + public static Factory factory(Map<String, String> optionsCopy) + { + String shardsString = optionsCopy.remove(SHARDS_OPTION); + Integer shardCount = shardsString != null ? Integer.parseInt(shardsString) : null; + boolean isLocking = Boolean.parseBoolean(optionsCopy.remove(LOCKING_OPTION)); + return new Factory(shardCount, isLocking); + } + + static class Factory implements Memtable.Factory + { + final Integer shardCount; + final boolean isLocking; + + Factory(Integer shardCount, boolean isLocking) + { + this.shardCount = shardCount; + this.isLocking = isLocking; + } + + public Memtable create(AtomicReference<CommitLogPosition> commitLogLowerBound, + TableMetadataRef metadataRef, + Owner owner) + { + return isLocking + ? 
new Locking(commitLogLowerBound, metadataRef, owner, shardCount, this) + : new ShardedSkipListMemtable(commitLogLowerBound, metadataRef, owner, shardCount, this); + } + + public boolean equals(Object o) + { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + Factory factory = (Factory) o; + return Objects.equals(shardCount, factory.shardCount); + } + + public int hashCode() + { + return Objects.hash(shardCount); + } + } +} diff --cc test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index 96437db615,b1c6599b5b..4d871fce83 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@@ -22,39 -24,35 +22,51 @@@ import java.io.IOException import java.nio.ByteBuffer; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.*; - +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; ++import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterators; import org.junit.Assert; import org.junit.Before; -import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Test; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; -import org.apache.cassandra.utils.Pair; -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import com.google.common.collect.Iterators; -import org.apache.cassandra.*; +import com.googlecode.concurrenttrees.common.Iterables; +import org.apache.cassandra.SchemaLoader; +import 
org.apache.cassandra.UpdateBuilder; +import org.apache.cassandra.Util; import org.apache.cassandra.cql3.Operator; ++import org.apache.cassandra.db.ColumnFamilyStore.FlushReason; ++import org.apache.cassandra.db.commitlog.CommitLogPosition; ++import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.lifecycle.SSTableSet; -import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.db.partitions.*; ++import org.apache.cassandra.db.memtable.AbstractMemtable; ++import org.apache.cassandra.db.memtable.Memtable; +import org.apache.cassandra.db.partitions.FilteredPartition; ++import org.apache.cassandra.db.partitions.PartitionUpdate; ++import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.Cell; ++import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.Row; ++import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.exceptions.ConfigurationException; ++import org.apache.cassandra.index.transactions.UpdateTransaction; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.SSTableReader; ++import org.apache.cassandra.io.sstable.format.SSTableReadsListener; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.ClearableHistogram; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.KeyspaceParams; @@@ -65,12 -59,7 +77,14 @@@ import org.apache.cassandra.service.sna import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; -import static junit.framework.Assert.assertNotNull; ++import 
org.apache.cassandra.utils.concurrent.OpOrder.Barrier; ++import org.apache.cassandra.utils.concurrent.OpOrder.Group; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class ColumnFamilyStoreTest { @@@ -110,6 -91,15 +124,13 @@@ Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD1).truncateBlocking(); } + @Test + public void testMemtableTimestamp() throws Throwable + { - assertEquals(Memtable.NO_MIN_TIMESTAMP, - (new Memtable(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata(), - EncodingStats.NO_STATS.minTimestamp)) - .getMinTimestamp()); ++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1); ++ assertEquals(Memtable.NO_MIN_TIMESTAMP, fakeMemTableWithMinTS(cfs, EncodingStats.NO_STATS.minTimestamp).getMinTimestamp()); + } + @Test // create two sstables, and verify that we only deserialize data from the most recent one public void testTimeSortedQuery() @@@ -659,18 -556,4 +680,149 @@@ assertEquals(0, ssTableFiles.size()); cfs.clearUnsafe(); } + + @VisibleForTesting + public static long getSnapshotManifestAndSchemaFileSizes(TableSnapshot snapshot) throws IOException + { + Optional<File> schemaFile = snapshot.getSchemaFile(); + Optional<File> manifestFile = snapshot.getManifestFile(); + + long schemaAndManifestFileSizes = 0; + + schemaAndManifestFileSizes += schemaFile.isPresent() ? schemaFile.get().length() : 0; + schemaAndManifestFileSizes += manifestFile.isPresent() ? 
manifestFile.get().length() : 0; + + return schemaAndManifestFileSizes; + } ++ ++ private Memtable fakeMemTableWithMinTS(ColumnFamilyStore cfs, long minTS) ++ { ++ return new AbstractMemtable(cfs.metadata, minTS) ++ { ++ ++ @Override ++ public UnfilteredRowIterator rowIterator(DecoratedKey key, ++ Slices slices, ++ ColumnFilter columnFilter, ++ boolean reversed, ++ SSTableReadsListener listener) ++ { ++ return null; ++ } ++ ++ @Override ++ public UnfilteredPartitionIterator ++ partitionIterator(ColumnFilter columnFilter, DataRange dataRange, SSTableReadsListener listener) ++ { ++ return null; ++ } ++ ++ @Override ++ public void switchOut(Barrier writeBarrier, AtomicReference<CommitLogPosition> commitLogUpperBound) ++ { ++ } ++ ++ @Override ++ public boolean shouldSwitch(FlushReason reason) ++ { ++ return false; ++ } ++ ++ @Override ++ public long put(PartitionUpdate update, UpdateTransaction indexer, Group opGroup) ++ { ++ return 0; ++ } ++ ++ @Override ++ public void performSnapshot(String snapshotName) ++ { ++ } ++ ++ @Override ++ public long partitionCount() ++ { ++ return 0; ++ } ++ ++ @Override ++ public void metadataUpdated() ++ { ++ } ++ ++ @Override ++ public boolean mayContainDataBefore(CommitLogPosition position) ++ { ++ return false; ++ } ++ ++ @Override ++ public void markExtraOnHeapUsed(long additionalSpace, Group opGroup) ++ { ++ } ++ ++ @Override ++ public void markExtraOffHeapUsed(long additionalSpace, Group opGroup) ++ { ++ } ++ ++ @Override ++ public void localRangesUpdated() ++ { ++ } ++ ++ @Override ++ public boolean isClean() ++ { ++ return false; ++ } ++ ++ @Override ++ public long getLiveDataSize() ++ { ++ return 0; ++ } ++ ++ @Override ++ public FlushablePartitionSet<?> getFlushSet(PartitionPosition from, PartitionPosition to) ++ { ++ // TODO Auto-generated method stub ++ return null; ++ } ++ ++ @Override ++ public LastCommitLogPosition getFinalCommitLogUpperBound() ++ { ++ return null; ++ } ++ ++ @Override ++ public CommitLogPosition 
getCommitLogLowerBound() ++ { ++ return null; ++ } ++ ++ @Override ++ public CommitLogPosition getApproximateCommitLogLowerBound() ++ { ++ return null; ++ } ++ ++ @Override ++ public void discard() ++ { ++ } ++ ++ @Override ++ public void addMemoryUsageTo(MemoryUsage usage) ++ { ++ } ++ ++ @Override ++ public boolean accepts(Group opGroup, CommitLogPosition commitLogPosition) ++ { ++ return false; ++ } ++ }; ++ } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org