This is an automated email from the ASF dual-hosted git repository. clohfink pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 7df67ef Reduce heap pressure during compactions Patch by Chris Lohfink; Reviewed by Dinesh Joshi and Benedict for CASSANDRA-14654 7df67ef is described below commit 7df67eff2d66dba4bed2b4f6aeabf05144d9b057 Author: Chris Lohfink <clohf...@apple.com> AuthorDate: Wed May 15 08:55:31 2019 -0500 Reduce heap pressure during compactions Patch by Chris Lohfink; Reviewed by Dinesh Joshi and Benedict for CASSANDRA-14654 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/config/Config.java | 4 +- .../cassandra/config/DatabaseDescriptor.java | 14 +- .../apache/cassandra/db/rows/EncodingStats.java | 39 ++++- .../cassandra/db/rows/UnfilteredRowIterators.java | 10 +- .../io/sstable/SSTableIdentityIterator.java | 2 +- .../cassandra/io/sstable/SSTableRewriter.java | 20 +-- .../cassandra/io/sstable/format/SSTableReader.java | 24 +-- .../io/sstable/format/big/BigTableReader.java | 6 +- .../io/sstable/metadata/StatsMetadata.java | 4 + .../apache/cassandra/service/StorageService.java | 21 ++- .../cassandra/service/StorageServiceMBean.java | 6 + .../unit/org/apache/cassandra/db/KeyCacheTest.java | 2 +- .../apache/cassandra/db/lifecycle/TrackerTest.java | 13 +- .../cassandra/db/rows/EncodingStatsTest.java | 170 +++++++++++++++++++++ .../db/streaming/CassandraStreamManagerTest.java | 6 +- .../CompressedSequentialWriterReopenTest.java | 3 +- .../org/apache/cassandra/schema/MockSchema.java | 39 ++++- 18 files changed, 320 insertions(+), 64 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 960ed64..3a98fa5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Reduce heap pressure during compactions (CASSANDRA-14654) * Support building Cassandra with JDK 11 (CASSANDRA-15108) * Use quilt to patch cassandra.in.sh in Debian packaging (CASSANDRA-14710) * Take sstable references before calculating approximate key count (CASSANDRA-14647) diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 04ac608..a6050be 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -253,12 +253,14 @@ public class Config public int hints_flush_period_in_ms = 10000; public int max_hints_file_size_in_mb = 128; public ParameterizedClass hints_compression; - public int sstable_preemptive_open_interval_in_mb = 50; public volatile boolean incremental_backups = false; public boolean trickle_fsync = false; public int trickle_fsync_interval_in_kb = 10240; + public volatile int sstable_preemptive_open_interval_in_mb = 50; + + public volatile boolean key_cache_migrate_during_compaction = true; public Long key_cache_size_in_mb = null; public volatile int key_cache_save_period = 14400; public volatile int key_cache_keys_to_save = Integer.MAX_VALUE; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index e2c2ace..b3ab054 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -2241,11 +2241,21 @@ public class DatabaseDescriptor return conf.commitlog_total_space_in_mb; } - public static int getSSTablePreempiveOpenIntervalInMB() + public static boolean shouldMigrateKeycacheOnCompaction() + { + return conf.key_cache_migrate_during_compaction; + } + + public static void setMigrateKeycacheOnCompaction(boolean migrateCacheEntry) + { + conf.key_cache_migrate_during_compaction = migrateCacheEntry; + } + + public static int getSSTablePreemptiveOpenIntervalInMB() { return FBUtilities.isWindows ? -1 : conf.sstable_preemptive_open_interval_in_mb; } - public static void setSSTablePreempiveOpenIntervalInMB(int mb) + public static void setSSTablePreemptiveOpenIntervalInMB(int mb) { conf.sstable_preemptive_open_interval_in_mb = mb; } diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java b/src/java/org/apache/cassandra/db/rows/EncodingStats.java index 955ffc7..4a7bb19 100644 --- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java +++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java @@ -19,6 +19,9 @@ package org.apache.cassandra.db.rows; import java.io.IOException; import java.util.*; +import java.util.function.Function; + +import com.google.common.collect.Iterables; import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; @@ -41,7 +44,7 @@ public class EncodingStats { // Default values for the timestamp, deletion time and ttl. We use this both for NO_STATS, but also to serialize // an EncodingStats. Basically, we encode the diff of each value of to these epoch, which give values with better vint encoding. - private static final long TIMESTAMP_EPOCH; + public static final long TIMESTAMP_EPOCH; private static final int DELETION_TIME_EPOCH; private static final int TTL_EPOCH = 0; static @@ -93,20 +96,42 @@ public class EncodingStats public EncodingStats mergeWith(EncodingStats that) { long minTimestamp = this.minTimestamp == TIMESTAMP_EPOCH - ? that.minTimestamp - : (that.minTimestamp == TIMESTAMP_EPOCH ? this.minTimestamp : Math.min(this.minTimestamp, that.minTimestamp)); + ? that.minTimestamp + : (that.minTimestamp == TIMESTAMP_EPOCH ? this.minTimestamp : Math.min(this.minTimestamp, that.minTimestamp)); int minDelTime = this.minLocalDeletionTime == DELETION_TIME_EPOCH - ? that.minLocalDeletionTime - : (that.minLocalDeletionTime == DELETION_TIME_EPOCH ? this.minLocalDeletionTime : Math.min(this.minLocalDeletionTime, that.minLocalDeletionTime)); + ? that.minLocalDeletionTime + : (that.minLocalDeletionTime == DELETION_TIME_EPOCH ? this.minLocalDeletionTime : Math.min(this.minLocalDeletionTime, that.minLocalDeletionTime)); int minTTL = this.minTTL == TTL_EPOCH - ? that.minTTL - : (that.minTTL == TTL_EPOCH ? this.minTTL : Math.min(this.minTTL, that.minTTL)); + ? that.minTTL + : (that.minTTL == TTL_EPOCH ? this.minTTL : Math.min(this.minTTL, that.minTTL)); return new EncodingStats(minTimestamp, minDelTime, minTTL); } + /** + * Merge one or more EncodingStats, that are lazily materialized from some list of arbitrary type by the provided function + */ + public static <V, F extends Function<V, EncodingStats>> EncodingStats merge(List<V> values, F function) + { + if (values.size() == 1) + return function.apply(values.get(0)); + + Collector collector = new Collector(); + for (V v : values) + { + EncodingStats stats = function.apply(v); + if (stats.minTimestamp != TIMESTAMP_EPOCH) + collector.updateTimestamp(stats.minTimestamp); + if(stats.minLocalDeletionTime != DELETION_TIME_EPOCH) + collector.updateLocalDeletionTime(stats.minLocalDeletionTime); + if(stats.minTTL != TTL_EPOCH) + collector.updateTTL(stats.minTTL); + } + return collector.get(); + } + @Override public boolean equals(Object o) { diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index 42807a2..21e1954 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -403,7 +403,7 @@ public abstract class UnfilteredRowIterators columns, mergeStaticRows(iterators, columns.statics, listener, partitionDeletion), reversed, - mergeStats(iterators)); + EncodingStats.merge(iterators, UnfilteredRowIterator::stats)); this.mergeIterator = MergeIterator.get(iterators, reversed ? metadata.comparator.reversed() : metadata.comparator, @@ -512,14 +512,6 @@ public abstract class UnfilteredRowIterators : new RegularAndStaticColumns(statics, regulars); } - private static EncodingStats mergeStats(List<UnfilteredRowIterator> iterators) - { - EncodingStats stats = EncodingStats.NO_STATS; - for (UnfilteredRowIterator iter : iterators) - stats = stats.mergeWith(iter.stats()); - return stats; - } - protected Unfiltered computeNext() { while (mergeIterator.hasNext()) diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java index a49e7b4..1846aa5 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java @@ -190,7 +190,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat { // We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see // SerializationHeader.make() for details) so we use the latter instead. - return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL()); + return sstable.stats(); } public int compareTo(SSTableIdentityIterator o) diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java index a71d1af..fb3aa2d 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java @@ -32,7 +32,6 @@ import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.utils.NativeLibrary; import org.apache.cassandra.utils.concurrent.Transactional; @@ -117,7 +116,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme private static long calculateOpenInterval(boolean shouldOpenEarly) { - long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20); + long interval = DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMB() * (1L << 20); if (disableEarlyOpeningForTests || !shouldOpenEarly || interval < 0) interval = Long.MAX_VALUE; return interval; @@ -134,14 +133,17 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme DecoratedKey key = partition.partitionKey(); maybeReopenEarly(key); RowIndexEntry index = writer.append(partition); - if (!transaction.isOffline() && index != null) + if (DatabaseDescriptor.shouldMigrateKeycacheOnCompaction()) { - for (SSTableReader reader : transaction.originals()) + if (!transaction.isOffline() && index != null) { - if (reader.getCachedPosition(key, false) != null) + for (SSTableReader reader : transaction.originals()) { - cachedKeys.put(key, index); - break; + if (reader.getCachedPosition(key, false) != null) + { + cachedKeys.put(key, index); + break; + } } } } @@ -223,9 +225,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme */ private void moveStarts(SSTableReader newReader, DecoratedKey lowerbound) { - if (transaction.isOffline()) - return; - if (preemptiveOpenInterval == Long.MAX_VALUE) + if (transaction.isOffline() || preemptiveOpenInterval == Long.MAX_VALUE) return; newReader.setupOnline(); diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index ca6eb85..e3059c8 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -699,17 +699,15 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS // under normal operation we can do this at any time, but SSTR is also used outside C* proper, // e.g. by BulkLoader, which does not initialize the cache. As a kludge, we set up the cache // here when we know we're being wired into the rest of the server infrastructure. - keyCache = CacheService.instance.keyCache; + InstrumentingCache<KeyCacheKey, RowIndexEntry> maybeKeyCache = CacheService.instance.keyCache; + if (maybeKeyCache.getCapacity() > 0) + keyCache = maybeKeyCache; + final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata().id); if (cfs != null) setCrcCheckChance(cfs.getCrcCheckChance()); } - public boolean isKeyCacheSetup() - { - return keyCache != null; - } - /** * See {@link #load(boolean, boolean)} * @param validation Metadata for SSTable being loaded @@ -1534,12 +1532,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats) { - return getCachedPosition(new KeyCacheKey(metadata(), descriptor, key.getKey()), updateStats); + if (isKeyCacheEnabled()) + return getCachedPosition(new KeyCacheKey(metadata(), descriptor, key.getKey()), updateStats); + return null; } protected RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats) { - if (keyCacheEnabled()) + if (isKeyCacheEnabled()) { if (updateStats) { @@ -1560,9 +1560,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS return null; } - private boolean keyCacheEnabled() + public boolean isKeyCacheEnabled() { - return keyCache != null && keyCache.getCapacity() > 0 && metadata().params.caching.cacheKeys(); + return keyCache != null && metadata().params.caching.cacheKeys(); } /** @@ -1830,7 +1830,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS // hint read path about key location if caching is enabled // this saves index summary lookup and index file iteration which whould be pretty costly // especially in presence of promoted column indexes - if (isKeyCacheSetup()) + if (isKeyCacheEnabled()) cacheKey(key, rowIndexEntrySerializer.deserialize(in, in.getFilePointer())); } @@ -2101,7 +2101,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { // We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see // SerializationHeader.make() for details) so we use the latter instead. - return new EncodingStats(getMinTimestamp(), getMinLocalDeletionTime(), getMinTTL()); + return sstableMetadata.encodingStats; } public Ref<SSTableReader> tryRef() diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java index a4787a2..c9ae431 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java @@ -24,7 +24,6 @@ import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.cache.KeyCacheKey; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.SSTableIterator; import org.apache.cassandra.db.columniterator.SSTableReversedIterator; @@ -155,9 +154,8 @@ public class BigTableReader extends SSTableReader // next, the key cache (only make sense for valid row key) if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey)) { - DecoratedKey decoratedKey = (DecoratedKey)key; - KeyCacheKey cacheKey = new KeyCacheKey(metadata(), descriptor, decoratedKey.getKey()); - RowIndexEntry cachedPosition = getCachedPosition(cacheKey, updateCacheAndStats); + DecoratedKey decoratedKey = (DecoratedKey) key; + RowIndexEntry cachedPosition = getCachedPosition(decoratedKey, updateCacheAndStats); if (cachedPosition != null) { listener.onSSTableSelected(this, cachedPosition, SelectionReason.KEY_CACHE_HIT); diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java index 5b06ada..5d464fe 100755 --- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.io.ISerializer; import org.apache.cassandra.io.sstable.format.Version; import org.apache.commons.lang3.builder.EqualsBuilder; @@ -65,6 +66,8 @@ public class StatsMetadata extends MetadataComponent public final long totalRows; public final UUID pendingRepair; public final boolean isTransient; + // just holds the current encoding stats to avoid allocating - it is not serialized + public final EncodingStats encodingStats; public StatsMetadata(EstimatedHistogram estimatedPartitionSize, EstimatedHistogram estimatedColumnCount, @@ -107,6 +110,7 @@ public class StatsMetadata extends MetadataComponent this.totalRows = totalRows; this.pendingRepair = pendingRepair; this.isTransient = isTransient; + this.encodingStats = new EncodingStats(minTimestamp, minLocalDeletionTime, minTTL); } public MetadataType getType() diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 2707b85..eade7dd 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -18,7 +18,6 @@ package org.apache.cassandra.service; import java.io.*; -import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; @@ -5347,6 +5346,26 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return DatabaseDescriptor.getPartitionerName(); } + public void setSSTablePreemptiveOpenIntervalInMB(int intervalInMB) + { + DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMB(intervalInMB); + } + + public int getSSTablePreemptiveOpenIntervalInMB() + { + return DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMB(); + } + + public boolean getMigrateKeycacheOnCompaction() + { + return DatabaseDescriptor.shouldMigrateKeycacheOnCompaction(); + } + + public void setMigrateKeycacheOnCompaction(boolean invalidateKeyCacheOnCompaction) + { + DatabaseDescriptor.setMigrateKeycacheOnCompaction(invalidateKeyCacheOnCompaction); + } + public int getTombstoneWarnThreshold() { return DatabaseDescriptor.getTombstoneWarnThreshold(); diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index e74f002..3fef7f6 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -577,6 +577,12 @@ public interface StorageServiceMBean extends NotificationEmitter public int getConcurrentValidators(); public void setConcurrentValidators(int value); + public int getSSTablePreemptiveOpenIntervalInMB(); + public void setSSTablePreemptiveOpenIntervalInMB(int intervalInMB); + + public boolean getMigrateKeycacheOnCompaction(); + public void setMigrateKeycacheOnCompaction(boolean invalidateKeyCacheOnCompaction); + public int getConcurrentViewBuilders(); public void setConcurrentViewBuilders(int value); diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java index d7a31d6..1819b18 100644 --- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java +++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java @@ -287,7 +287,7 @@ public class KeyCacheTest throw new IllegalStateException(); Util.compactAll(cfs, Integer.MAX_VALUE).get(); - boolean noEarlyOpen = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() < 0; + boolean noEarlyOpen = DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMB() < 0; // after compaction cache should have entries for new SSTables, // but since we have kept a reference to the old sstables, diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java index 2891126..522e59a 100644 --- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java @@ -43,6 +43,7 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.notifications.*; +import org.apache.cassandra.schema.CachingParams; import org.apache.cassandra.schema.MockSchema; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -146,7 +147,7 @@ public class TrackerTest @Test public void testAddInitialSSTables() { - ColumnFamilyStore cfs = MockSchema.newCFS(); + ColumnFamilyStore cfs = MockSchema.newCFS(metadata -> metadata.caching(CachingParams.CACHE_KEYS)); Tracker tracker = cfs.getTracker(); List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0, 17, cfs), MockSchema.sstable(1, 121, cfs), @@ -156,7 +157,7 @@ public class TrackerTest Assert.assertEquals(3, tracker.view.get().sstables.size()); for (SSTableReader reader : readers) - Assert.assertTrue(reader.isKeyCacheSetup()); + Assert.assertTrue(reader.isKeyCacheEnabled()); Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount()); } @@ -166,7 +167,7 @@ public class TrackerTest { boolean backups = DatabaseDescriptor.isIncrementalBackupsEnabled(); DatabaseDescriptor.setIncrementalBackupsEnabled(false); - ColumnFamilyStore cfs = MockSchema.newCFS(); + ColumnFamilyStore cfs = MockSchema.newCFS(metadata -> metadata.caching(CachingParams.CACHE_KEYS)); Tracker tracker = cfs.getTracker(); MockListener listener = new MockListener(false); tracker.subscribe(listener); @@ -178,7 +179,7 @@ public class TrackerTest Assert.assertEquals(3, tracker.view.get().sstables.size()); for (SSTableReader reader : readers) - Assert.assertTrue(reader.isKeyCacheSetup()); + Assert.assertTrue(reader.isKeyCacheEnabled()); Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount()); Assert.assertEquals(1, listener.senders.size()); @@ -263,7 +264,7 @@ public class TrackerTest { boolean backups = DatabaseDescriptor.isIncrementalBackupsEnabled(); DatabaseDescriptor.setIncrementalBackupsEnabled(false); - ColumnFamilyStore cfs = MockSchema.newCFS(); + ColumnFamilyStore cfs = MockSchema.newCFS(metadata -> metadata.caching(CachingParams.CACHE_KEYS)); MockListener listener = new MockListener(false); Tracker tracker = cfs.getTracker(); tracker.subscribe(listener); @@ -308,7 +309,7 @@ public class TrackerTest Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(1)).added); Assert.assertEquals(Optional.of(prev2), ((SSTableAddedNotification) listener.received.get(1)).memtable()); listener.received.clear(); - Assert.assertTrue(reader.isKeyCacheSetup()); + Assert.assertTrue(reader.isKeyCacheEnabled()); Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount()); // test invalidated CFS diff --git a/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java b/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java new file mode 100644 index 0000000..1ac092a --- /dev/null +++ b/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.rows; + +import java.util.function.Function; + +import com.google.common.collect.ImmutableList; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.db.LivenessInfo; + +import static org.quicktheories.QuickTheory.qt; +import static org.quicktheories.generators.SourceDSL.integers; +import static org.quicktheories.generators.SourceDSL.longs; + +public class EncodingStatsTest +{ + @Test + public void testCollectWithNoStats() + { + EncodingStats none = EncodingStats.merge(ImmutableList.of( + EncodingStats.NO_STATS, + EncodingStats.NO_STATS, + EncodingStats.NO_STATS + ), Function.identity()); + Assert.assertEquals(none, EncodingStats.NO_STATS); + } + + @Test + public void testCollectWithNoStatsWithEmpty() + { + EncodingStats none = EncodingStats.merge(ImmutableList.of( + EncodingStats.NO_STATS, + EncodingStats.NO_STATS, + new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 0) + ), Function.identity()); + Assert.assertEquals(none, EncodingStats.NO_STATS); + } + + @Test + public void testCollectWithNoStatsWithTimestamp() + { + EncodingStats single = new EncodingStats(1, LivenessInfo.NO_EXPIRATION_TIME, 0); + EncodingStats result = EncodingStats.merge(ImmutableList.of( + EncodingStats.NO_STATS, + EncodingStats.NO_STATS, + single, + EncodingStats.NO_STATS + ), Function.identity()); + Assert.assertEquals(single, result); + } + + @Test + public void testCollectWithNoStatsWithExpires() + { + EncodingStats single = new EncodingStats(LivenessInfo.NO_TIMESTAMP, 1, 0); + EncodingStats result = EncodingStats.merge(ImmutableList.of( + EncodingStats.NO_STATS, + single, + EncodingStats.NO_STATS + ), Function.identity()); + Assert.assertEquals(single, result); + } + + @Test + public void testCollectWithNoStatsWithTTL() + { + EncodingStats single = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 1); + EncodingStats result = EncodingStats.merge(ImmutableList.of( + EncodingStats.NO_STATS, + single, + EncodingStats.NO_STATS + ), Function.identity()); + Assert.assertEquals(single, result); + } + + @Test + public void testCollectOneEach() + { + EncodingStats tsp = new EncodingStats(1, LivenessInfo.NO_EXPIRATION_TIME, 0); + EncodingStats exp = new EncodingStats(LivenessInfo.NO_TIMESTAMP, 1, 0); + EncodingStats ttl = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 1); + EncodingStats result = EncodingStats.merge(ImmutableList.of( + tsp, + exp, + ttl + ), Function.identity()); + Assert.assertEquals(new EncodingStats(1, 1, 1), result); + } + + @Test + public void testTimestamp() + { + EncodingStats one = new EncodingStats(1, LivenessInfo.NO_EXPIRATION_TIME, 0); + EncodingStats two = new EncodingStats(2, LivenessInfo.NO_EXPIRATION_TIME, 0); + EncodingStats thr = new EncodingStats(3, LivenessInfo.NO_EXPIRATION_TIME, 0); + EncodingStats result = EncodingStats.merge(ImmutableList.of( + one, + two, + thr + ), Function.identity()); + Assert.assertEquals(one, result); + } + + @Test + public void testExpires() + { + EncodingStats one = new EncodingStats(LivenessInfo.NO_TIMESTAMP,1, 0); + EncodingStats two = new EncodingStats(LivenessInfo.NO_TIMESTAMP,2, 0); + EncodingStats thr = new EncodingStats(LivenessInfo.NO_TIMESTAMP,3, 0); + EncodingStats result = EncodingStats.merge(ImmutableList.of( + one, + two, + thr + ), Function.identity()); + Assert.assertEquals(one, result); + } + + @Test + public void testTTL() + { + EncodingStats one = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME,1); + EncodingStats two = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME,2); + EncodingStats thr = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME,3); + EncodingStats result = EncodingStats.merge(ImmutableList.of( + thr, + one, + two + ), Function.identity()); + Assert.assertEquals(one, result); + } + + @Test + public void testEncodingStatsCollectWithNone() + { + qt().forAll(longs().between(Long.MIN_VALUE+1, Long.MAX_VALUE), + integers().between(0, Integer.MAX_VALUE-1), + integers().allPositive()) + .asWithPrecursor(EncodingStats::new) + .check((timestamp, expires, ttl, stats) -> + { + EncodingStats result = EncodingStats.merge(ImmutableList.of( + EncodingStats.NO_STATS, + stats, + EncodingStats.NO_STATS + ), Function.identity()); + return result.minTTL == ttl + && result.minLocalDeletionTime == expires + && result.minTimestamp == timestamp; + }); + } + +} diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java index b597bfe..eb15e9a 100644 --- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java +++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java @@ -33,7 +33,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; -import org.apache.cassandra.locator.EndpointsForRange; + import org.apache.cassandra.locator.RangesAtEndpoint; import org.junit.Assert; import org.junit.Before; @@ -195,7 +195,7 @@ public class CassandraStreamManagerTest Collection<SSTableReader> allSSTables = cfs.getLiveSSTables(); Assert.assertEquals(1, allSSTables.size()); final Token firstToken = allSSTables.iterator().next().first.getToken(); - DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1); + DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMB(1); Set<SSTableReader> sstablesBeforeRewrite = getReadersForRange(new Range<>(firstToken, firstToken)); Assert.assertEquals(1, sstablesBeforeRewrite.size()); @@ -227,7 +227,7 @@ public class CassandraStreamManagerTest } finally { - DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50); + DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMB(50); done.set(true); t.join(20); } diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java index 1bc3454..461c13c 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java @@ -28,7 +28,6 @@ import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQLTester; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.ByteBufferUtil; import static org.junit.Assert.assertEquals; @@ -85,7 +84,7 @@ public class CompressedSequentialWriterReopenTest extends CQLTester execute("insert into %s (id, t) values (?, ?)", i, ByteBuffer.wrap(blob)); } getCurrentColumnFamilyStore().forceBlockingFlush(); - DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1); + DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMB(1); getCurrentColumnFamilyStore().forceMajorCompaction(); } diff --git a/test/unit/org/apache/cassandra/schema/MockSchema.java b/test/unit/org/apache/cassandra/schema/MockSchema.java index 9ca2d6e..dfa8731 100644 --- a/test/unit/org/apache/cassandra/schema/MockSchema.java +++ b/test/unit/org/apache/cassandra/schema/MockSchema.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; import com.google.common.collect.ImmutableSet; @@ -148,20 +150,47 @@ public class MockSchema public static ColumnFamilyStore newCFS(String ksname) { - String cfname = "mockcf" + (id.incrementAndGet()); - TableMetadata metadata = newTableMetadata(ksname, cfname); - return new ColumnFamilyStore(ks, cfname, 0, new TableMetadataRef(metadata), new Directories(metadata), false, false, false); + return newCFS(newTableMetadata(ksname)); + } + + public static ColumnFamilyStore newCFS(Function<TableMetadata.Builder, TableMetadata.Builder> options) + { + return newCFS(ks.getName(), options); + } + + public static ColumnFamilyStore newCFS(String ksname, Function<TableMetadata.Builder, TableMetadata.Builder> options) + { + return newCFS(options.apply(newTableMetadataBuilder(ksname)).build()); + } + + public static ColumnFamilyStore newCFS(TableMetadata metadata) + { + return new ColumnFamilyStore(ks, metadata.name, 0, new TableMetadataRef(metadata), new Directories(metadata), false, false, false); + } + + public static TableMetadata newTableMetadata(String ksname) + { + return newTableMetadata(ksname, "mockcf" + (id.incrementAndGet())); } public static TableMetadata newTableMetadata(String ksname, String cfname) { + return newTableMetadataBuilder(ksname, cfname).build(); + } + + public static TableMetadata.Builder newTableMetadataBuilder(String ksname) + { + return newTableMetadataBuilder(ksname, "mockcf" + (id.incrementAndGet())); + } + + public static TableMetadata.Builder newTableMetadataBuilder(String ksname, String cfname) + { return TableMetadata.builder(ksname, cfname) .partitioner(Murmur3Partitioner.instance) .addPartitionKeyColumn("key", UTF8Type.instance) .addClusteringColumn("col", UTF8Type.instance) .addRegularColumn("value", UTF8Type.instance) - .caching(CachingParams.CACHE_NOTHING) - .build(); + .caching(CachingParams.CACHE_NOTHING); } public static BufferDecoratedKey readerBounds(long generation) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org