This is an automated email from the ASF dual-hosted git repository.

edimitrova pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 661f1aa  Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318)
         Patch by Ekaterina Dimitrova and Branimir Lambov; reviewed by Benjamin Lerer and Branimir Lambov for CASSANDRA-16318
661f1aa is described below

commit 661f1aab171dc3ef16075f69581e88ad4a133fae
Author: Branimir Lambov <branimir.lam...@datastax.com>
AuthorDate: Tue Dec 8 15:37:39 2020 +0200

    Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318)

    Patch by Ekaterina Dimitrova and Branimir Lambov; reviewed by Benjamin Lerer
    and Branimir Lambov for CASSANDRA-16318
---
 CHANGES.txt                                        |   3 +-
 src/java/org/apache/cassandra/db/Columns.java      |  10 +
 src/java/org/apache/cassandra/db/DeletionTime.java |   3 +
 src/java/org/apache/cassandra/db/LivenessInfo.java |  16 +-
 src/java/org/apache/cassandra/db/Memtable.java     |  33 ++-
 .../apache/cassandra/db/MutableDeletionInfo.java   |   3 +
 .../cassandra/db/RegularAndStaticColumns.java      |  12 +-
 .../db/partitions/AbstractBTreePartition.java      |   2 +
 .../db/partitions/AtomicBTreePartition.java        |   9 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java     |   6 +-
 .../apache/cassandra/db/rows/EncodingStats.java    |  15 +-
 src/java/org/apache/cassandra/db/rows/Row.java     |  12 +-
 test/bin/jmh                                       | 137 ++++++++++++
 test/conf/logback-jmh.xml                          | 101 +++++++++
 .../test/microbench/instance/ReadTest.java         | 233 +++++++++++++++++++++
 .../instance/ReadTestSmallPartitions.java          |  60 ++++++
 .../instance/ReadTestWidePartitions.java           |  96 +++++++++
 .../unit/org/apache/cassandra/ServerTestUtils.java |   5 +-
 test/unit/org/apache/cassandra/cql3/CQLTester.java |   4 +-
 .../apache/cassandra/cql3/MemtableSizeTest.java    | 124 +++++++++++
 20 files changed, 858 insertions(+), 26 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt index 09b02d5..9a304b6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,5 @@ 4.0-beta5 - * Fix client notifications in CQL protocol v5 (CASSANDRA-16353) - * Too defensive check when picking sstables for preview repair (CASSANDRA-16284) + * Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318) * Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376) * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279) * SSLFactory should initialize SSLContext before setting protocols (CASSANDRA-16362)
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java index aceb868..9e07375 100644 --- a/src/java/org/apache/cassandra/db/Columns.java +++ b/src/java/org/apache/cassandra/db/Columns.java @@ -36,6 +36,7 @@ import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.SearchIterator; import org.apache.cassandra.utils.btree.BTree; import org.apache.cassandra.utils.btree.BTreeSearchIterator; @@ -52,6 +53,7 @@ public class Columns extends AbstractCollection<ColumnMetadata> implements Colle { public static final Serializer serializer = new Serializer(); public static final Columns NONE = new Columns(BTree.empty(), 0); + static final long EMPTY_SIZE = ObjectSizes.measure(NONE); public static final ColumnMetadata FIRST_COMPLEX_STATIC = new ColumnMetadata("", @@ -406,6 +408,14 @@ public class Columns extends AbstractCollection<ColumnMetadata> implements Colle return Objects.hash(complexIdx,
BTree.hashCode(columns)); } + public long unsharedHeapSize() + { + if(this == NONE) + return 0; + + return EMPTY_SIZE; + } + @Override public String toString() { diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java index f1471fd..d8ac91d 100644 --- a/src/java/org/apache/cassandra/db/DeletionTime.java +++ b/src/java/org/apache/cassandra/db/DeletionTime.java @@ -157,6 +157,9 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory public long unsharedHeapSize() { + if (this == LIVE) + return 0; + return EMPTY_SIZE; } diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java b/src/java/org/apache/cassandra/db/LivenessInfo.java index b1ea3f6..f3e6daa 100644 --- a/src/java/org/apache/cassandra/db/LivenessInfo.java +++ b/src/java/org/apache/cassandra/db/LivenessInfo.java @@ -19,8 +19,10 @@ package org.apache.cassandra.db; import java.util.Objects; +import org.apache.cassandra.cache.IMeasurableMemory; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.serializers.MarshalException; +import org.apache.cassandra.utils.ObjectSizes; /** * Stores the information relating to the liveness of the primary key columns of a row. @@ -35,7 +37,7 @@ import org.apache.cassandra.serializers.MarshalException; * unaffected (of course, the rest of said row data might be ttl'ed on its own but this is * separate). */ -public class LivenessInfo +public class LivenessInfo implements IMeasurableMemory { public static final long NO_TIMESTAMP = Long.MIN_VALUE; public static final int NO_TTL = Cell.NO_TTL; @@ -49,6 +51,7 @@ public class LivenessInfo public static final int NO_EXPIRATION_TIME = Cell.NO_DELETION_TIME; public static final LivenessInfo EMPTY = new LivenessInfo(NO_TIMESTAMP); + private static final long UNSHARED_HEAP_SIZE = ObjectSizes.measure(EMPTY); protected final long timestamp; @@ -255,6 +258,11 @@ public class LivenessInfo return Objects.hash(timestamp(), ttl(), localExpirationTime()); } + public long unsharedHeapSize() + { + return this == EMPTY ? 0 : UNSHARED_HEAP_SIZE; + } + /** * Effectively acts as a PK tombstone. This is used for Materialized Views to shadow * updated entries while co-existing with row tombstones. 
@@ -294,6 +302,7 @@ public class LivenessInfo { private final int ttl; private final int localExpirationTime; + private static final long UNSHARED_HEAP_SIZE = ObjectSizes.measure(new ExpiringLivenessInfo(-1, -1, -1)); private ExpiringLivenessInfo(long timestamp, int ttl, int localExpirationTime) { @@ -364,5 +373,10 @@ public class LivenessInfo { return String.format("[ts=%d ttl=%d, let=%d]", timestamp, ttl, localExpirationTime); } + + public long unsharedHeapSize() + { + return UNSHARED_HEAP_SIZE; + } } } diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index c969616..cdbe163 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -17,38 +17,52 @@ */ package org.apache.cassandra.db; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.schema.ColumnMetadata; -import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.commitlog.IntervalSet; import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; -import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.partitions.AbstractBTreePartition; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.AtomicBTreePartition; +import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.rows.UnfilteredRowIterator; -import org.apache.cassandra.dht.*; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.IncludingExcludingBounds; import org.apache.cassandra.dht.Murmur3Partitioner.LongToken; +import org.apache.cassandra.dht.Range; import org.apache.cassandra.index.transactions.UpdateTransaction; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; -import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -529,6 +543,7 @@ public class Memtable implements 
Comparable<Memtable> rowOverhead = (int) ((avgSize - Math.floor(avgSize)) < 0.05 ? Math.floor(avgSize) : Math.ceil(avgSize)); rowOverhead -= ObjectSizes.measureDeep(new LongToken(0)); rowOverhead += AtomicBTreePartition.EMPTY_SIZE; + rowOverhead += AbstractBTreePartition.HOLDER_UNSHARED_HEAP_SIZE; allocator.setDiscarding(); allocator.setDiscarded(); return rowOverhead; diff --git a/src/java/org/apache/cassandra/db/MutableDeletionInfo.java b/src/java/org/apache/cassandra/db/MutableDeletionInfo.java index 8544b78..bfe4d4c 100644 --- a/src/java/org/apache/cassandra/db/MutableDeletionInfo.java +++ b/src/java/org/apache/cassandra/db/MutableDeletionInfo.java @@ -248,6 +248,9 @@ public class MutableDeletionInfo implements DeletionInfo @Override public long unsharedHeapSize() { + if (this == LIVE) + return 0; + return EMPTY_SIZE + partitionDeletion.unsharedHeapSize() + (ranges == null ? 0 : ranges.unsharedHeapSize()); } diff --git a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java index fab7730..1501345 100644 --- a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java +++ b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java @@ -22,6 +22,7 @@ import java.util.*; import com.google.common.collect.Iterators; import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.btree.BTreeSet; import static java.util.Comparator.naturalOrder; @@ -33,6 +34,7 @@ import static java.util.Comparator.naturalOrder; public class RegularAndStaticColumns implements Iterable<ColumnMetadata> { public static RegularAndStaticColumns NONE = new RegularAndStaticColumns(Columns.NONE, Columns.NONE); + static final long EMPTY_SIZE = ObjectSizes.measure(NONE); public final Columns statics; public final Columns regulars; @@ -105,11 +107,19 @@ public class RegularAndStaticColumns implements Iterable<ColumnMetadata> return regulars.size() + statics.size(); } + public long unsharedHeapSize() + { + if(this == NONE) + return 0; + + return EMPTY_SIZE + regulars.unsharedHeapSize() + statics.unsharedHeapSize(); + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("[").append(statics).append(" | ").append(regulars).append("]"); + sb.append('[').append(statics).append(" | ").append(regulars).append(']'); return sb.toString(); } diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java index 44dc0b0..1d6603e 100644 --- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java @@ -28,6 +28,7 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.SearchIterator; import org.apache.cassandra.utils.btree.BTree; @@ -36,6 +37,7 @@ import static org.apache.cassandra.utils.btree.BTree.Dir.desc; public abstract class AbstractBTreePartition implements Partition, Iterable<Row> { protected static final Holder EMPTY = new Holder(RegularAndStaticColumns.NONE, BTree.empty(), DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS); + public static final long HOLDER_UNSHARED_HEAP_SIZE = ObjectSizes.measure(EMPTY); protected final DecoratedKey 
partitionKey; diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java index ed635cd..801d9e2 100644 --- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java @@ -135,12 +135,14 @@ public final class AtomicBTreePartition extends AbstractBTreePartition } RegularAndStaticColumns columns = update.columns().mergeTo(current.columns); + updater.allocated(columns.unsharedHeapSize() - current.columns.unsharedHeapSize()); Row newStatic = update.staticRow(); Row staticRow = newStatic.isEmpty() ? current.staticRow : (current.staticRow.isEmpty() ? updater.apply(newStatic) : updater.apply(current.staticRow, newStatic)); Object[] tree = BTree.update(current.tree, update.metadata().comparator, update, update.rowCount(), updater); EncodingStats newStats = current.stats.mergeWith(update.stats()); + updater.allocated(newStats.unsharedHeapSize() - current.stats.unsharedHeapSize()); if (tree != null && refUpdater.compareAndSet(this, current, new Holder(columns, tree, deletionInfo, staticRow, newStats))) { @@ -274,8 +276,7 @@ public final class AtomicBTreePartition extends AbstractBTreePartition if (!writeOp.isOldestLiveGroup()) { Thread.yield(); - if (!writeOp.isOldestLiveGroup()) - return false; + return writeOp.isOldestLiveGroup(); } return true; @@ -370,7 +371,7 @@ public final class AtomicBTreePartition extends AbstractBTreePartition indexer.onInserted(insert); this.dataSize += data.dataSize(); - this.heapSize += data.unsharedHeapSizeExcludingData(); + allocated(data.unsharedHeapSizeExcludingData()); if (inserted == null) inserted = new ArrayList<>(); inserted.add(data); @@ -387,7 +388,7 @@ public final class AtomicBTreePartition extends AbstractBTreePartition indexer.onUpdated(existing, reconciled); dataSize += reconciled.dataSize() - existing.dataSize(); - heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData(); + allocated(reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData()); if (inserted == null) inserted = new ArrayList<>(); inserted.add(reconciled); diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java index e8476dd..b971307 100644 --- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java +++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java @@ -494,8 +494,10 @@ public class BTreeRow extends AbstractRow public long unsharedHeapSizeExcludingData() { long heapSize = EMPTY_SIZE - + clustering.unsharedHeapSizeExcludingData() - + BTree.sizeOfStructureOnHeap(btree); + + clustering.unsharedHeapSizeExcludingData() + + primaryKeyLivenessInfo.unsharedHeapSize() + + deletion.unsharedHeapSize() + + BTree.sizeOfStructureOnHeap(btree); return accumulate((cd, v) -> v + cd.unsharedHeapSizeExcludingData(), heapSize); } diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java b/src/java/org/apache/cassandra/db/rows/EncodingStats.java index f2fb340..37dd34e 100644 --- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java +++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java @@ -21,12 +21,12 @@ import java.io.IOException; import java.util.*; import java.util.function.Function; -import com.google.common.collect.Iterables; - +import org.apache.cassandra.cache.IMeasurableMemory; import org.apache.cassandra.db.*; import 
org.apache.cassandra.db.partitions.PartitionStatisticsCollector; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.ObjectSizes; /** * Stats used for the encoding of the rows and tombstones of a given source. @@ -40,13 +40,14 @@ import org.apache.cassandra.io.util.DataOutputPlus; * this shouldn't have too huge an impact on performance) and in fact they will not always be * accurate for reasons explained in {@link SerializationHeader#make}. */ -public class EncodingStats +public class EncodingStats implements IMeasurableMemory { // Default values for the timestamp, deletion time and ttl. We use this both for NO_STATS, but also to serialize // an EncodingStats. Basically, we encode the diff of each value of to these epoch, which give values with better vint encoding. public static final long TIMESTAMP_EPOCH; private static final int DELETION_TIME_EPOCH; private static final int TTL_EPOCH = 0; + static { // We want a fixed epoch, but that provide small values when substracted from our timestamp and deletion time. @@ -66,6 +67,7 @@ public class EncodingStats // We should use this sparingly obviously public static final EncodingStats NO_STATS = new EncodingStats(TIMESTAMP_EPOCH, DELETION_TIME_EPOCH, TTL_EPOCH); + public static long HEAP_SIZE = ObjectSizes.measure(NO_STATS); public static final Serializer serializer = new Serializer(); @@ -152,6 +154,13 @@ public class EncodingStats return Objects.hash(minTimestamp, minLocalDeletionTime, minTTL); } + public long unsharedHeapSize() + { + if (this == NO_STATS) + return 0; + return HEAP_SIZE; + } + @Override public String toString() { diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java index ec43783..5c28cd1 100644 --- a/src/java/org/apache/cassandra/db/rows/Row.java +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@ -29,6 +29,7 @@ import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.utils.BiLongAccumulator; import org.apache.cassandra.utils.LongAccumulator; import org.apache.cassandra.utils.MergeIterator; +import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.SearchIterator; import org.apache.cassandra.utils.btree.BTree; import org.apache.cassandra.utils.btree.UpdateFunction; @@ -323,6 +324,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData> public static class Deletion { public static final Deletion LIVE = new Deletion(DeletionTime.LIVE, false); + private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionTime(0, 0)); private final DeletionTime time; private final boolean isShadowable; @@ -422,6 +424,14 @@ public interface Row extends Unfiltered, Iterable<ColumnData> return this.time.equals(that.time) && this.isShadowable == that.isShadowable; } + public long unsharedHeapSize() + { + if(this == LIVE) + return 0; + + return EMPTY_SIZE + time().unsharedHeapSize(); + } + @Override public final int hashCode() { @@ -722,7 +732,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData> // Because some data might have been shadowed by the 'activeDeletion', we could have an empty row return rowInfo.isEmpty() && rowDeletion.isLive() && dataBuffer.isEmpty() ? 
null - : BTreeRow.create(clustering, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.<ColumnData>noOp())); + : BTreeRow.create(clustering, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.noOp())); } public Clustering<?> mergedClustering() diff --git a/test/bin/jmh b/test/bin/jmh new file mode 100755 index 0000000..c56145f --- /dev/null +++ b/test/bin/jmh @@ -0,0 +1,137 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +jvmoptions_variant="-server" +export CASSANDRA_HOME=`dirname "$0"`/../../ +. $CASSANDRA_HOME/bin/cassandra.in.sh + +# Use JAVA_HOME if set, otherwise look for java in PATH +if [ -n "$JAVA_HOME" ]; then + # Why we can't have nice things: Solaris combines x86 and x86_64 + # installations in the same tree, using an unconventional path for the + # 64bit JVM. Since we prefer 64bit, search the alternate path first, + # (see https://issues.apache.org/jira/browse/CASSANDRA-4638). + for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do + if [ -x "$java" ]; then + JAVA="$java" + break + fi + done +else + JAVA=java +fi + +if [ -z $JAVA ] ; then + echo Unable to find java executable. Check JAVA_HOME and PATH environment variables. >&2 + exit 1; +fi + +# If numactl is available, use it. For Cassandra, the priority is to +# avoid disk I/O. Even for the purpose of CPU efficiency, we don't +# really have CPU<->data affinity anyway. Also, empirically test that numactl +# works before trying to use it (CASSANDRA-3245). +NUMACTL_ARGS=${NUMACTL_ARGS:-"--localalloc"} +if which numactl >/dev/null 2>/dev/null && numactl $NUMACTL_ARGS ls / >/dev/null 2>/dev/null +then + NUMACTL="numactl $NUMACTL_ARGS" +else + NUMACTL="" +fi + +if [ -z "$CASSANDRA_CONF" -o -z "$CLASSPATH" ]; then + echo "You must set the CASSANDRA_CONF and CLASSPATH vars" >&2 + exit 1 +fi + +if [ -f "$CASSANDRA_CONF/cassandra-env.sh" ]; then + . "$CASSANDRA_CONF/cassandra-env.sh" +fi + +# Special-case path variables. +case "`uname`" in + CYGWIN*) + CLASSPATH=`cygpath -p -w "$CLASSPATH"` + CASSANDRA_CONF=`cygpath -p -w "$CASSANDRA_CONF"` + ;; +esac + +# Cassandra uses an installed jemalloc via LD_PRELOAD / DYLD_INSERT_LIBRARIES by default to improve off-heap +# memory allocation performance. The following code searches for an installed libjemalloc.dylib/.so/.1.so using +# Linux and OS-X specific approaches. +# To specify your own libjemalloc in a different path, configure the fully qualified path in CASSANDRA_LIBJEMALLOC. 
+# To disable jemalloc preload at all, set CASSANDRA_LIBJEMALLOC=- +# +#CASSANDRA_LIBJEMALLOC= +# +find_library() +{ + pattern=$1 + path=$(echo ${2} | tr ":" " ") + + find $path -regex "$pattern" -print 2>/dev/null | head -n 1 +} +case "`uname -s`" in + Linux) + if [ -z $CASSANDRA_LIBJEMALLOC ] ; then + which ldconfig > /dev/null 2>&1 + if [ $? = 0 ] ; then + # e.g. for CentOS + dirs="/lib64 /lib /usr/lib64 /usr/lib `ldconfig -v 2>/dev/null | grep -v '^\s' | sed 's/^\([^:]*\):.*$/\1/'`" + else + # e.g. for Debian, OpenSUSE + dirs="/lib64 /lib /usr/lib64 /usr/lib `cat /etc/ld.so.conf /etc/ld.so.conf.d/*.conf | grep '^/'`" + fi + dirs=`echo $dirs | tr " " ":"` + CASSANDRA_LIBJEMALLOC=$(find_library '.*/libjemalloc\.so\(\.1\)*' $dirs) + fi + if [ ! -z $CASSANDRA_LIBJEMALLOC ] ; then + export JVM_OPTS="$JVM_OPTS -Dcassandra.libjemalloc=$CASSANDRA_LIBJEMALLOC" + if [ "-" != "$CASSANDRA_LIBJEMALLOC" ] ; then + export LD_PRELOAD=$CASSANDRA_LIBJEMALLOC + fi + fi + ;; + Darwin) + if [ -z $CASSANDRA_LIBJEMALLOC ] ; then + CASSANDRA_LIBJEMALLOC=$(find_library '.*/libjemalloc\.dylib' $DYLD_LIBRARY_PATH:${DYLD_FALLBACK_LIBRARY_PATH-$HOME/lib:/usr/local/lib:/lib:/usr/lib}) + fi + if [ ! -z $CASSANDRA_LIBJEMALLOC ] ; then + export JVM_OPTS="$JVM_OPTS -Dcassandra.libjemalloc=$CASSANDRA_LIBJEMALLOC" + if [ "-" != "$CASSANDRA_LIBJEMALLOC" ] ; then + export DYLD_INSERT_LIBRARIES=$CASSANDRA_LIBJEMALLOC + fi + fi + ;; +esac + +cassandra_parms="-Dlogback.configurationFile=$CASSANDRA_HOME/test/conf/logback-jmh.xml" +cassandra_parms="$cassandra_parms -Dcassandra.logdir=$CASSANDRA_HOME/logs" +cassandra_parms="$cassandra_parms -Dcassandra.storagedir=$cassandra_storagedir" +cassandra_parms="$cassandra_parms -Dcassandra-foreground=yes" +cassandra_parms="$cassandra_parms -XX:+PreserveFramePointer" + +# Create log directory, some tests require that +mkdir -p $CASSANDRA_HOME/logs + +if [ ! -f $CASSANDRA_HOME/build/test/benchmarks.jar ] ; then + echo "$CASSANDRA_HOME/build/test/benchmarks.jar does not exist - execute 'ant build-jmh'" + exit 1 +fi + +exec $NUMACTL "$JAVA" -cp "$CLASSPATH:$CASSANDRA_HOME/build/test/benchmarks.jar:$CASSANDRA_HOME/build/test/deps.jar" org.openjdk.jmh.Main -jvmArgs="$cassandra_parms $JVM_OPTS" $@ + +# vi:ai sw=4 ts=4 tw=0 et diff --git a/test/conf/logback-jmh.xml b/test/conf/logback-jmh.xml new file mode 100644 index 0000000..4138f19 --- /dev/null +++ b/test/conf/logback-jmh.xml @@ -0,0 +1,101 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<!-- +In order to disable debug.log, comment-out the ASYNCDEBUGLOG +appender reference in the root level section below. 
+--> + +<configuration scan="false" scanPeriod="60 seconds"> + <jmxConfigurator /> + <!-- No shutdown hook; we run it ourselves in StorageService after shutdown --> + + <!-- SYSTEMLOG rolling file appender to system.log (INFO level) --> + + <appender name="SYSTEMLOG" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>INFO</level> + </filter> + <file>${cassandra.logdir}/system.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> + <!-- rollover daily --> + <fileNamePattern>${cassandra.logdir}/system.log.%d{yyyy-MM-dd}.%i.zip</fileNamePattern> + <!-- each file should be at most 50MB, keep 7 days worth of history, but at most 5GB --> + <maxFileSize>50MB</maxFileSize> + <maxHistory>7</maxHistory> + <totalSizeCap>5GB</totalSizeCap> + </rollingPolicy> + <encoder> + <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern> + </encoder> + </appender> + + <!-- DEBUGLOG rolling file appender to debug.log (all levels) --> + + <appender name="DEBUGLOG" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file>${cassandra.logdir}/debug.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> + <!-- rollover daily --> + <fileNamePattern>${cassandra.logdir}/debug.log.%d{yyyy-MM-dd}.%i.zip</fileNamePattern> + <!-- each file should be at most 50MB, keep 7 days worth of history, but at most 5GB --> + <maxFileSize>50MB</maxFileSize> + <maxHistory>7</maxHistory> + <totalSizeCap>5GB</totalSizeCap> + </rollingPolicy> + <encoder> + <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern> + </encoder> + </appender> + + <!-- ASYNCLOG assynchronous appender to debug.log (all levels) --> + <appender name="ASYNCDEBUGLOG" class="ch.qos.logback.classic.AsyncAppender"> + <queueSize>1024</queueSize> + <discardingThreshold>0</discardingThreshold> + <includeCallerData>true</includeCallerData> + <appender-ref ref="DEBUGLOG" /> + </appender> + + <!-- NOTE: For JMH we disable the STDOUT logging to avoid noise in output --> + <!-- STDOUT console appender to stdout (INFO level) + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>INFO</level> + </filter> + <encoder> + <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern> + </encoder> + </appender--> + + <!-- Uncomment bellow and corresponding appender-ref to activate logback metrics + <appender name="LogbackMetrics" class="com.codahale.metrics.logback.InstrumentedAppender" /> + --> + + <root level="INFO"> + <appender-ref ref="SYSTEMLOG" /> + <!-- appender-ref ref="STDOUT" /--> + <appender-ref ref="ASYNCDEBUGLOG" /> <!-- Comment this line to disable debug.log --> + <!-- + <appender-ref ref="LogbackMetrics" /> + --> + </root> + + <logger name="org.apache.cassandra" level="DEBUG"/> + <logger name="com.datastax.bdp.db" level="DEBUG"/> +</configuration> diff --git a/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTest.java b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTest.java new file mode 100644 index 0000000..789ca00 --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench.instance; + + +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.IntStream; + +import com.google.common.base.Throwables; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.utils.FBUtilities; +import org.openjdk.jmh.annotations.*; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 15, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@Threads(1) +@State(Scope.Benchmark) +public abstract class ReadTest extends CQLTester +{ + static String keyspace; + String table; + ColumnFamilyStore cfs; + Random rand; + + @Param({"1000"}) + int BATCH = 1_000; + + public enum Flush + { + INMEM, NO, YES + } + + @Param({"1000000"}) + int count = 1_000_000; + + @Param({"INMEM", "YES"}) + Flush flush = Flush.INMEM; + + public enum Execution + { + SERIAL, + SERIAL_NET, + PARALLEL, + PARALLEL_NET, + } + + @Param({"PARALLEL"}) + Execution async = Execution.PARALLEL; + + @Setup(Level.Trial) + public void setup() throws Throwable + { + rand = new Random(1); + CQLTester.setUpClass(); + CQLTester.prepareServer(); + System.err.println("setupClass done."); + keyspace = createKeyspace("CREATE KEYSPACE %s with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } and durable_writes = false"); + table = createTable(keyspace, "CREATE TABLE %s ( userid bigint, picid bigint, commentid bigint, PRIMARY KEY(userid, picid)) with compression = {'enabled': false}"); + execute("use "+keyspace+";"); + switch (async) + { + case SERIAL_NET: + case PARALLEL_NET: + CQLTester.requireNetwork(); + executeNet(getDefaultVersion(), "use " + keyspace + ";"); + } + String writeStatement = "INSERT INTO "+table+"(userid,picid,commentid)VALUES(?,?,?)"; + System.err.println("Prepared, batch " + BATCH + " flush " + flush); + System.err.println("Disk access mode " + DatabaseDescriptor.getDiskAccessMode() + " index " + DatabaseDescriptor.getIndexAccessMode()); + + cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + cfs.disableAutoCompaction(); + cfs.forceBlockingFlush(); + + //Warm up + System.err.println("Writing " + count); + long i; + for (i = 0; i <= count - BATCH; i += BATCH) + performWrite(writeStatement, i, BATCH); + if (i < count) + performWrite(writeStatement, i, count - i); + + Memtable memtable = cfs.getTracker().getView().getCurrentMemtable(); + System.err.format("Memtable in %s mode: %d ops, %s serialized bytes, %s (%.0f%%) on heap, %s (%.0f%%) off-heap\n", + DatabaseDescriptor.getMemtableAllocationType(), + 
memtable.getOperations(), + FBUtilities.prettyPrintMemory(memtable.getLiveDataSize()), + FBUtilities.prettyPrintMemory(memtable.getAllocator().onHeap().owns()), + 100 * memtable.getAllocator().onHeap().ownershipRatio(), + FBUtilities.prettyPrintMemory(memtable.getAllocator().offHeap().owns()), + 100 * memtable.getAllocator().offHeap().ownershipRatio()); + + switch (flush) + { + case YES: + cfs.forceBlockingFlush(); + break; + case INMEM: + if (!cfs.getLiveSSTables().isEmpty()) + throw new AssertionError("SSTables created for INMEM test."); + default: + // don't flush + } + + // Needed to stabilize sstable count for off-cache sized tests (e.g. count = 100_000_000) + while (cfs.getLiveSSTables().size() >= 15) + { + cfs.enableAutoCompaction(true); + cfs.disableAutoCompaction(); + } + } + + abstract Object[] writeArguments(long i); + + public void performWrite(String writeStatement, long ofs, long count) throws Throwable + { + for (long i = ofs; i < ofs + count; ++i) + execute(writeStatement, writeArguments(i)); + } + + + @TearDown(Level.Trial) + public void teardown() throws InterruptedException + { + if (flush == Flush.INMEM && !cfs.getLiveSSTables().isEmpty()) + throw new AssertionError("SSTables created for INMEM test."); + + // do a flush to print sizes + cfs.forceBlockingFlush(); + + CommitLog.instance.shutdownBlocking(); + CQLTester.tearDownClass(); + CQLTester.cleanup(); + } + + public Object performReadSerial(String readStatement, Supplier<Object[]> supplier) throws Throwable + { + long sum = 0; + for (int i = 0; i < BATCH; ++i) + sum += execute(readStatement, supplier.get()).size(); + return sum; + } + + public Object performReadThreads(String readStatement, Supplier<Object[]> supplier) throws Throwable + { + return IntStream.range(0, BATCH) + .parallel() + .mapToLong(i -> + { + try + { + return execute(readStatement, supplier.get()).size(); + } + catch (Throwable throwable) + { + throw Throwables.propagate(throwable); + } + }) + .sum(); + } + + public Object performReadSerialNet(String readStatement, Supplier<Object[]> supplier) throws Throwable + { + long sum = 0; + for (int i = 0; i < BATCH; ++i) + sum += executeNet(getDefaultVersion(), readStatement, supplier.get()) + .getAvailableWithoutFetching(); + return sum; + } + + public long performReadThreadsNet(String readStatement, Supplier<Object[]> supplier) throws Throwable + { + return IntStream.range(0, BATCH) + .parallel() + .mapToLong(i -> + { + try + { + return executeNet(getDefaultVersion(), readStatement, supplier.get()) + .getAvailableWithoutFetching(); + } + catch (Throwable throwable) + { + throw Throwables.propagate(throwable); + } + }) + .sum(); + } + + + public Object performRead(String readStatement, Supplier<Object[]> supplier) throws Throwable + { + switch (async) + { + case SERIAL: + return performReadSerial(readStatement, supplier); + case SERIAL_NET: + return performReadSerialNet(readStatement, supplier); + case PARALLEL: + return performReadThreads(readStatement, supplier); + case PARALLEL_NET: + return performReadThreadsNet(readStatement, supplier); + } + return null; + } +} diff --git a/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestSmallPartitions.java b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestSmallPartitions.java new file mode 100644 index 0000000..b36cfd1 --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestSmallPartitions.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench.instance; + + +import org.openjdk.jmh.annotations.Benchmark; + +public class ReadTestSmallPartitions extends ReadTest +{ + String readStatement() + { + return "SELECT * from "+table+" where userid=?"; + } + + @Override + public Object[] writeArguments(long i) + { + return new Object[] { i, i, i }; + } + + @Benchmark + public Object readRandomInside() throws Throwable + { + return performRead(readStatement(), () -> new Object[] { (long) rand.nextInt(count) }); + } + + @Benchmark + public Object readRandomWOutside() throws Throwable + { + return performRead(readStatement(), () -> new Object[] { (long) rand.nextInt(count + count / 6) }); + } + + @Benchmark + public Object readFixed() throws Throwable + { + return performRead(readStatement(), () -> new Object[] { 1234567890123L % count }); + } + + @Benchmark + public Object readOutside() throws Throwable + { + return performRead(readStatement(), () -> new Object[] { count + 1234567L }); + } +} diff --git a/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestWidePartitions.java b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestWidePartitions.java new file mode 100644 index 0000000..c36e09f --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestWidePartitions.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.test.microbench.instance; + + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Param; + +public class ReadTestWidePartitions extends ReadTest +{ + @Param({"1000", "4"}) // wide and very wide partitions + int partitions = 4; + + @Override + public Object[] writeArguments(long i) + { + return new Object[] { i % partitions, i, i }; + } + + Object[] readArguments(long i, long offset) + { + return new Object[] { (i + offset) % partitions, i }; + } + + @Benchmark + public Object readRandomInside() throws Throwable + { + return performRead("SELECT * from " + table + " where userid=? and picid=?", + () -> readArguments(rand.nextInt(count),0)); + } + + @Benchmark + public Object readRandomWOutside() throws Throwable + { + return performRead("SELECT * from " + table + " where userid=? and picid=?", + () -> readArguments(rand.nextInt(count), rand.nextInt(6) == 1 ? 1 : 0)); + } + + @Benchmark + public Object readFixed() throws Throwable + { + return performRead("SELECT * from " + table + " where userid=? and picid=?", + () -> readArguments(1234567890123L % count, 0)); + } + + @Benchmark + public Object readOutside() throws Throwable + { + return performRead("SELECT * from " + table + " where userid=? and picid=?", + () -> readArguments(1234567890123L % count, 1)); + } + + @Benchmark + public Object readGreaterMatch() throws Throwable + { + return performRead("SELECT * from "+table+" where userid=? and picid>? limit 1", + () -> readArguments(rand.nextInt(count), 0)); + } + + @Benchmark + public Object readReversedMatch() throws Throwable + { + return performRead("SELECT * from "+table+" where userid=? and picid<? order by picid desc limit 1", + () -> readArguments(rand.nextInt(count), 0)); + } + + @Benchmark + public Object readGreater() throws Throwable + { + return performRead("SELECT * from "+table+" where userid=? and picid>? limit 1", + () -> readArguments(rand.nextInt(count), 1)); + } + + @Benchmark + public Object readReversed() throws Throwable + { + return performRead("SELECT * from "+table+" where userid=? and picid<? 
order by picid desc limit 1", + () -> readArguments(rand.nextInt(count), -1)); + } +} diff --git a/test/unit/org/apache/cassandra/ServerTestUtils.java b/test/unit/org/apache/cassandra/ServerTestUtils.java index 5794dcb..221a23a 100644 --- a/test/unit/org/apache/cassandra/ServerTestUtils.java +++ b/test/unit/org/apache/cassandra/ServerTestUtils.java @@ -149,7 +149,10 @@ public final class ServerTestUtils { // clean up commitlog cleanupDirectory(DatabaseDescriptor.getCommitLogLocation()); - cleanupDirectory(DatabaseDescriptor.getCDCLogLocation()); + + String cdcDir = DatabaseDescriptor.getCDCLogLocation(); + if (cdcDir != null) + cleanupDirectory(cdcDir); cleanupDirectory(DatabaseDescriptor.getHintsDirectory()); cleanupSavedCaches(); diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 625fbe1..beff1e4 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -632,9 +632,9 @@ public abstract class CQLTester this.usePrepared = USE_PREPARED_VALUES; } - protected void disablePreparedReuseForTest() + public static void disablePreparedReuseForTest() { - this.reusePrepared = false; + reusePrepared = false; } protected String createType(String query) diff --git a/test/unit/org/apache/cassandra/cql3/MemtableSizeTest.java b/test/unit/org/apache/cassandra/cql3/MemtableSizeTest.java new file mode 100644 index 0000000..dbe0498 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/MemtableSizeTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.cql3; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.ObjectSizes; + +public class MemtableSizeTest extends CQLTester +{ + static String keyspace; + String table; + ColumnFamilyStore cfs; + + int partitions = 50_000; + int rowsPerPartition = 4; + + int deletedPartitions = 10_000; + int deletedRows = 5_000; + + // must be within 50 bytes per partition of the actual size + final int MAX_DIFFERENCE = (partitions + deletedPartitions + deletedRows) * 50; + + @BeforeClass + public static void setUp() + { + CQLTester.setUpClass(); + CQLTester.prepareServer(); + CQLTester.disablePreparedReuseForTest(); + System.err.println("setupClass done."); + } + + @Test + public void testSize() throws Throwable + { + keyspace = createKeyspace("CREATE KEYSPACE %s with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } and durable_writes = false"); + table = createTable(keyspace, "CREATE TABLE %s ( userid bigint, picid bigint, commentid bigint, PRIMARY KEY(userid, picid)) with compression = {'enabled': false}"); + execute("use " + keyspace + ';'); + + String writeStatement = "INSERT INTO "+table+"(userid,picid,commentid)VALUES(?,?,?)"; + + cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + cfs.disableAutoCompaction(); + cfs.forceBlockingFlush(); + + long deepSizeBefore = ObjectSizes.measureDeep(cfs.getTracker().getView().getCurrentMemtable()); + System.out.printf("Memtable deep size before %s\n%n", + FBUtilities.prettyPrintMemory(deepSizeBefore)); + long i; + long limit = partitions; + System.out.println("Writing " + partitions + " partitions of " + rowsPerPartition + " rows"); + for (i = 0; i < limit; ++i) + { + for (long j = 0; j < rowsPerPartition; ++j) + execute(writeStatement, i, j, i + j); + } + + System.out.println("Deleting " + deletedPartitions + " partitions"); + limit += deletedPartitions; + for (; i < limit; ++i) + { + // no partition exists, but we will create a tombstone + execute("DELETE FROM " + table + " WHERE userid = ?", i); + } + + System.out.println("Deleting " + deletedRows + " rows"); + limit += deletedRows; + for (; i < limit; ++i) + { + // no row exists, but we will create a tombstone (and partition) + execute("DELETE FROM " + table + " WHERE userid = ? 
AND picid = ?", i, 0L); + } + + + if (!cfs.getLiveSSTables().isEmpty()) + System.out.println("Warning: " + cfs.getLiveSSTables().size() + " sstables created."); + + Memtable memtable = cfs.getTracker().getView().getCurrentMemtable(); + long actualHeap = memtable.getAllocator().onHeap().owns(); + System.out.printf("Memtable in %s mode: %d ops, %s serialized bytes, %s (%.0f%%) on heap, %s (%.0f%%) off-heap%n", + DatabaseDescriptor.getMemtableAllocationType(), + memtable.getOperations(), + FBUtilities.prettyPrintMemory(memtable.getLiveDataSize()), + FBUtilities.prettyPrintMemory(actualHeap), + 100 * memtable.getAllocator().onHeap().ownershipRatio(), + FBUtilities.prettyPrintMemory(memtable.getAllocator().offHeap().owns()), + 100 * memtable.getAllocator().offHeap().ownershipRatio()); + + long deepSizeAfter = ObjectSizes.measureDeep(memtable); + System.out.printf("Memtable deep size %s\n%n", + FBUtilities.prettyPrintMemory(deepSizeAfter)); + + long expectedHeap = deepSizeAfter - deepSizeBefore; + String message = String.format("Expected heap usage close to %s, got %s.\n", + FBUtilities.prettyPrintMemory(expectedHeap), + FBUtilities.prettyPrintMemory(actualHeap)); + System.out.println(message); + Assert.assertTrue(message, Math.abs(actualHeap - expectedHeap) <= MAX_DIFFERENCE); + } + +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org