This is an automated email from the ASF dual-hosted git repository. jolynch pushed a commit to branch cassandra-3.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push: new 1911a88 Fix slow keycache load which blocks startup for tables with many sstables. 1911a88 is described below commit 1911a887e8316d343c9bfe3aca3f9d143e9f4a61 Author: Venkata Harikrishna Nukala <n.v.harikrishna.apa...@gmail.com> AuthorDate: Sat Oct 23 00:03:45 2021 +0530 Fix slow keycache load which blocks startup for tables with many sstables. Patch by Venkata Harikrishna Nukala; reviewed by Marcus Eriksson and Joseph Lynch for CASSANDRA-14898 --- CHANGES.txt | 1 + build.xml | 4 +- conf/cassandra.yaml | 7 +- .../apache/cassandra/cache/AutoSavingCache.java | 8 +- src/java/org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 11 ++ .../org/apache/cassandra/db/lifecycle/View.java | 2 +- .../org/apache/cassandra/service/CacheService.java | 38 ++++-- .../test/microbench/CacheLoaderBench.java | 137 +++++++++++++++++++++ .../unit/org/apache/cassandra/db/KeyCacheTest.java | 135 +++++++++++++++++++- 10 files changed, 330 insertions(+), 15 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index cf6a956..085e1ff 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.26: + * Fix slow keycache load which blocks startup for tables with many sstables (CASSANDRA-14898) * Fix rare NPE caused by batchlog replay / node decomission races (CASSANDRA-17049) * Allow users to view permissions of the roles they created (CASSANDRA-16902) * Fix failure handling in inter-node communication (CASSANDRA-16334) diff --git a/build.xml b/build.xml index b42c8ea..41f530d 100644 --- a/build.xml +++ b/build.xml @@ -375,8 +375,8 @@ <dependency groupId="net.bytebuddy" artifactId="byte-buddy" version="${bytebuddy.version}" /> <dependency groupId="net.bytebuddy" artifactId="byte-buddy-agent" version="${bytebuddy.version}" /> - <dependency groupId="org.openjdk.jmh" artifactId="jmh-core" version="1.1.1" scope="test"/> - <dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess" version="1.1.1" scope="test"/> + <dependency groupId="org.openjdk.jmh" artifactId="jmh-core" version="1.21" scope="test"/> + <dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess" version="1.21" scope="test"/> <dependency groupId="org.apache.cassandra" artifactId="cassandra-all" version="${version}" /> <!--dependency groupId="org.apache.cassandra" artifactId="cassandra-thrift" version="${version}" scope="provided"/--> diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index cc2a369..ec2157b 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -286,7 +286,12 @@ counter_cache_save_period: 7200 # If not set, the default directory is $CASSANDRA_HOME/data/saved_caches. # saved_caches_directory: /var/lib/cassandra/saved_caches -# commitlog_sync may be either "periodic" or "batch." +# Number of seconds the server will wait for each cache (row, key, etc ...) to load while starting +# the Cassandra process. Setting this to a negative value is equivalent to disabling all cache loading on startup +# while still having the cache during runtime. +# cache_load_timeout_seconds: 30 + +# commitlog_sync may be either "periodic" or "batch." # # When in batch mode, Cassandra won't ack writes until the commit log # has been fsynced to disk. It will wait diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index 3da6352..3b7a6b8 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -198,8 +198,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K + " does not match current schema version " + Schema.instance.getVersion()); - ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<Future<Pair<K, V>>>(); - while (in.available() > 0) + ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<>(); + long loadByNanos = start + TimeUnit.SECONDS.toNanos(DatabaseDescriptor.getCacheLoadTimeout()); + while (System.nanoTime() < loadByNanos && in.available() > 0) { //ksname and cfname are serialized by the serializers in CacheService //That is delegated there because there are serializer specific conditions @@ -257,6 +258,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K finally { FileUtils.closeQuietly(in); + cacheLoader.cleanupAfterDeserialize(); } } if (logger.isTraceEnabled()) @@ -433,5 +435,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K void serialize(K key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException; Future<Pair<K, V>> deserialize(DataInputPlus in, ColumnFamilyStore cfs) throws IOException; + + default void cleanupAfterDeserialize() { } } } diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index a835e5a..9e30306 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -254,6 +254,8 @@ public class Config public volatile int counter_cache_save_period = 7200; public volatile int counter_cache_keys_to_save = Integer.MAX_VALUE; + public int cache_load_timeout_seconds = 30; + private static boolean isClientMode = false; private static Supplier<Config> overrideLoadConfig = null; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index b7708cd..1795c98 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1979,6 +1979,17 @@ public class DatabaseDescriptor conf.counter_cache_save_period = counterCacheSavePeriod; } + public static int getCacheLoadTimeout() + { + return conf.cache_load_timeout_seconds; + } + + @VisibleForTesting + public static void setCacheLoadTimeout(int seconds) + { + conf.cache_load_timeout_seconds = seconds; + } + public static int getCounterCacheKeysToSave() { return conf.counter_cache_keys_to_save; diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java index 4b3aae0..c33f305 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/View.java +++ b/src/java/org/apache/cassandra/db/lifecycle/View.java @@ -137,7 +137,7 @@ public class View case NONCOMPACTING: return filter(sstables, (s) -> !compacting.contains(s)); case CANONICAL: - Set<SSTableReader> canonicalSSTables = new HashSet<>(); + Set<SSTableReader> canonicalSSTables = new HashSet<>(sstables.size() + compacting.size()); for (SSTableReader sstable : compacting) if (sstable.openReason != SSTableReader.OpenReason.EARLY) canonicalSSTables.add(sstable); diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index c4d1722..873a319 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -20,9 +20,12 @@ package org.apache.cassandra.service; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -441,6 +444,11 @@ public class CacheService implements CacheServiceMBean public static class KeyCacheSerializer implements CacheSerializer<KeyCacheKey, RowIndexEntry> { + // For column families with many SSTables the linear nature of getSSTables slowed down KeyCache loading + // by orders of magnitude. So we cache the sstables once and rely on cleanupAfterDeserialize to cleanup any + // cached state we may have accumulated during the load. + Map<Pair<String, String>, Map<Integer, SSTableReader>> cachedSSTableReaders = new ConcurrentHashMap<>(); + public void serialize(KeyCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException { //Don't serialize old format entries since we didn't bother to implement serialization of both for simplicity @@ -460,6 +468,8 @@ public class CacheService implements CacheServiceMBean public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputPlus input, ColumnFamilyStore cfs) throws IOException { + boolean skipEntry = cfs == null || !cfs.isKeyCacheEnabled(); + //Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a //parameter so they aren't deserialized here, even though they are serialized by this serializer int keyLength = input.readInt(); @@ -472,7 +482,24 @@ public class CacheService implements CacheServiceMBean int generation = input.readInt(); input.readBoolean(); // backwards compatibility for "promoted indexes" boolean SSTableReader reader = null; - if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = findDesc(generation, cfs.getSSTables(SSTableSet.CANONICAL))) == null) + if (!skipEntry) + { + Pair<String, String> qualifiedName = Pair.create(cfs.metadata.ksName, cfs.metadata.cfName); + Map<Integer, SSTableReader> generationToSSTableReader = cachedSSTableReaders.get(qualifiedName); + if (generationToSSTableReader == null) + { + generationToSSTableReader = new HashMap<>(cfs.getLiveSSTables().size()); + for (SSTableReader ssTableReader : cfs.getSSTables(SSTableSet.CANONICAL)) + { + generationToSSTableReader.put(ssTableReader.descriptor.generation, ssTableReader); + } + + cachedSSTableReaders.putIfAbsent(qualifiedName, generationToSSTableReader); + } + reader = generationToSSTableReader.get(generation); + } + + if (skipEntry || reader == null) { // The sstable doesn't exist anymore, so we can't be sure of the exact version and assume its the current version. The only case where we'll be // wrong is during upgrade, in which case we fail at deserialization. This is not a huge deal however since 1) this is unlikely enough that @@ -488,14 +515,9 @@ public class CacheService implements CacheServiceMBean return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry)); } - private SSTableReader findDesc(int generation, Iterable<SSTableReader> collection) + public void cleanupAfterDeserialize() { - for (SSTableReader sstable : collection) - { - if (sstable.descriptor.generation == generation) - return sstable; - } - return null; + cachedSSTableReaders.clear(); } } } diff --git a/test/microbench/org/apache/cassandra/test/microbench/CacheLoaderBench.java b/test/microbench/org/apache/cassandra/test/microbench/CacheLoaderBench.java new file mode 100644 index 0000000..b3a7136 --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/CacheLoaderBench.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; + +import org.apache.cassandra.Util; +import org.apache.cassandra.cache.AutoSavingCache; +import org.apache.cassandra.cache.KeyCacheKey; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.marshal.AsciiType; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.service.CacheService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@SuppressWarnings("unused") +@BenchmarkMode(Mode.SampleTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 2, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@Threads(1) +@State(Scope.Benchmark) +public class CacheLoaderBench extends CQLTester +{ + private static final int numSSTables = 1000; + private final Random random = new Random(); + + @Setup(Level.Trial) + public void setup() throws Throwable + { + CQLTester.prepareServer(); + String keyspace = createKeyspace("CREATE KEYSPACE %s with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } and durable_writes = false"); + String table1 = createTable(keyspace, "CREATE TABLE %s (key text PRIMARY KEY, val text)"); + String table2 = createTable(keyspace, "CREATE TABLE %s (key text PRIMARY KEY, val text)"); + + + Keyspace.system().forEach(k -> k.getColumnFamilyStores().forEach(ColumnFamilyStore::disableAutoCompaction)); + + ColumnFamilyStore cfs1 = Keyspace.open(keyspace).getColumnFamilyStore(table1); + ColumnFamilyStore cfs2 = Keyspace.open(keyspace).getColumnFamilyStore(table2); + cfs1.disableAutoCompaction(); + cfs2.disableAutoCompaction(); + + // Write a bunch of sstables to both cfs1 and cfs2 + + List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(2); + columnFamilyStores.add(cfs1); + columnFamilyStores.add(cfs2); + + logger.info("Creating {} sstables", numSSTables); + for (ColumnFamilyStore cfs: columnFamilyStores) + { + cfs.truncateBlocking(); + for (int i = 0; i < numSSTables ; i++) + { + ColumnDefinition colDef = ColumnDefinition.regularDef(cfs.metadata, ByteBufferUtil.bytes("val"), AsciiType.instance); + RowUpdateBuilder rowBuilder = new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis() + random.nextInt(), "key"); + rowBuilder.add(colDef, "val1"); + rowBuilder.build().apply(); + cfs.forceBlockingFlush(); + } + + Assert.assertEquals(numSSTables, cfs.getLiveSSTables().size()); + + // preheat key cache + for (SSTableReader sstable : cfs.getLiveSSTables()) + sstable.getPosition(Util.dk("key"), SSTableReader.Operator.EQ); + } + + AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = CacheService.instance.keyCache; + + // serialize to file + keyCache.submitWrite(keyCache.size()).get(); + } + + @Setup(Level.Invocation) + public void setupKeyCache() + { + AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = CacheService.instance.keyCache; + keyCache.clear(); + } + + @TearDown(Level.Trial) + public void teardown() + { + CQLTester.cleanup(); + CQLTester.tearDownClass(); + } + + @Benchmark + public void keyCacheLoadTest() throws Throwable + { + AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = CacheService.instance.keyCache; + + keyCache.loadSavedAsync().get(); + } +} diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java index 515d30e..9cb06b9 100644 --- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java +++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java @@ -17,9 +17,12 @@ */ package org.apache.cassandra.db; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -33,6 +36,8 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.cache.AutoSavingCache; +import org.apache.cassandra.cache.ICache; import org.apache.cassandra.cache.KeyCacheKey; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; @@ -42,18 +47,32 @@ import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.CacheService; +import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.Refs; +import org.mockito.Mockito; +import org.mockito.internal.stubbing.answers.AnswersWithDelay; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; public class KeyCacheTest { private static final String KEYSPACE1 = "KeyCacheTest1"; + private static final String KEYSPACE2 = "KeyCacheTest2"; private static final String COLUMN_FAMILY1 = "Standard1"; private static final String COLUMN_FAMILY2 = "Standard2"; private static final String COLUMN_FAMILY3 = "Standard3"; + private static final String COLUMN_FAMILY7 = "Standard7"; + private static final String COLUMN_FAMILY8 = "Standard8"; + private static final String COLUMN_FAMILY9 = "Standard9"; + + private static final String COLUMN_FAMILY_K2_1 = "Standard1"; @BeforeClass @@ -64,7 +83,15 @@ public class KeyCacheTest KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY1), SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY2), - SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY3)); + SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY3), + SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY7), + SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY8), + SchemaLoader.standardCFMD(KEYSPACE1, COLUMN_FAMILY9)); + + SchemaLoader.createKeyspace(KEYSPACE2, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE2, COLUMN_FAMILY_K2_1)); + } @AfterClass @@ -231,6 +258,112 @@ public class KeyCacheTest assertKeyCacheSize(noEarlyOpen ? 4 : 2, KEYSPACE1, COLUMN_FAMILY1); } + @Test + public void testKeyCacheLoadNegativeCacheLoadTime() throws Exception + { + DatabaseDescriptor.setCacheLoadTimeout(-1); + String cf = COLUMN_FAMILY7; + + createAndInvalidateCache(Collections.singletonList(Pair.create(KEYSPACE1, cf)), 100); + + CacheService.instance.keyCache.loadSaved(); + + // Here max time to load cache is negative which means no time left to load cache. So the keyCache size should + // be zero after loadSaved(). + assertKeyCacheSize(0, KEYSPACE1, cf); + assertEquals(0, CacheService.instance.keyCache.size()); + } + + @Test + public void testKeyCacheLoadTwoTablesTime() throws Exception + { + DatabaseDescriptor.setCacheLoadTimeout(60); + String columnFamily1 = COLUMN_FAMILY8; + String columnFamily2 = COLUMN_FAMILY_K2_1; + int numberOfRows = 100; + List<Pair<String, String>> tables = new ArrayList<>(2); + tables.add(Pair.create(KEYSPACE1, columnFamily1)); + tables.add(Pair.create(KEYSPACE2, columnFamily2)); + + createAndInvalidateCache(tables, numberOfRows); + + CacheService.instance.keyCache.loadSaved(); + + // Here max time to load cache is negative which means no time left to load cache. So the keyCache size should + // be zero after load. + assertKeyCacheSize(numberOfRows, KEYSPACE1, columnFamily1); + assertKeyCacheSize(numberOfRows, KEYSPACE2, columnFamily2); + assertEquals(numberOfRows * tables.size(), CacheService.instance.keyCache.size()); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testKeyCacheLoadCacheLoadTimeExceedingLimit() throws Exception + { + DatabaseDescriptor.setCacheLoadTimeout(2); + int delayMillis = 1000; + int numberOfRows = 100; + + String cf = COLUMN_FAMILY9; + + createAndInvalidateCache(Collections.singletonList(Pair.create(KEYSPACE1, cf)), numberOfRows); + + // Testing cache load. Here using custom built AutoSavingCache instance as simulating delay is not possible with + // 'CacheService.instance.keyCache'. 'AutoSavingCache.loadSaved()' is returning no.of entries loaded so we don't need + // to instantiate ICache.class. + CacheService.KeyCacheSerializer keyCacheSerializer = new CacheService.KeyCacheSerializer(); + CacheService.KeyCacheSerializer keyCacheSerializerSpy = Mockito.spy(keyCacheSerializer); + AutoSavingCache autoSavingCache = new AutoSavingCache(mock(ICache.class), + CacheService.CacheType.KEY_CACHE, + keyCacheSerializerSpy); + + doAnswer(new AnswersWithDelay(delayMillis, answer -> keyCacheSerializer.deserialize(answer.getArgument(0), + answer.getArgument(1)) )) + .when(keyCacheSerializerSpy).deserialize(any(DataInputPlus.class), any(ColumnFamilyStore.class)); + + long maxExpectedKeyCache = Math.min(numberOfRows, + 1 + TimeUnit.SECONDS.toMillis(DatabaseDescriptor.getCacheLoadTimeout()) / delayMillis); + + long keysLoaded = autoSavingCache.loadSaved(); + assertTrue(keysLoaded < maxExpectedKeyCache); + assertTrue(0 != keysLoaded); + Mockito.verify(keyCacheSerializerSpy, Mockito.times(1)).cleanupAfterDeserialize(); + } + + private void createAndInvalidateCache(List<Pair<String, String>> tables, int numberOfRows) throws ExecutionException, InterruptedException + { + CompactionManager.instance.disableAutoCompaction(); + + // empty the cache + CacheService.instance.invalidateKeyCache(); + assertEquals(0, CacheService.instance.keyCache.size()); + + for(Pair<String, String> entry : tables) + { + String keyspace = entry.left; + String cf = entry.right; + ColumnFamilyStore store = Keyspace.open(keyspace).getColumnFamilyStore(cf); + + // insert data and force to disk + SchemaLoader.insertData(keyspace, cf, 0, numberOfRows); + store.forceBlockingFlush(); + } + for(Pair<String, String> entry : tables) + { + String keyspace = entry.left; + String cf = entry.right; + // populate the cache + readData(keyspace, cf, 0, numberOfRows); + assertKeyCacheSize(numberOfRows, keyspace, cf); + } + + // force the cache to disk + CacheService.instance.keyCache.submitWrite(CacheService.instance.keyCache.size()).get(); + + CacheService.instance.invalidateKeyCache(); + assertEquals(0, CacheService.instance.keyCache.size()); + } + private static void readData(String keyspace, String columnFamily, int startRow, int numberOfRows) { ColumnFamilyStore store = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org