This is an automated email from the ASF dual-hosted git repository.

bereng pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 438346aaa6aa8b57ae6ee31fd6ab18e6369292f3
Merge: a0c6191238 a83de9bcd2
Author: Bereng <berenguerbl...@gmail.com>
AuthorDate: Thu Jan 12 08:19:22 2023 +0100

    Merge branch 'cassandra-4.0' into cassandra-4.1

 .../cassandra/db/SinglePartitionReadCommand.java   |   3 +-
 .../db/compaction/CompactionController.java        |  14 +-
 .../cassandra/db/memtable/AbstractMemtable.java    |  19 ++-
 .../org/apache/cassandra/db/memtable/Memtable.java |   2 +
 .../db/memtable/ShardedSkipListMemtable.java       |   9 +-
 .../apache/cassandra/db/ColumnFamilyStoreTest.java | 152 +++++++++++++++++++++
 6 files changed, 192 insertions(+), 7 deletions(-)

diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 64136b6099,8ac26e8513..963b9fee1c
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -661,19 -605,20 +661,20 @@@ public class SinglePartitionReadComman
          InputCollector<UnfilteredRowIterator> inputCollector = 
iteratorsForPartition(view, controller);
          try
          {
 +            SSTableReadMetricsCollector metricsCollector = new 
SSTableReadMetricsCollector();
 +
              for (Memtable memtable : view.memtables)
              {
 -                Partition partition = memtable.getPartition(partitionKey());
 -                if (partition == null)
 +                @SuppressWarnings("resource") // 'iter' is added to iterators 
which is closed on exception, or through the closing of the final merged 
iterator
 +                UnfilteredRowIterator iter = 
memtable.rowIterator(partitionKey(), filter.getSlices(metadata()), 
columnFilter(), filter.isReversed(), metricsCollector);
 +                if (iter == null)
                      continue;
  
-                 minTimestamp = Math.min(minTimestamp, 
memtable.getMinTimestamp());
+                 if (memtable.getMinTimestamp() != Memtable.NO_MIN_TIMESTAMP)
+                     minTimestamp = Math.min(minTimestamp, 
memtable.getMinTimestamp());
  
 -                @SuppressWarnings("resource") // 'iter' is added to iterators 
which is closed on exception, or through the closing of the final merged 
iterator
 -                UnfilteredRowIterator iter = 
filter.getUnfilteredRowIterator(columnFilter(), partition);
 -
                  // Memtable data is always considered unrepaired
 -                
controller.updateMinOldestUnrepairedTombstone(partition.stats().minLocalDeletionTime);
 +                
controller.updateMinOldestUnrepairedTombstone(memtable.getMinLocalDeletionTime());
                  
inputCollector.addMemtableIterator(RTBoundValidator.validate(iter, 
RTBoundValidator.Stage.MEMTABLE, false));
  
                  mostRecentPartitionTombstone = 
Math.max(mostRecentPartitionTombstone,
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 26dcdd39a6,bb2094f931..0e79fc1b59
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -263,10 -270,14 +266,13 @@@ public class CompactionController exten
  
          for (Memtable memtable : memtables)
          {
-             if (memtable.rowIterator(key) != null)
+             if (memtable.getMinTimestamp() != Memtable.NO_MIN_TIMESTAMP)
              {
-                 minTimestampSeen = Math.min(minTimestampSeen, 
memtable.getMinTimestamp());
-                 hasTimestamp = true;
 -                Partition partition = memtable.getPartition(key);
 -                if (partition != null)
++                if (memtable.rowIterator(key) != null)
+                 {
 -                    minTimestampSeen = Math.min(minTimestampSeen, 
partition.stats().minTimestamp);
++                    minTimestampSeen = Math.min(minTimestampSeen, 
memtable.getMinTimestamp());
+                     hasTimestamp = true;
+                 }
              }
          }
  
diff --cc src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java
index 0ac7482e4a,0000000000..ca6dbf6577
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java
+++ b/src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java
@@@ -1,221 -1,0 +1,238 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db.memtable;
 +
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.concurrent.ConcurrentSkipListSet;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.atomic.AtomicReference;
 +
++import com.google.common.annotations.VisibleForTesting;
++
 +import org.apache.cassandra.db.RegularAndStaticColumns;
 +import org.apache.cassandra.db.commitlog.CommitLogPosition;
 +import org.apache.cassandra.db.partitions.Partition;
 +import org.apache.cassandra.db.rows.EncodingStats;
 +import org.apache.cassandra.schema.ColumnMetadata;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.schema.TableMetadataRef;
 +
 +public abstract class AbstractMemtable implements Memtable
 +{
 +    protected final AtomicLong currentOperations = new AtomicLong(0);
 +    protected final ColumnsCollector columnsCollector;
 +    protected final StatsCollector statsCollector = new StatsCollector();
 +    // The smallest timestamp for all partitions stored in this memtable
 +    protected AtomicLong minTimestamp = new AtomicLong(Long.MAX_VALUE);
 +    // The smallest local deletion time for all partitions in this memtable
 +    protected AtomicInteger minLocalDeletionTime = new 
AtomicInteger(Integer.MAX_VALUE);
 +    // Note: statsCollector has corresponding statistics to the two above, 
but starts with an epoch value which is not
 +    // correct for their usage.
 +
 +    protected TableMetadataRef metadata;
 +
 +    public AbstractMemtable(TableMetadataRef metadataRef)
 +    {
 +        this.metadata = metadataRef;
 +        this.columnsCollector = new 
ColumnsCollector(metadata.get().regularAndStaticColumns());
 +    }
 +
++    @VisibleForTesting
++    public AbstractMemtable(TableMetadataRef metadataRef, long minTimestamp)
++    {
++        this.metadata = metadataRef;
++        this.columnsCollector = new 
ColumnsCollector(metadata.get().regularAndStaticColumns());
++        this.minTimestamp = new AtomicLong(minTimestamp);
++    }
++
 +    public TableMetadata metadata()
 +    {
 +        return metadata.get();
 +    }
 +
 +    public long operationCount()
 +    {
 +        return currentOperations.get();
 +    }
 +
++    /**
++     * Returns the minTS if one available, otherwise NO_MIN_TIMESTAMP.
++     *
++     * EncodingStats uses a synthetic epoch TS at 2015. We don't want to leak 
that (CASSANDRA-18118) so we return NO_MIN_TIMESTAMP instead.
++     *
++     * @return The minTS or NO_MIN_TIMESTAMP if none available
++     */
 +    public long getMinTimestamp()
 +    {
-         return minTimestamp.get();
++        return minTimestamp.get() != EncodingStats.NO_STATS.minTimestamp ? 
minTimestamp.get() : NO_MIN_TIMESTAMP;
 +    }
 +
 +    public int getMinLocalDeletionTime()
 +    {
 +        return minLocalDeletionTime.get();
 +    }
 +
 +    protected static void updateMin(AtomicLong minTracker, long newValue)
 +    {
 +        while (true)
 +        {
 +            long existing = minTracker.get();
 +            if (existing <= newValue)
 +                break;
 +            if (minTracker.compareAndSet(existing, newValue))
 +                break;
 +        }
 +    }
 +
 +    protected static void updateMin(AtomicInteger minTracker, int newValue)
 +    {
 +        while (true)
 +        {
 +            int existing = minTracker.get();
 +            if (existing <= newValue)
 +                break;
 +            if (minTracker.compareAndSet(existing, newValue))
 +                break;
 +        }
 +    }
 +
 +    RegularAndStaticColumns columns()
 +    {
 +        return columnsCollector.get();
 +    }
 +
 +    EncodingStats encodingStats()
 +    {
 +        return statsCollector.get();
 +    }
 +
 +    protected static class ColumnsCollector
 +    {
 +        private final HashMap<ColumnMetadata, AtomicBoolean> predefined = new 
HashMap<>();
 +        private final ConcurrentSkipListSet<ColumnMetadata> extra = new 
ConcurrentSkipListSet<>();
 +
 +        ColumnsCollector(RegularAndStaticColumns columns)
 +        {
 +            for (ColumnMetadata def : columns.statics)
 +                predefined.put(def, new AtomicBoolean());
 +            for (ColumnMetadata def : columns.regulars)
 +                predefined.put(def, new AtomicBoolean());
 +        }
 +
 +        public void update(RegularAndStaticColumns columns)
 +        {
 +            for (ColumnMetadata s : columns.statics)
 +                update(s);
 +            for (ColumnMetadata r : columns.regulars)
 +                update(r);
 +        }
 +
 +        public void update(ColumnsCollector other)
 +        {
 +            for (Map.Entry<ColumnMetadata, AtomicBoolean> v : 
other.predefined.entrySet())
 +                if (v.getValue().get())
 +                    update(v.getKey());
 +
 +            extra.addAll(other.extra);
 +        }
 +
 +        private void update(ColumnMetadata definition)
 +        {
 +            AtomicBoolean present = predefined.get(definition);
 +            if (present != null)
 +            {
 +                if (!present.get())
 +                    present.set(true);
 +            }
 +            else
 +            {
 +                extra.add(definition);
 +            }
 +        }
 +
 +        /**
 +         * Get the current state of the columns set.
 +         *
 +         * Note: If this is executed while mutations are still being 
performed on the table (e.g. to prepare
 +         * an sstable for streaming when 
Memtable.Factory.streamFromMemtable() is true), the resulting view may be
 +         * in a somewhat inconsistent state (it may include partial updates, 
as well as miss updates older than
 +         * ones it does include).
 +         */
 +        public RegularAndStaticColumns get()
 +        {
 +            RegularAndStaticColumns.Builder builder = 
RegularAndStaticColumns.builder();
 +            for (Map.Entry<ColumnMetadata, AtomicBoolean> e : 
predefined.entrySet())
 +                if (e.getValue().get())
 +                    builder.add(e.getKey());
 +            return builder.addAll(extra).build();
 +        }
 +    }
 +
 +    protected static class StatsCollector
 +    {
 +        private final AtomicReference<EncodingStats> stats = new 
AtomicReference<>(EncodingStats.NO_STATS);
 +
 +        public void update(EncodingStats newStats)
 +        {
 +            while (true)
 +            {
 +                EncodingStats current = stats.get();
 +                EncodingStats updated = current.mergeWith(newStats);
 +                if (stats.compareAndSet(current, updated))
 +                    return;
 +            }
 +        }
 +
 +        public EncodingStats get()
 +        {
 +            return stats.get();
 +        }
 +    }
 +
 +    protected abstract class AbstractFlushablePartitionSet<P extends 
Partition> implements FlushablePartitionSet<P>
 +    {
 +        public long dataSize()
 +        {
 +            return getLiveDataSize();
 +        }
 +
 +        public CommitLogPosition commitLogLowerBound()
 +        {
 +            return AbstractMemtable.this.getCommitLogLowerBound();
 +        }
 +
 +        public LastCommitLogPosition commitLogUpperBound()
 +        {
 +            return AbstractMemtable.this.getFinalCommitLogUpperBound();
 +        }
 +
 +        public EncodingStats encodingStats()
 +        {
 +            return AbstractMemtable.this.encodingStats();
 +        }
 +
 +        public RegularAndStaticColumns columns()
 +        {
 +            return AbstractMemtable.this.columns();
 +        }
 +    }
 +}
diff --cc src/java/org/apache/cassandra/db/memtable/Memtable.java
index 8db28533f8,0000000000..3a18a8c7d7
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/memtable/Memtable.java
+++ b/src/java/org/apache/cassandra/db/memtable/Memtable.java
@@@ -1,431 -1,0 +1,433 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db.memtable;
 +
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +import javax.annotation.concurrent.NotThreadSafe;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.PartitionPosition;
 +import org.apache.cassandra.db.RegularAndStaticColumns;
 +import org.apache.cassandra.db.commitlog.CommitLogPosition;
 +import org.apache.cassandra.db.partitions.Partition;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.EncodingStats;
 +import org.apache.cassandra.db.rows.UnfilteredSource;
 +import org.apache.cassandra.index.transactions.UpdateTransaction;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.metrics.TableMetrics;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.schema.TableMetadataRef;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.concurrent.Future;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +
 +/**
 + * Memtable interface. This defines the operations the ColumnFamilyStore can 
perform with memtables.
 + * They are of several types:
 + * - construction factory interface
 + * - write and read operations: put, rowIterator and partitionIterator
 + * - statistics and features, including partition counts, data size, encoding 
stats, written columns
 + * - memory usage tracking, including methods of retrieval and of adding 
extra allocated space (used by non-CFS secondary
 + *   indexes)
 + * - flush functionality, preparing the set of partitions to flush for given 
ranges
 + * - lifecycle management, i.e. operations that prepare and execute switch to 
a different memtable, together
 + *   with ways of tracking the affected commit log spans
 + *
 + * See Memtable_API.md for details on implementing and using alternative 
memtable implementations.
 + */
 +public interface Memtable extends Comparable<Memtable>, UnfilteredSource
 +{
++    public static final long NO_MIN_TIMESTAMP = -1;
++
 +    // Construction
 +
 +    /**
 +     * Factory interface for constructing memtables, and querying write 
durability features.
 +     *
 +     * The factory is chosen using the MemtableParams class (passed as 
argument to
 +     * {@code CREATE TABLE ... WITH memtable = '<configuration_name>'} where 
the configuration definition is a map given
 +     * under {@code memtable_configurations} in cassandra.yaml). To make that 
possible, implementations must provide
 +     * either a static {@code FACTORY} field (if they accept no further 
option) or a static
 +     * {@code factory(Map<String, String>)} method. In the latter case, the 
method should avoid creating
 +     * multiple instances of the factory for the same parameters, or 
factories should at least implement hashCode and
 +     * equals.
 +     */
 +    interface Factory
 +    {
 +        /**
 +         * Create a memtable.
 +         *
 +         * @param commitLogLowerBound A commit log lower bound for the new 
memtable. This will be equal to the previous
 +         *                            memtable's upper bound and defines the 
span of positions that any flushed sstable
 +         *                            will cover.
 +         * @param metadaRef Pointer to the up-to-date table metadata.
 +         * @param owner Owning objects that will receive flush requests 
triggered by the memtable (e.g. on expiration).
 +         */
 +        Memtable create(AtomicReference<CommitLogPosition> 
commitLogLowerBound, TableMetadataRef metadaRef, Owner owner);
 +
 +        /**
 +         * If the memtable can achieve write durability directly (i.e. using 
some feature other than the commitlog, e.g.
 +         * persistent memory), it can return true here, in which case the 
commit log will not store mutations in this
 +         * table.
 +         * Note that doing so will prevent point-in-time restores and changed 
data capture, thus a durable memtable must
 +         * allow the option of turning commit log writing on even if it does 
not need it.
 +         */
 +        default boolean writesShouldSkipCommitLog()
 +        {
 +            return false;
 +        }
 +
 +        /**
 +         * This should be true if the memtable can achieve write durability 
for crash recovery directly (i.e. using some
 +         * feature other than the commitlog, e.g. persistent memory).
 +         * Setting this flag to true means that the commitlog should not 
replay mutations for this table on restart,
 +         * and that it should not try to preserve segments that contain 
relevant data.
 +         * Unless writesShouldSkipCommitLog() is also true, writes will be 
recorded in the commit log as they may be
 +         * needed for changed data capture or point-in-time restore.
 +         */
 +        default boolean writesAreDurable()
 +        {
 +            return false;
 +        }
 +
 +        /**
 +         * Normally we can receive streamed sstables directly, skipping the 
memtable stage (zero-copy-streaming). When
 +         * the memtable is the primary data store (e.g. persistent 
memtables), it will usually prefer to receive the
 +         * data instead.
 +         *
 +         * If this returns true, all streamed sstables' content will be read 
and replayed as mutations, disabling
 +         * zero-copy streaming.
 +         */
 +        default boolean streamToMemtable()
 +        {
 +            return false;
 +        }
 +
 +        /**
 +         * When we need to stream data, we usually flush and stream the 
resulting sstables. This will not work correctly
 +         * if the memtable does not want to flush for streaming (e.g. 
persistent memtables acting as primary data
 +         * store), because data (not just recent) will be missing from the 
streamed view. Such memtables must present
 +         * their data separately for streaming.
 +         * In other words if the memtable returns false on 
shouldSwitch(STREAMING/REPAIR), its factory must return true
 +         * here.
 +         *
 +         * If this flag returns true, streaming will write the relevant 
content that resides in the memtable to
 +         * temporary sstables, stream these sstables and then delete them.
 +         */
 +        default boolean streamFromMemtable()
 +        {
 +            return false;
 +        }
 +
 +        /**
 +         * Override this method to include implementation-specific memtable 
metrics in the table metrics.
 +         *
 +         * Memtable metrics lifecycle matches table lifecycle. It is the 
table that owns the metrics and
 +         * decides when to release them.
 +         */
 +        default TableMetrics.ReleasableMetric 
createMemtableMetrics(TableMetadataRef metadataRef)
 +        {
 +            return null;
 +        }
 +    }
 +
 +    /**
 +     * Interface for providing signals back and requesting information from 
the owner, i.e. the object that controls the
 +     * memtable. This is usually the ColumnFamilyStore; the interface is used 
to limit the dependency of memtables on
 +     * the details of its implementation.
 +     */
 +    interface Owner
 +    {
 +        /** Signal to the owner that a flush is required (e.g. in response to 
hitting space limits) */
 +        Future<CommitLogPosition> signalFlushRequired(Memtable memtable, 
ColumnFamilyStore.FlushReason reason);
 +
 +        /** Get the current memtable for this owner. Used to avoid capturing 
memtable in scheduled flush tasks. */
 +        Memtable getCurrentMemtable();
 +
 +        /**
 +         * Collect the index memtables flushed together with this. Used to 
accurately calculate memory that would be
 +         * freed by a flush.
 +         */
 +        Iterable<Memtable> getIndexMemtables();
 +
 +        /**
 +         * Construct a list of boundaries that split the locally-owned ranges 
into the given number of shards,
 +         * splitting the owned space evenly. It is up to the memtable to use 
this information.
 +         * Any changes in the ring structure (e.g. added or removed nodes) 
will invalidate the splits; in such a case
 +         * the memtable will be sent a {@link 
#shouldSwitch}(OWNED_RANGES_CHANGE) and, should that return false, a
 +         * {@link #localRangesUpdated()} call.
 +         */
 +        ShardBoundaries localRangeSplits(int shardCount);
 +    }
 +
 +    // Main write and read operations
 +
 +    /**
 +     * Put new data in the memtable. This operation may block until enough 
memory is available in the memory pool.
 +     *
 +     * @param update the partition update, may be a new partition or an 
update to an existing one
 +     * @param indexer receives information about the update's effect
 +     * @param opGroup write operation group, used to permit the operation to 
complete if it is needed to complete a
 +     *                flush to free space.
 +     *
 +     * @return the smallest timestamp delta between corresponding rows from 
existing and update. A
 +     * timestamp delta being computed as the difference between the cells and 
DeletionTimes from any existing partition
 +     * and those in {@code update}. See CASSANDRA-7979.
 +     */
 +    long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group 
opGroup);
 +
 +    // Read operations are provided by the UnfilteredSource interface.
 +
 +    // Statistics
 +
 +    /** Number of partitions stored in the memtable */
 +    long partitionCount();
 +
 +    /** Size of the data not accounting for any metadata / mapping overheads 
*/
 +    long getLiveDataSize();
 +
 +    /**
 +     * Number of "operations" (in the sense defined in {@link 
PartitionUpdate#operationCount()}) the memtable has
 +     * executed.
 +     */
 +    long operationCount();
 +
 +    /**
 +     * The table's definition metadata.
 +     *
 +     * Note that this tracks the current state of the table and is not 
necessarily the same as what was used to create
 +     * the memtable.
 +     */
 +    TableMetadata metadata();
 +
 +
 +    // Memory usage tracking
 +
 +    /**
 +     * Add this memtable's used memory to the given usage object. This can be 
used to retrieve a single memtable's usage
 +     * as well as to combine the ones of related memtables (e.g. a table and 
its table-based secondary indexes).
 +     */
 +    void addMemoryUsageTo(MemoryUsage usage);
 +
 +
 +    /**
 +     * Creates a holder for memory usage collection.
 +     *
 +     * This is used to track on- and off-heap memory, as well as the ratio to 
the total permitted memtable memory.
 +     */
 +    static MemoryUsage newMemoryUsage()
 +    {
 +        return new MemoryUsage();
 +    }
 +
 +    /**
 +     * Shorthand for getting a given memtable's memory usage.
 +     * Implemented as a static to prevent implementations altering 
expectations by e.g. returning a cached object.
 +     */
 +    static MemoryUsage getMemoryUsage(Memtable memtable)
 +    {
 +        MemoryUsage usage = newMemoryUsage();
 +        memtable.addMemoryUsageTo(usage);
 +        return usage;
 +    }
 +
 +    @NotThreadSafe
 +    class MemoryUsage
 +    {
 +        /** On-heap memory used in bytes */
 +        public long ownsOnHeap = 0;
 +        /** Off-heap memory used in bytes */
 +        public long ownsOffHeap = 0;
 +        /** On-heap memory as ratio to permitted memtable space */
 +        public float ownershipRatioOnHeap = 0.0f;
 +        /** Off-heap memory as ratio to permitted memtable space */
 +        public float ownershipRatioOffHeap = 0.0f;
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("%s (%.0f%%) on-heap, %s (%.0f%%) off-heap",
 +                                 FBUtilities.prettyPrintMemory(ownsOnHeap),
 +                                 ownershipRatioOnHeap * 100,
 +                                 FBUtilities.prettyPrintMemory(ownsOffHeap),
 +                                 ownershipRatioOffHeap * 100);
 +        }
 +    }
 +
 +    /**
 +     * Adjust the used on-heap space by the given size (e.g. to reflect 
memory used by a non-table-based index).
 +     * This operation may block until enough memory is available in the 
memory pool.
 +     *
 +     * @param additionalSpace the number of allocated bytes
 +     * @param opGroup write operation group, used to permit the operation to 
complete if it is needed to complete a
 +     *                flush to free space.
 +     */
 +    void markExtraOnHeapUsed(long additionalSpace, OpOrder.Group opGroup);
 +
 +    /**
 +     * Adjust the used off-heap space by the given size (e.g. to reflect 
memory used by a non-table-based index).
 +     * This operation may block until enough memory is available in the 
memory pool.
 +     *
 +     * @param additionalSpace the number of allocated bytes
 +     * @param opGroup write operation group, used to permit the operation to 
complete if it is needed to complete a
 +     *                flush to free space.
 +     */
 +    void markExtraOffHeapUsed(long additionalSpace, OpOrder.Group opGroup);
 +
 +
 +    // Flushing
 +
 +    /**
 +     * Get the collection of data between the given partition boundaries in a 
form suitable for flushing.
 +     */
 +    FlushablePartitionSet<?> getFlushSet(PartitionPosition from, 
PartitionPosition to);
 +
 +    /**
 +     * A collection of partitions for flushing plus some information required 
for writing an sstable.
 +     *
 +     * Note that the listed entries must conform with the specified metadata. 
In particular, if the memtable is still
 +     * being written to, care must be taken to not list newer items as they 
may violate the bounds collected by the
 +     * encoding stats or refer to columns that don't exist in the collected 
columns set.
 +     */
 +    interface FlushablePartitionSet<P extends Partition> extends Iterable<P>, 
SSTableWriter.SSTableSizeParameters
 +    {
 +        Memtable memtable();
 +
 +        PartitionPosition from();
 +        PartitionPosition to();
 +
 +        /** The commit log position at the time that this memtable was 
created */
 +        CommitLogPosition commitLogLowerBound();
 +        /** The commit log position at the time that this memtable was 
switched out */
 +        CommitLogPosition commitLogUpperBound();
 +
 +        /** The set of all columns that have been written */
 +        RegularAndStaticColumns columns();
 +        /** Statistics required for writing an sstable efficiently */
 +        EncodingStats encodingStats();
 +
 +        default TableMetadata metadata()
 +        {
 +            return memtable().metadata();
 +        }
 +
 +        /**
 +         * Whether this flush set contains no partitions.
 +         *
 +         * @return true if {@code partitionCount()} is zero, false otherwise
 +         */
 +        default boolean isEmpty()
 +        {
 +            // "Empty" means there is nothing to flush, i.e. a zero partition count.
 +            return partitionCount() == 0;
 +        }
 +    }
 +
 +
 +    // Lifecycle management
 +
 +    /**
 +     * Called to tell the memtable that it is being switched out and will be 
flushed (or dropped) and discarded.
 +     * Will be followed by a {@link #getFlushSet} call (if the table is not 
truncated or dropped), and a
 +     * {@link #discard}.
 +     *
 +     * @param writeBarrier The barrier that will signal that all writes to 
this memtable have completed. That is, the
 +     *                     point after which writes cannot be accepted by 
this memtable (it is permitted for writes
 +     *                     before this barrier to go into the next; see 
{@link #accepts}).
 +     * @param commitLogUpperBound The upper commit log position for this 
memtable. The value may be modified after this
 +     *                            call and will match the next memtable's 
lower commit log bound.
 +     */
 +    void switchOut(OpOrder.Barrier writeBarrier, 
AtomicReference<CommitLogPosition> commitLogUpperBound);
 +
 +    /**
 +     * This memtable is no longer in use or required for outstanding flushes 
or operations.
 +     * All held memory must be released.
 +     */
 +    void discard();
 +
 +    /**
 +     * Decide if this memtable should take a write with the given parameters, 
or if the write should go to the next
 +     * memtable. This enforces that no writes after the barrier set by {@link 
#switchOut} can be accepted, and
 +     * is also used to define a shared commit log bound as the upper for this 
memtable and lower for the next.
 +     */
 +    boolean accepts(OpOrder.Group opGroup, CommitLogPosition 
commitLogPosition);
 +
 +    /** Approximate commit log lower bound, <= getCommitLogLowerBound, used 
as a time stamp for ordering */
 +    CommitLogPosition getApproximateCommitLogLowerBound();
 +
 +    /** The commit log position at the time that this memtable was created */
 +    CommitLogPosition getCommitLogLowerBound();
 +
 +    /** The commit log position at the time that this memtable was switched 
out */
 +    LastCommitLogPosition getFinalCommitLogUpperBound();
 +
 +    /** True if the memtable can contain any data that was written before the 
given commit log position */
 +    boolean mayContainDataBefore(CommitLogPosition position);
 +
 +    /** True if the memtable contains no data */
 +    boolean isClean();
 +
 +    /**
 +     * Orders memtables chronologically, comparing the approximate commit log lower bound
 +     * of this memtable with that of the other one.
 +     */
 +    default int compareTo(Memtable that)
 +    {
 +        CommitLogPosition ours = this.getApproximateCommitLogLowerBound();
 +        CommitLogPosition theirs = that.getApproximateCommitLogLowerBound();
 +        return ours.compareTo(theirs);
 +    }
 +
 +    /**
 +     * Decides whether the memtable should be switched/flushed for the passed 
reason.
 +     * Normally this will return true, but e.g. persistent memtables may 
choose not to flush. Returning false will
 +     * trigger further action for some reasons:
 +     * - SCHEMA_CHANGE will be followed by metadataUpdated().
 +     * - OWNED_RANGES_CHANGE will be followed by localRangesUpdated().
 +     * - SNAPSHOT will be followed by performSnapshot().
 +     * - STREAMING/REPAIR will be followed by creating a FlushSet for the 
streamed/repaired ranges. This data will be
 +     *   used to create sstables, which will be streamed and then deleted.
 +     * This will not be called to perform truncation or drop (in that case 
the memtable is unconditionally dropped),
 +     * but a flush may nevertheless be requested in that case to prepare a 
snapshot.
 +     */
 +    boolean shouldSwitch(ColumnFamilyStore.FlushReason reason);
 +
 +    /**
 +     * Called when the table's metadata is updated. The memtable's metadata reference now points to the new version.
 +     * This will not be called if {@link #shouldSwitch}(SCHEMA_CHANGE) returns true, as the memtable will be swapped
 +     * out instead.
 +     */
 +    void metadataUpdated();
 +
 +    /**
 +     * Called when the known ranges have been updated and owner.localRangeSplits() may return different values.
 +     * This will not be called if {@link #shouldSwitch}(OWNED_RANGES_CHANGE) returns true, as the memtable will be
 +     * swapped out instead.
 +     */
 +    void localRangesUpdated();
 +
 +    /**
 +     * If the memtable needs to do some special action for snapshots (e.g. because it is persistent and does not
 +     * want to flush), it should return false from {@link #shouldSwitch} for the reason SNAPSHOT and implement
 +     * this method.
 +     *
 +     * @param snapshotName the snapshot name passed by the caller
 +     */
 +    void performSnapshot(String snapshotName);
 +
 +    /**
 +     * Special commit log position marker used in the upper bound marker setting process
 +     * (see {@link org.apache.cassandra.db.ColumnFamilyStore#setCommitLogUpperBound} and
 +     * {@link AbstractMemtable#accepts}).
 +     */
 +    public static final class LastCommitLogPosition extends CommitLogPosition
 +    {
 +        /** Creates a marker with the same segment id and position as {@code copy}. */
 +        public LastCommitLogPosition(CommitLogPosition copy)
 +        {
 +            super(copy.segmentId, copy.position);
 +        }
 +    }
 +}
diff --cc src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java
index e1a192ed70,0000000000..51cd5f2a14
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java
+++ b/src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java
@@@ -1,562 -1,0 +1,569 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.memtable;
 +
 +import java.util.Iterator;
 +import java.util.Map;
 +import java.util.Objects;
 +import java.util.concurrent.ConcurrentNavigableMap;
 +import java.util.concurrent.ConcurrentSkipListMap;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.Iterators;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.db.DataRange;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.PartitionPosition;
 +import org.apache.cassandra.db.RegularAndStaticColumns;
 +import org.apache.cassandra.db.Slices;
 +import org.apache.cassandra.db.commitlog.CommitLogPosition;
 +import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
 +import org.apache.cassandra.db.partitions.AtomicBTreePartition;
 +import org.apache.cassandra.db.partitions.Partition;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.rows.EncodingStats;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.dht.Bounds;
 +import org.apache.cassandra.dht.IncludingExcludingBounds;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.index.transactions.UpdateTransaction;
 +import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.schema.TableMetadataRef;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.memory.Cloner;
 +import org.apache.cassandra.utils.memory.MemtableAllocator;
 +import org.github.jamm.Unmetered;
 +
 +/**
 + * A proof-of-concept sharded memtable implementation. This implementation 
splits the partition skip-list into several
 + * independent skip-lists each covering a roughly equal part of the token 
space served by this node. This reduces
 + * congestion of the skip-list from concurrent writes and can lead to 
improved write throughput.
 + *
 + * The implementation takes two parameters:
 + * - shards: the number of shards to split into.
 + * - serialize_writes: if false, each shard may serve multiple writes in 
parallel; if true, writes to each shard are
 + *   synchronized.
 + *
 + * Also see Memtable_API.md.
 + */
 +public class ShardedSkipListMemtable extends AbstractAllocatorMemtable
 +{
 +    private static final Logger logger = 
LoggerFactory.getLogger(ShardedSkipListMemtable.class);
 +
 +    public static final String SHARDS_OPTION = "shards";
 +    public static final String LOCKING_OPTION = "serialize_writes";
 +
 +    // The boundaries for the keyspace as they were calculated when the 
memtable is created.
 +    // The boundaries will be NONE for system keyspaces or if StorageService 
is not yet initialized.
 +    // The fact this is fixed for the duration of the memtable lifetime, 
guarantees we'll always pick the same shard
 +    // for a given key, even if we race with the StorageService 
initialization or with topology changes.
 +    @Unmetered
 +    final ShardBoundaries boundaries;
 +
 +    /**
 +     * Core-specific memtable regions. All writes must go through the 
specific core. The data structures used
 +     * are concurrent-read safe, thus reads can be carried out from any 
thread.
 +     */
 +    final MemtableShard[] shards;
 +
 +    @VisibleForTesting
 +    public static final String SHARD_COUNT_PROPERTY = 
"cassandra.memtable.shard.count";
 +
 +    // default shard count, used when a specific number of shards is not 
specified in the parameters
 +    private static final int SHARD_COUNT = 
Integer.getInteger(SHARD_COUNT_PROPERTY, FBUtilities.getAvailableProcessors());
 +
 +    private final Factory factory;
 +
 +    // only to be used by init(), to setup the very first memtable for the cfs
 +    ShardedSkipListMemtable(AtomicReference<CommitLogPosition> 
commitLogLowerBound,
 +                            TableMetadataRef metadataRef,
 +                            Owner owner,
 +                            Integer shardCountOption,
 +                            Factory factory)
 +    {
 +        super(commitLogLowerBound, metadataRef, owner);
 +        int shardCount = shardCountOption != null ? shardCountOption : 
SHARD_COUNT;
 +        this.boundaries = owner.localRangeSplits(shardCount);
 +        this.shards = generatePartitionShards(boundaries.shardCount(), 
allocator, metadataRef);
 +        this.factory = factory;
 +    }
 +
 +    private static MemtableShard[] generatePartitionShards(int splits,
 +                                                           MemtableAllocator 
allocator,
 +                                                           TableMetadataRef 
metadata)
 +    {
 +        MemtableShard[] partitionMapContainer = new MemtableShard[splits];
 +        for (int i = 0; i < splits; i++)
 +            partitionMapContainer[i] = new MemtableShard(metadata, allocator);
 +
 +        return partitionMapContainer;
 +    }
 +
 +    public boolean isClean()
 +    {
 +        for (MemtableShard shard : shards)
 +            if (!shard.isEmpty())
 +                return false;
 +        return true;
 +    }
 +
 +    /** Returns the factory that was passed to this memtable's constructor. */
 +    @Override
 +    protected Memtable.Factory factory()
 +    {
 +        return factory;
 +    }
 +
 +    /**
 +     * Routes the update to the shard that owns its partition key and applies it there.
 +     *
 +     * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the
 +     * appropriate OpOrdering.
 +     *
 +     * @return the value returned by the owning shard's {@code put}
 +     */
 +    public long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup)
 +    {
 +        final DecoratedKey partitionKey = update.partitionKey();
 +        final int shardIndex = boundaries.getShardForKey(partitionKey);
 +        return shards[shardIndex].put(partitionKey, update, indexer, opGroup);
 +    }
 +
 +    /**
 +     * Sums the live data size reported by each shard. The shards are read one after another
 +     * without any coordination between them, so the total is approximate; for metrics purposes
 +     * this should be good enough.
 +     */
 +    @Override
 +    public long getLiveDataSize()
 +    {
 +        long sum = 0L;
 +        for (int i = 0; i < shards.length; i++)
 +            sum += shards[i].liveDataSize();
 +        return sum;
 +    }
 +
 +    /** Adds up the operation counters of all shards. */
 +    @Override
 +    public long operationCount()
 +    {
 +        long operations = 0L;
 +        int i = 0;
 +        while (i < shards.length)
 +            operations += shards[i++].currentOperations();
 +        return operations;
 +    }
 +
 +    /**
 +     * Returns the number of partitions held across all shards.
 +     */
 +    @Override
 +    public long partitionCount()
 +    {
 +        // accumulate in a long, like the sibling aggregators, so that the sum of the per-shard
 +        // int sizes cannot wrap around before being widened to the long return type
 +        long total = 0L;
 +        for (MemtableShard shard : shards)
 +            total += shard.size();
 +        return total;
 +    }
 +
++    /**
++     * Returns the smallest timestamp reported by any shard, or NO_MIN_TIMESTAMP if none is available.
++     *
++     * EncodingStats uses a synthetic epoch TS at 2015. We don't want to leak that (CASSANDRA-18118), so
++     * NO_MIN_TIMESTAMP is returned whenever the minimum equals EncodingStats.NO_STATS.minTimestamp.
++     *
++     * @return The minTS or NO_MIN_TIMESTAMP if none available
++     */
 +    @Override
 +    public long getMinTimestamp()
 +    {
 +        long min = Long.MAX_VALUE;
 +        for (MemtableShard shard : shards)
 +            min =  Long.min(min, shard.minTimestamp());
-         return min;
++        return min != EncodingStats.NO_STATS.minTimestamp ? min : 
NO_MIN_TIMESTAMP;
 +    }
 +
 +    /** Returns the smallest local deletion time reported by any shard, starting from {@code Integer.MAX_VALUE}. */
 +    @Override
 +    public int getMinLocalDeletionTime()
 +    {
 +        int smallest = Integer.MAX_VALUE;
 +        for (MemtableShard shard : shards)
 +        {
 +            final int candidate = shard.minLocalDeletionTime();
 +            if (candidate < smallest)
 +                smallest = candidate;
 +        }
 +        return smallest;
 +    }
 +
 +    /**
 +     * Folds every shard's column collector into this memtable's own collector and returns the
 +     * resulting column set. Note that each call updates the memtable-level collector.
 +     */
 +    @Override
 +    RegularAndStaticColumns columns()
 +    {
 +        for (int i = 0; i < shards.length; i++)
 +            columnsCollector.update(shards[i].columnsCollector);
 +        return columnsCollector.get();
 +    }
 +
 +    @Override
 +    EncodingStats encodingStats()
 +    {
 +        for (MemtableShard shard : shards)
 +            statsCollector.update(shard.statsCollector.get());
 +        return statsCollector.get();
 +    }
 +
 +    /**
 +     * Builds an iterator over the partitions falling inside the key range of {@code dataRange}.
 +     * Which ends of the range are inclusive is derived from the concrete bounds type.
 +     * {@code readsListener} is ignored as it only accepts sstable signals.
 +     */
 +    @Override
 +    public MemtableUnfilteredPartitionIterator partitionIterator(final ColumnFilter columnFilter,
 +                                                                 final DataRange dataRange,
 +                                                                 SSTableReadsListener readsListener)
 +    {
 +        final AbstractBounds<PartitionPosition> range = dataRange.keyRange();
 +
 +        final boolean startInclusive;
 +        final boolean stopInclusive;
 +        if (range instanceof Bounds)
 +        {
 +            // Bounds include both ends
 +            startInclusive = true;
 +            stopInclusive = true;
 +        }
 +        else
 +        {
 +            startInclusive = range instanceof IncludingExcludingBounds;
 +            stopInclusive = range instanceof Range;
 +        }
 +
 +        Iterator<AtomicBTreePartition> partitions = getPartitionIterator(range.left, startInclusive, range.right, stopInclusive);
 +        return new MemtableUnfilteredPartitionIterator(metadata(), partitions, columnFilter, dataRange);
 +    }
 +
 +    /**
 +     * Returns an iterator over the partitions between {@code left} and {@code right}, possibly spanning
 +     * several shards. A {@code null} or minimum {@code left} starts at shard 0; a {@code null} or minimum
 +     * {@code right} ends at the last shard.
 +     */
 +    private Iterator<AtomicBTreePartition> 
getPartitionIterator(PartitionPosition left, boolean includeStart, 
PartitionPosition right, boolean includeStop)
 +    {
 +        int leftShard = left != null && !left.isMinimum() ? 
boundaries.getShardForKey(left) : 0;
 +        int rightShard = right != null && !right.isMinimum() ? 
boundaries.getShardForKey(right) : boundaries.shardCount() - 1;
 +        Iterator<AtomicBTreePartition> iterator;
 +        // both ends fall in the same shard: a single sub-map covers the whole range
 +        if (leftShard == rightShard)
 +            iterator = shards[leftShard].getPartitionsSubMap(left, 
includeStart, right, includeStop).values().iterator();
 +        else
 +        {
 +            // one iterator per shard from leftShard to rightShard inclusive
 +            Iterator<AtomicBTreePartition>[] iters = new Iterator[rightShard 
- leftShard + 1];
 +            int i = leftShard;
 +            // first shard: everything from left to the end of the shard
 +            iters[0] = shards[leftShard].getPartitionsSubMap(left, 
includeStart, null, true).values().iterator();
 +            // shards strictly between the two ends are taken whole
 +            for (++i; i < rightShard; ++i)
 +                iters[i - leftShard] = 
shards[i].partitions.values().iterator();
 +            // here i == rightShard; last shard: everything from its start up to right
 +            iters[i - leftShard] = shards[i].getPartitionsSubMap(null, true, 
right, includeStop).values().iterator();
 +            iterator = Iterators.concat(iters);
 +        }
 +        return iterator;
 +    }
 +
 +    private Partition getPartition(DecoratedKey key)
 +    {
 +        int shardIndex = boundaries.getShardForKey(key);
 +        return shards[shardIndex].partitions.get(key);
 +    }
 +
 +    @Override
 +    public UnfilteredRowIterator rowIterator(DecoratedKey key, Slices slices, 
ColumnFilter selectedColumns, boolean reversed, SSTableReadsListener listener)
 +    {
 +        Partition p = getPartition(key);
 +        if (p == null)
 +            return null;
 +        else
 +            return p.unfilteredIterator(selectedColumns, slices, reversed);
 +    }
 +
 +    /** Returns an unfiltered iterator over the partition stored for {@code key}, or {@code null} when none is found. */
 +    @Override
 +    public UnfilteredRowIterator rowIterator(DecoratedKey key)
 +    {
 +        final Partition partition = getPartition(key);
 +        if (partition == null)
 +            return null;
 +        return partition.unfilteredIterator();
 +    }
 +
 +    /**
 +     * Prepares the partitions between {@code from} (inclusive) and {@code to} (exclusive) for flushing.
 +     * The range is walked once up front to count the partitions and add up their key sizes; the
 +     * returned set then hands out a second iterator created over the same range.
 +     */
 +    public FlushablePartitionSet<AtomicBTreePartition> getFlushSet(PartitionPosition from, PartitionPosition to)
 +    {
 +        long totalKeyBytes = 0;
 +        int seen = 0;
 +
 +        Iterator<AtomicBTreePartition> counting = getPartitionIterator(from, true, to, false);
 +        while (counting.hasNext())
 +        {
 +            AtomicBTreePartition current = counting.next();
 +            totalKeyBytes += current.partitionKey().getKey().remaining();
 +            seen++;
 +        }
 +
 +        // effectively-final copies for use inside the anonymous class
 +        final long partitionKeySize = totalKeyBytes;
 +        final int partitionCount = seen;
 +        final Iterator<AtomicBTreePartition> toFlush = getPartitionIterator(from, true, to, false);
 +
 +        return new AbstractFlushablePartitionSet<AtomicBTreePartition>()
 +        {
 +            public Memtable memtable()
 +            {
 +                return ShardedSkipListMemtable.this;
 +            }
 +
 +            public PartitionPosition from()
 +            {
 +                return from;
 +            }
 +
 +            public PartitionPosition to()
 +            {
 +                return to;
 +            }
 +
 +            public long partitionCount()
 +            {
 +                return partitionCount;
 +            }
 +
 +            public long partitionKeysSize()
 +            {
 +                return partitionKeySize;
 +            }
 +
 +            // every call returns the same pre-built iterator instance
 +            public Iterator<AtomicBTreePartition> iterator()
 +            {
 +                return toFlush;
 +            }
 +        };
 +    }
 +
 +    static class MemtableShard
 +    {
 +        // The following fields are atomic as we have to make sure that when we
 +        // collect results from all sub-ranges, the thread accessing the value
 +        // is guaranteed to see the changes to the values.
 +
 +        // The smallest timestamp for all partitions stored in this shard
 +        private final AtomicLong minTimestamp = new 
AtomicLong(Long.MAX_VALUE);
 +        private final AtomicInteger minLocalDeletionTime = new 
AtomicInteger(Integer.MAX_VALUE);
 +
 +        private final AtomicLong liveDataSize = new AtomicLong(0);
 +
 +        private final AtomicLong currentOperations = new AtomicLong(0);
 +
 +        // We index the memtable by PartitionPosition only for the purpose of 
being able
 +        // to select key range using Token.KeyBound. However put() ensures 
that we
 +        // actually only store DecoratedKey.
 +        private final ConcurrentNavigableMap<PartitionPosition, 
AtomicBTreePartition> partitions = new ConcurrentSkipListMap<>();
 +
 +        private final ColumnsCollector columnsCollector;
 +
 +        private final StatsCollector statsCollector;
 +
 +        @Unmetered  // total pool size should not be included in memtable's 
deep size
 +        private final MemtableAllocator allocator;
 +
 +        private final TableMetadataRef metadata;
 +
 +        /**
 +         * Creates an empty shard. The column collector is seeded with the regular and static columns
 +         * of the table metadata current at construction time.
 +         */
 +        @VisibleForTesting
 +        MemtableShard(TableMetadataRef metadata, MemtableAllocator allocator)
 +        {
 +            this.metadata = metadata;
 +            this.allocator = allocator;
 +            this.statsCollector = new StatsCollector();
 +            this.columnsCollector = new ColumnsCollector(metadata.get().regularAndStaticColumns());
 +        }
 +
 +        /**
 +         * Applies the update to the partition stored under {@code key}, creating the partition first if this
 +         * shard does not have one yet, and then refreshes the shard's counters and collectors.
 +         *
 +         * @return the second element of the pair returned by {@code addAllWithSizeDelta}
 +         */
 +        public long put(DecoratedKey key, PartitionUpdate update, 
UpdateTransaction indexer, OpOrder.Group opGroup)
 +        {
 +            Cloner cloner = allocator.cloner(opGroup);
 +            AtomicBTreePartition previous = partitions.get(key);
 +
 +            long initialSize = 0;
 +            if (previous == null)
 +            {
 +                final DecoratedKey cloneKey = cloner.clone(key);
 +                AtomicBTreePartition empty = new 
AtomicBTreePartition(metadata, cloneKey, allocator);
 +                // We'll add the columns later. This avoids wasting work if we get beaten in the putIfAbsent
 +                previous = partitions.putIfAbsent(cloneKey, empty);
 +                if (previous == null)
 +                {
 +                    // we won the race: our empty partition is now the one in the map
 +                    previous = empty;
 +                    // allocate the row overhead after the fact; this saves over allocating and having to free
 +                    // after, but means we can overshoot our declared limit.
 +                    int overhead = (int) (cloneKey.getToken().getHeapSize() + 
SkipListMemtable.ROW_OVERHEAD_HEAP_SIZE);
 +                    allocator.onHeap().allocate(overhead, opGroup);
 +                    // NOTE(review): fixed amount added to liveDataSize for a new partition; meaning of 8 not shown here - confirm
 +                    initialSize = 8;
 +                }
 +            }
 +
 +            long[] pair = previous.addAllWithSizeDelta(update, cloner, 
opGroup, indexer);
 +            updateMin(minTimestamp, update.stats().minTimestamp);
 +            updateMin(minLocalDeletionTime, 
update.stats().minLocalDeletionTime);
 +            // pair[0] is added to the live data size; pair[1] is handed back to the caller
 +            liveDataSize.addAndGet(initialSize + pair[0]);
 +            columnsCollector.update(update.columns());
 +            statsCollector.update(update.stats());
 +            currentOperations.addAndGet(update.operationCount());
 +            return pair[1];
 +        }
 +
 +        private Map<PartitionPosition, AtomicBTreePartition> 
getPartitionsSubMap(PartitionPosition left,
 +                                                                              
   boolean includeLeft,
 +                                                                              
   PartitionPosition right,
 +                                                                              
   boolean includeRight)
 +        {
 +            if (left != null && left.isMinimum())
 +                left = null;
 +            if (right != null && right.isMinimum())
 +                right = null;
 +
 +            try
 +            {
 +                if (left == null)
 +                    return right == null ? partitions : 
partitions.headMap(right, includeRight);
 +                else
 +                    return right == null
 +                           ? partitions.tailMap(left, includeLeft)
 +                           : partitions.subMap(left, includeLeft, right, 
includeRight);
 +            }
 +            catch (IllegalArgumentException e)
 +            {
 +                logger.error("Invalid range requested {} - {}", left, right);
 +                throw e;
 +            }
 +        }
 +
 +        /** True if this shard's partition map has no entries. */
 +        public boolean isEmpty()
 +        {
 +            return partitions.isEmpty();
 +        }
 +
 +        /** Number of entries in this shard's partition map. */
 +        public int size()
 +        {
 +            return partitions.size();
 +        }
 +
 +        /** Current value of the shard's minimum timestamp ({@code Long.MAX_VALUE} until lowered by a put). */
 +        long minTimestamp()
 +        {
 +            return minTimestamp.get();
 +        }
 +
 +        /** Current value of the shard's live data size counter. */
 +        long liveDataSize()
 +        {
 +            return liveDataSize.get();
 +        }
 +
 +        /** Current value of the shard's operation counter. */
 +        long currentOperations()
 +        {
 +            return currentOperations.get();
 +        }
 +
 +        /** Current value of the shard's minimum local deletion time ({@code Integer.MAX_VALUE} until lowered by a put). */
 +        public int minLocalDeletionTime()
 +        {
 +            return minLocalDeletionTime.get();
 +        }
 +    }
 +
 +    /**
 +     * Adapts an iterator of memtable partitions into an {@link UnfilteredPartitionIterator}, applying the
 +     * clustering index filter of the supplied data range to each partition as it is returned.
 +     */
 +    public static class MemtableUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator implements UnfilteredPartitionIterator
 +    {
 +        private final TableMetadata tableMetadata;
 +        private final Iterator<AtomicBTreePartition> partitions;
 +        private final ColumnFilter selection;
 +        private final DataRange range;
 +
 +        public MemtableUnfilteredPartitionIterator(TableMetadata metadata, Iterator<AtomicBTreePartition> iterator, ColumnFilter columnFilter, DataRange dataRange)
 +        {
 +            this.tableMetadata = metadata;
 +            this.partitions = iterator;
 +            this.selection = columnFilter;
 +            this.range = dataRange;
 +        }
 +
 +        /** The table metadata supplied at construction. */
 +        public TableMetadata metadata()
 +        {
 +            return tableMetadata;
 +        }
 +
 +        /** Delegates to the wrapped partition iterator. */
 +        public boolean hasNext()
 +        {
 +            return partitions.hasNext();
 +        }
 +
 +        /** Advances to the next partition and returns its rows as selected by the data range's filter for that key. */
 +        public UnfilteredRowIterator next()
 +        {
 +            final AtomicBTreePartition partition = partitions.next();
 +            return range.clusteringIndexFilter(partition.partitionKey())
 +                        .getUnfilteredRowIterator(selection, partition);
 +        }
 +    }
 +
 +    /** Variant that serializes writes per shard by synchronizing on the target shard around each put. */
 +    static class Locking extends ShardedSkipListMemtable
 +    {
 +        Locking(AtomicReference<CommitLogPosition> commitLogLowerBound,
 +                TableMetadataRef metadataRef,
 +                Owner owner,
 +                Integer shardCountOption,
 +                Factory factory)
 +        {
 +            super(commitLogLowerBound, metadataRef, owner, shardCountOption, factory);
 +        }
 +
 +        /**
 +         * Same routing as the parent's put, but holds the monitor of the owning shard while applying
 +         * the update.
 +         *
 +         * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the
 +         * appropriate OpOrdering.
 +         */
 +        public long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup)
 +        {
 +            final DecoratedKey partitionKey = update.partitionKey();
 +            final MemtableShard target = shards[boundaries.getShardForKey(partitionKey)];
 +            synchronized (target)
 +            {
 +                return target.put(partitionKey, update, indexer, opGroup);
 +            }
 +        }
 +
 +    }
 +
 +    /**
 +     * Builds a factory from the memtable options, consuming (removing) the {@code shards} and
 +     * {@code serialize_writes} entries from the given map.
 +     *
 +     * @throws NumberFormatException if the {@code shards} value is present but not a parsable integer
 +     */
 +    public static Factory factory(Map<String, String> optionsCopy)
 +    {
 +        final String shardsValue = optionsCopy.remove(SHARDS_OPTION);
 +        Integer shardCount = null;
 +        if (shardsValue != null)
 +            shardCount = Integer.parseInt(shardsValue);
 +
 +        final String lockingValue = optionsCopy.remove(LOCKING_OPTION);
 +        return new Factory(shardCount, Boolean.parseBoolean(lockingValue));
 +    }
 +
 +    /**
 +     * Factory for {@link ShardedSkipListMemtable} instances, configured with an optional shard count
 +     * and a flag selecting the {@link Locking} variant.
 +     */
 +    static class Factory implements Memtable.Factory
 +    {
 +        // null means "use the default shard count"
 +        final Integer shardCount;
 +        final boolean isLocking;
 +
 +        Factory(Integer shardCount, boolean isLocking)
 +        {
 +            this.shardCount = shardCount;
 +            this.isLocking = isLocking;
 +        }
 +
 +        /** Creates a {@link Locking} memtable when {@code isLocking} is set, a plain sharded memtable otherwise. */
 +        public Memtable create(AtomicReference<CommitLogPosition> commitLogLowerBound,
 +                               TableMetadataRef metadataRef,
 +                               Owner owner)
 +        {
 +            return isLocking
 +                   ? new Locking(commitLogLowerBound, metadataRef, owner, shardCount, this)
 +                   : new ShardedSkipListMemtable(commitLogLowerBound, metadataRef, owner, shardCount, this);
 +        }
 +
 +        /**
 +         * Two factories are equal when they would build the same kind of memtable, i.e. they agree on
 +         * both the shard count and the locking flag.
 +         */
 +        @Override
 +        public boolean equals(Object o)
 +        {
 +            if (this == o)
 +                return true;
 +            if (o == null || getClass() != o.getClass())
 +                return false;
 +            Factory other = (Factory) o;
 +            // isLocking must take part: it decides which memtable class create() instantiates
 +            return isLocking == other.isLocking && Objects.equals(shardCount, other.shardCount);
 +        }
 +
 +        @Override
 +        public int hashCode()
 +        {
 +            return Objects.hash(shardCount, isLocking);
 +        }
 +    }
 +}
diff --cc test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 96437db615,b1c6599b5b..4d871fce83
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@@ -22,39 -24,35 +22,51 @@@ import java.io.IOException
  import java.nio.ByteBuffer;
  import java.nio.file.Path;
  import java.nio.file.Paths;
 -import java.util.*;
 -
 +import java.util.Collection;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicReference;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.Iterators;
  import org.junit.Assert;
  import org.junit.Before;
 -import org.junit.Assume;
  import org.junit.BeforeClass;
  import org.junit.Test;
  
 -import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 -import org.apache.cassandra.utils.Pair;
 -import org.json.simple.JSONArray;
 -import org.json.simple.JSONObject;
 -import org.json.simple.parser.JSONParser;
 -
 -import static org.assertj.core.api.Assertions.assertThat;
 -import static org.junit.Assert.assertEquals;
 -import static org.junit.Assert.assertTrue;
 -
 -import com.google.common.collect.Iterators;
 -import org.apache.cassandra.*;
 +import com.googlecode.concurrenttrees.common.Iterables;
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.UpdateBuilder;
 +import org.apache.cassandra.Util;
  import org.apache.cassandra.cql3.Operator;
++import org.apache.cassandra.db.ColumnFamilyStore.FlushReason;
++import org.apache.cassandra.db.commitlog.CommitLogPosition;
++import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.db.lifecycle.SSTableSet;
 -import org.apache.cassandra.db.rows.*;
 -import org.apache.cassandra.db.partitions.*;
++import org.apache.cassandra.db.memtable.AbstractMemtable;
++import org.apache.cassandra.db.memtable.Memtable;
 +import org.apache.cassandra.db.partitions.FilteredPartition;
++import org.apache.cassandra.db.partitions.PartitionUpdate;
++import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.rows.Cell;
++import org.apache.cassandra.db.rows.EncodingStats;
 +import org.apache.cassandra.db.rows.Row;
++import org.apache.cassandra.db.rows.UnfilteredRowIterator;
  import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.index.transactions.UpdateTransaction;
  import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.io.sstable.format.SSTableFormat;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
++import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
 +import org.apache.cassandra.io.util.File;
 +import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.metrics.ClearableHistogram;
  import org.apache.cassandra.schema.ColumnMetadata;
  import org.apache.cassandra.schema.KeyspaceParams;
@@@ -65,12 -59,7 +77,14 @@@ import org.apache.cassandra.service.sna
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.WrappedRunnable;
 -import static junit.framework.Assert.assertNotNull;
++import org.apache.cassandra.utils.concurrent.OpOrder.Barrier;
++import org.apache.cassandra.utils.concurrent.OpOrder.Group;
 +
 +import static org.assertj.core.api.Assertions.assertThat;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertTrue;
  
  public class ColumnFamilyStoreTest
  {
@@@ -110,6 -91,15 +124,13 @@@
          
Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD1).truncateBlocking();
      }
  
++    // A memtable seeded with EncodingStats.NO_STATS.minTimestamp is expected to report NO_MIN_TIMESTAMP.
+     @Test
+     public void testMemtableTimestamp() throws Throwable
+     {
 -        assertEquals(Memtable.NO_MIN_TIMESTAMP,
 -                     (new 
Memtable(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata(),
 -                                   EncodingStats.NO_STATS.minTimestamp))
 -                     .getMinTimestamp());
++        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
++        assertEquals(Memtable.NO_MIN_TIMESTAMP, fakeMemTableWithMinTS(cfs, 
EncodingStats.NO_STATS.minTimestamp).getMinTimestamp());
+     }
+ 
      @Test
      // create two sstables, and verify that we only deserialize data from the 
most recent one
      public void testTimeSortedQuery()
@@@ -659,18 -556,4 +680,149 @@@
          assertEquals(0, ssTableFiles.size());
          cfs.clearUnsafe();
      }
 +
 +    @VisibleForTesting // Returns the sum of length() of the snapshot's schema file and manifest file.
 +    public static long getSnapshotManifestAndSchemaFileSizes(TableSnapshot 
snapshot) throws IOException
 +    {
 +        Optional<File> schemaFile = snapshot.getSchemaFile();
 +        Optional<File> manifestFile = snapshot.getManifestFile();
 +
 +        long schemaAndManifestFileSizes = 0; // running total; a file whose Optional is empty contributes 0
 +
 +        schemaAndManifestFileSizes += schemaFile.isPresent() ? 
schemaFile.get().length() : 0;
 +        schemaAndManifestFileSizes += manifestFile.isPresent() ? 
manifestFile.get().length() : 0;
 +
 +        return schemaAndManifestFileSizes;
 +    }
++
++    private Memtable fakeMemTableWithMinTS(ColumnFamilyStore cfs, long minTS) // Test helper: builds a stub Memtable from cfs.metadata and the given minimum timestamp.
++    {
++        return new AbstractMemtable(cfs.metadata, minTS) // Anonymous subclass: every override below does nothing or returns null/0/false. getMinTimestamp() is not overridden here, so it is presumably inherited from AbstractMemtable (confirm).
++        {
++
++            @Override
++            public UnfilteredRowIterator rowIterator(DecoratedKey key,
++                                                     Slices slices,
++                                                     ColumnFilter 
columnFilter,
++                                                     boolean reversed,
++                                                     SSTableReadsListener 
listener)
++            {
++                return null;
++            }
++
++            @Override
++            public UnfilteredPartitionIterator
++                   partitionIterator(ColumnFilter columnFilter, DataRange 
dataRange, SSTableReadsListener listener)
++            {
++                return null;
++            }
++
++            @Override
++            public void switchOut(Barrier writeBarrier, 
AtomicReference<CommitLogPosition> commitLogUpperBound)
++            {
++            }
++
++            @Override
++            public boolean shouldSwitch(FlushReason reason)
++            {
++                return false;
++            }
++
++            @Override
++            public long put(PartitionUpdate update, UpdateTransaction 
indexer, Group opGroup)
++            {
++                return 0;
++            }
++
++            @Override
++            public void performSnapshot(String snapshotName)
++            {
++            }
++
++            @Override
++            public long partitionCount()
++            {
++                return 0;
++            }
++
++            @Override
++            public void metadataUpdated()
++            {
++            }
++
++            @Override
++            public boolean mayContainDataBefore(CommitLogPosition position)
++            {
++                return false;
++            }
++
++            @Override
++            public void markExtraOnHeapUsed(long additionalSpace, Group 
opGroup)
++            {
++            }
++
++            @Override
++            public void markExtraOffHeapUsed(long additionalSpace, Group 
opGroup)
++            {
++            }
++
++            @Override
++            public void localRangesUpdated()
++            {
++            }
++
++            @Override
++            public boolean isClean()
++            {
++                return false;
++            }
++
++            @Override
++            public long getLiveDataSize()
++            {
++                return 0;
++            }
++
++            @Override
++            public FlushablePartitionSet<?> getFlushSet(PartitionPosition 
from, PartitionPosition to)
++            {
++                // Stub like the other overrides: this fake provides no flush set.
++                return null;
++            }
++
++            @Override
++            public LastCommitLogPosition getFinalCommitLogUpperBound()
++            {
++                return null;
++            }
++
++            @Override
++            public CommitLogPosition getCommitLogLowerBound()
++            {
++                return null;
++            }
++
++            @Override
++            public CommitLogPosition getApproximateCommitLogLowerBound()
++            {
++                return null;
++            }
++
++            @Override
++            public void discard()
++            {
++            }
++
++            @Override
++            public void addMemoryUsageTo(MemoryUsage usage)
++            {
++            }
++
++            @Override
++            public boolean accepts(Group opGroup, CommitLogPosition 
commitLogPosition)
++            {
++                return false;
++            }
++        };
++    }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org

Reply via email to