This is an automated email from the ASF dual-hosted git repository.

jolynch pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 1911a88  Fix slow keycache load which blocks startup for tables with 
many sstables.
1911a88 is described below

commit 1911a887e8316d343c9bfe3aca3f9d143e9f4a61
Author: Venkata Harikrishna Nukala <n.v.harikrishna.apa...@gmail.com>
AuthorDate: Sat Oct 23 00:03:45 2021 +0530

    Fix slow keycache load which blocks startup for tables with many sstables.
    
    Patch by Venkata Harikrishna Nukala; reviewed by Marcus Eriksson and Joseph 
Lynch for CASSANDRA-14898
---
 CHANGES.txt                                        |   1 +
 build.xml                                          |   4 +-
 conf/cassandra.yaml                                |   7 +-
 .../apache/cassandra/cache/AutoSavingCache.java    |   8 +-
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  11 ++
 .../org/apache/cassandra/db/lifecycle/View.java    |   2 +-
 .../org/apache/cassandra/service/CacheService.java |  38 ++++--
 .../test/microbench/CacheLoaderBench.java          | 137 +++++++++++++++++++++
 .../unit/org/apache/cassandra/db/KeyCacheTest.java | 135 +++++++++++++++++++-
 10 files changed, 330 insertions(+), 15 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index cf6a956..085e1ff 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.26:
+ * Fix slow keycache load which blocks startup for tables with many sstables 
(CASSANDRA-14898)
  * Fix rare NPE caused by batchlog replay / node decomission races 
(CASSANDRA-17049)
  * Allow users to view permissions of the roles they created (CASSANDRA-16902)
  * Fix failure handling in inter-node communication (CASSANDRA-16334)
diff --git a/build.xml b/build.xml
index b42c8ea..41f530d 100644
--- a/build.xml
+++ b/build.xml
@@ -375,8 +375,8 @@
           <dependency groupId="net.bytebuddy" artifactId="byte-buddy" 
version="${bytebuddy.version}" />
           <dependency groupId="net.bytebuddy" artifactId="byte-buddy-agent" 
version="${bytebuddy.version}" />
 
-          <dependency groupId="org.openjdk.jmh" artifactId="jmh-core" 
version="1.1.1" scope="test"/>
-          <dependency groupId="org.openjdk.jmh" 
artifactId="jmh-generator-annprocess" version="1.1.1" scope="test"/>
+          <dependency groupId="org.openjdk.jmh" artifactId="jmh-core" 
version="1.21" scope="test"/>
+          <dependency groupId="org.openjdk.jmh" 
artifactId="jmh-generator-annprocess" version="1.21" scope="test"/>
 
           <dependency groupId="org.apache.cassandra" 
artifactId="cassandra-all" version="${version}" />
           <!--dependency groupId="org.apache.cassandra" 
artifactId="cassandra-thrift" version="${version}" scope="provided"/-->
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index cc2a369..ec2157b 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -286,7 +286,12 @@ counter_cache_save_period: 7200
 # If not set, the default directory is $CASSANDRA_HOME/data/saved_caches.
 # saved_caches_directory: /var/lib/cassandra/saved_caches
 
-# commitlog_sync may be either "periodic" or "batch." 
+# Number of seconds the server will wait for each cache (row, key, etc ...) to 
load while starting
+# the Cassandra process. Setting this to a negative value is equivalent to 
disabling all cache loading on startup
+# while still having the cache during runtime.
+# cache_load_timeout_seconds: 30
+
+# commitlog_sync may be either "periodic" or "batch."
 # 
 # When in batch mode, Cassandra won't ack writes until the commit log
 # has been fsynced to disk.  It will wait
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java 
b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 3da6352..3b7a6b8 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -198,8 +198,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
                                               + " does not match current 
schema version "
                                               + Schema.instance.getVersion());
 
-                ArrayDeque<Future<Pair<K, V>>> futures = new 
ArrayDeque<Future<Pair<K, V>>>();
-                while (in.available() > 0)
+                ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<>();
+                long loadByNanos = start + 
TimeUnit.SECONDS.toNanos(DatabaseDescriptor.getCacheLoadTimeout());
+                while (System.nanoTime() < loadByNanos && in.available() > 0)
                 {
                     //ksname and cfname are serialized by the serializers in 
CacheService
                     //That is delegated there because there are serializer 
specific conditions
@@ -257,6 +258,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
             finally
             {
                 FileUtils.closeQuietly(in);
+                cacheLoader.cleanupAfterDeserialize();
             }
         }
         if (logger.isTraceEnabled())
@@ -433,5 +435,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
         void serialize(K key, DataOutputPlus out, ColumnFamilyStore cfs) 
throws IOException;
 
         Future<Pair<K, V>> deserialize(DataInputPlus in, ColumnFamilyStore 
cfs) throws IOException;
+
+        default void cleanupAfterDeserialize() { }
     }
 }
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index a835e5a..9e30306 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -254,6 +254,8 @@ public class Config
     public volatile int counter_cache_save_period = 7200;
     public volatile int counter_cache_keys_to_save = Integer.MAX_VALUE;
 
+    public int cache_load_timeout_seconds = 30;
+
     private static boolean isClientMode = false;
     private static Supplier<Config> overrideLoadConfig = null;
 
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index b7708cd..1795c98 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1979,6 +1979,17 @@ public class DatabaseDescriptor
         conf.counter_cache_save_period = counterCacheSavePeriod;
     }
 
+    public static int getCacheLoadTimeout()
+    {
+        return conf.cache_load_timeout_seconds;
+    }
+
+    @VisibleForTesting
+    public static void setCacheLoadTimeout(int seconds)
+    {
+        conf.cache_load_timeout_seconds = seconds;
+    }
+
     public static int getCounterCacheKeysToSave()
     {
         return conf.counter_cache_keys_to_save;
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java 
b/src/java/org/apache/cassandra/db/lifecycle/View.java
index 4b3aae0..c33f305 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -137,7 +137,7 @@ public class View
             case NONCOMPACTING:
                 return filter(sstables, (s) -> !compacting.contains(s));
             case CANONICAL:
-                Set<SSTableReader> canonicalSSTables = new HashSet<>();
+                Set<SSTableReader> canonicalSSTables = new 
HashSet<>(sstables.size() + compacting.size());
                 for (SSTableReader sstable : compacting)
                     if (sstable.openReason != SSTableReader.OpenReason.EARLY)
                         canonicalSSTables.add(sstable);
diff --git a/src/java/org/apache/cassandra/service/CacheService.java 
b/src/java/org/apache/cassandra/service/CacheService.java
index c4d1722..873a319 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -20,9 +20,12 @@ package org.apache.cassandra.service;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -441,6 +444,11 @@ public class CacheService implements CacheServiceMBean
 
     public static class KeyCacheSerializer implements 
CacheSerializer<KeyCacheKey, RowIndexEntry>
     {
+        // For column families with many SSTables the linear nature of 
getSSTables slowed down KeyCache loading
+        // by orders of magnitude. So we cache the sstables once and rely on 
cleanupAfterDeserialize to cleanup any
+        // cached state we may have accumulated during the load.
+        Map<Pair<String, String>, Map<Integer, SSTableReader>> 
cachedSSTableReaders = new ConcurrentHashMap<>();
+
         public void serialize(KeyCacheKey key, DataOutputPlus out, 
ColumnFamilyStore cfs) throws IOException
         {
             //Don't serialize old format entries since we didn't bother to 
implement serialization of both for simplicity
@@ -460,6 +468,8 @@ public class CacheService implements CacheServiceMBean
 
         public Future<Pair<KeyCacheKey, RowIndexEntry>> 
deserialize(DataInputPlus input, ColumnFamilyStore cfs) throws IOException
         {
+            boolean skipEntry = cfs == null || !cfs.isKeyCacheEnabled();
+
             //Keyspace and CF name are deserialized by AutoSaving cache and 
used to fetch the CFS provided as a
             //parameter so they aren't deserialized here, even though they are 
serialized by this serializer
             int keyLength = input.readInt();
@@ -472,7 +482,24 @@ public class CacheService implements CacheServiceMBean
             int generation = input.readInt();
             input.readBoolean(); // backwards compatibility for "promoted 
indexes" boolean
             SSTableReader reader = null;
-            if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = 
findDesc(generation, cfs.getSSTables(SSTableSet.CANONICAL))) == null)
+            if (!skipEntry)
+            {
+                Pair<String, String> qualifiedName = 
Pair.create(cfs.metadata.ksName, cfs.metadata.cfName);
+                Map<Integer, SSTableReader> generationToSSTableReader = 
cachedSSTableReaders.get(qualifiedName);
+                if (generationToSSTableReader == null)
+                {
+                    generationToSSTableReader = new 
HashMap<>(cfs.getLiveSSTables().size());
+                    for (SSTableReader ssTableReader : 
cfs.getSSTables(SSTableSet.CANONICAL))
+                    {
+                        
generationToSSTableReader.put(ssTableReader.descriptor.generation, 
ssTableReader);
+                    }
+
+                    cachedSSTableReaders.putIfAbsent(qualifiedName, 
generationToSSTableReader);
+                }
+                reader = generationToSSTableReader.get(generation);
+            }
+
+            if (skipEntry || reader == null)
             {
                 // The sstable doesn't exist anymore, so we can't be sure of 
the exact version and assume its the current version. The only case where we'll 
be
                 // wrong is during upgrade, in which case we fail at 
deserialization. This is not a huge deal however since 1) this is unlikely 
enough that
@@ -488,14 +515,9 @@ public class CacheService implements CacheServiceMBean
             return Futures.immediateFuture(Pair.create(new 
KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry));
         }
 
-        private SSTableReader findDesc(int generation, Iterable<SSTableReader> 
collection)
+        public void cleanupAfterDeserialize()
         {
-            for (SSTableReader sstable : collection)
-            {
-                if (sstable.descriptor.generation == generation)
-                    return sstable;
-            }
-            return null;
+            cachedSSTableReaders.clear();
         }
     }
 }
diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/CacheLoaderBench.java 
b/test/microbench/org/apache/cassandra/test/microbench/CacheLoaderBench.java
new file mode 100644
index 0000000..b3a7136
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/CacheLoaderBench.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.cache.AutoSavingCache;
+import org.apache.cassandra.cache.KeyCacheKey;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+@SuppressWarnings("unused")
+@BenchmarkMode(Mode.SampleTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 2, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@Threads(1)
+@State(Scope.Benchmark)
+public class CacheLoaderBench extends CQLTester
+{
+    private static final int numSSTables = 1000;
+    private final Random random = new Random();
+
+    @Setup(Level.Trial)
+    public void setup() throws Throwable
+    {
+        CQLTester.prepareServer();
+        String keyspace = createKeyspace("CREATE KEYSPACE %s with replication 
= { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } and durable_writes = 
false");
+        String table1 = createTable(keyspace, "CREATE TABLE %s (key text 
PRIMARY KEY, val text)");
+        String table2 = createTable(keyspace, "CREATE TABLE %s (key text 
PRIMARY KEY, val text)");
+
+
+        Keyspace.system().forEach(k -> 
k.getColumnFamilyStores().forEach(ColumnFamilyStore::disableAutoCompaction));
+
+        ColumnFamilyStore cfs1 = 
Keyspace.open(keyspace).getColumnFamilyStore(table1);
+        ColumnFamilyStore cfs2 = 
Keyspace.open(keyspace).getColumnFamilyStore(table2);
+        cfs1.disableAutoCompaction();
+        cfs2.disableAutoCompaction();
+
+        // Write a bunch of sstables to both cfs1 and cfs2
+
+        List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(2);
+        columnFamilyStores.add(cfs1);
+        columnFamilyStores.add(cfs2);
+
+        logger.info("Creating {} sstables", numSSTables);
+        for (ColumnFamilyStore cfs: columnFamilyStores)
+        {
+            cfs.truncateBlocking();
+            for (int i = 0; i < numSSTables ; i++)
+            {
+                ColumnDefinition colDef = 
ColumnDefinition.regularDef(cfs.metadata, ByteBufferUtil.bytes("val"), 
AsciiType.instance);
+                RowUpdateBuilder rowBuilder = new 
RowUpdateBuilder(cfs.metadata, System.currentTimeMillis() + random.nextInt(), 
"key");
+                rowBuilder.add(colDef, "val1");
+                rowBuilder.build().apply();
+                cfs.forceBlockingFlush();
+            }
+
+            Assert.assertEquals(numSSTables, cfs.getLiveSSTables().size());
+
+            // preheat key cache
+            for (SSTableReader sstable : cfs.getLiveSSTables())
+                sstable.getPosition(Util.dk("key"), SSTableReader.Operator.EQ);
+        }
+
+        AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = 
CacheService.instance.keyCache;
+
+        // serialize to file
+        keyCache.submitWrite(keyCache.size()).get();
+    }
+
+    @Setup(Level.Invocation)
+    public void setupKeyCache()
+    {
+        AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = 
CacheService.instance.keyCache;
+        keyCache.clear();
+    }
+
+    @TearDown(Level.Trial)
+    public void teardown()
+    {
+        CQLTester.cleanup();
+        CQLTester.tearDownClass();
+    }
+
+    @Benchmark
+    public void keyCacheLoadTest() throws Throwable
+    {
+        AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = 
CacheService.instance.keyCache;
+
+        keyCache.loadSavedAsync().get();
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java 
b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index 515d30e..9cb06b9 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -17,9 +17,12 @@
  */
 package org.apache.cassandra.db;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -33,6 +36,8 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.cache.AutoSavingCache;
+import org.apache.cassandra.cache.ICache;
 import org.apache.cassandra.cache.KeyCacheKey;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -42,18 +47,32 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Refs;
+import org.mockito.Mockito;
+import org.mockito.internal.stubbing.answers.AnswersWithDelay;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 
 public class KeyCacheTest
 {
     private static final String KEYSPACE1 = "KeyCacheTest1";
+    private static final String KEYSPACE2 = "KeyCacheTest2";
     private static final String COLUMN_FAMILY1 = "Standard1";
     private static final String COLUMN_FAMILY2 = "Standard2";
     private static final String COLUMN_FAMILY3 = "Standard3";
+    private static final String COLUMN_FAMILY7 = "Standard7";
+    private static final String COLUMN_FAMILY8 = "Standard8";
+    private static final String COLUMN_FAMILY9 = "Standard9";
+
+    private static final String COLUMN_FAMILY_K2_1 = "Standard1";
 
 
     @BeforeClass
@@ -64,7 +83,15 @@ public class KeyCacheTest
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(KEYSPACE1, 
COLUMN_FAMILY1),
                                     SchemaLoader.standardCFMD(KEYSPACE1, 
COLUMN_FAMILY2),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, 
COLUMN_FAMILY3));
+                                    SchemaLoader.standardCFMD(KEYSPACE1, 
COLUMN_FAMILY3),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, 
COLUMN_FAMILY7),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, 
COLUMN_FAMILY8),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, 
COLUMN_FAMILY9));
+
+        SchemaLoader.createKeyspace(KEYSPACE2,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE2, 
COLUMN_FAMILY_K2_1));
+
     }
 
     @AfterClass
@@ -231,6 +258,112 @@ public class KeyCacheTest
         assertKeyCacheSize(noEarlyOpen ? 4 : 2, KEYSPACE1, COLUMN_FAMILY1);
     }
 
+    @Test
+    public void testKeyCacheLoadNegativeCacheLoadTime() throws Exception
+    {
+        DatabaseDescriptor.setCacheLoadTimeout(-1);
+        String cf = COLUMN_FAMILY7;
+
+        
createAndInvalidateCache(Collections.singletonList(Pair.create(KEYSPACE1, cf)), 
100);
+
+        CacheService.instance.keyCache.loadSaved();
+
+        // Here max time to load cache is negative which means no time left to 
load cache. So the keyCache size should
+        // be zero after loadSaved().
+        assertKeyCacheSize(0, KEYSPACE1, cf);
+        assertEquals(0, CacheService.instance.keyCache.size());
+    }
+
+    @Test
+    public void testKeyCacheLoadTwoTablesTime() throws Exception
+    {
+        DatabaseDescriptor.setCacheLoadTimeout(60);
+        String columnFamily1 = COLUMN_FAMILY8;
+        String columnFamily2 = COLUMN_FAMILY_K2_1;
+        int numberOfRows = 100;
+        List<Pair<String, String>> tables = new ArrayList<>(2);
+        tables.add(Pair.create(KEYSPACE1, columnFamily1));
+        tables.add(Pair.create(KEYSPACE2, columnFamily2));
+
+        createAndInvalidateCache(tables, numberOfRows);
+
+        CacheService.instance.keyCache.loadSaved();
+
+        // Here the max time to load the cache is 60 seconds, which is ample time to load
+        // both tables. So after loadSaved() the keyCache should contain every saved entry
+        // for both tables.
+        assertKeyCacheSize(numberOfRows, KEYSPACE1, columnFamily1);
+        assertKeyCacheSize(numberOfRows, KEYSPACE2, columnFamily2);
+        assertEquals(numberOfRows * tables.size(), 
CacheService.instance.keyCache.size());
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Test
+    public void testKeyCacheLoadCacheLoadTimeExceedingLimit() throws Exception
+    {
+        DatabaseDescriptor.setCacheLoadTimeout(2);
+        int delayMillis = 1000;
+        int numberOfRows = 100;
+
+        String cf = COLUMN_FAMILY9;
+
+        
createAndInvalidateCache(Collections.singletonList(Pair.create(KEYSPACE1, cf)), 
numberOfRows);
+
+        // Testing cache load. A custom-built AutoSavingCache instance is used here because
+        // simulating a deserialization delay is not possible with
+        // 'CacheService.instance.keyCache'. 'AutoSavingCache.loadSaved()' returns the number
+        // of entries loaded, so we don't need to instantiate ICache.class.
+        CacheService.KeyCacheSerializer keyCacheSerializer = new 
CacheService.KeyCacheSerializer();
+        CacheService.KeyCacheSerializer keyCacheSerializerSpy = 
Mockito.spy(keyCacheSerializer);
+        AutoSavingCache autoSavingCache = new 
AutoSavingCache(mock(ICache.class),
+                                                              
CacheService.CacheType.KEY_CACHE,
+                                                              
keyCacheSerializerSpy);
+
+        doAnswer(new AnswersWithDelay(delayMillis, answer -> 
keyCacheSerializer.deserialize(answer.getArgument(0),
+                                                                               
             answer.getArgument(1)) ))
+               
.when(keyCacheSerializerSpy).deserialize(any(DataInputPlus.class), 
any(ColumnFamilyStore.class));
+
+        long maxExpectedKeyCache = Math.min(numberOfRows,
+                                            1 + 
TimeUnit.SECONDS.toMillis(DatabaseDescriptor.getCacheLoadTimeout()) / 
delayMillis);
+
+        long keysLoaded = autoSavingCache.loadSaved();
+        assertTrue(keysLoaded < maxExpectedKeyCache);
+        assertTrue(0 != keysLoaded);
+        Mockito.verify(keyCacheSerializerSpy, 
Mockito.times(1)).cleanupAfterDeserialize();
+    }
+
+    private void createAndInvalidateCache(List<Pair<String, String>> tables, 
int numberOfRows) throws ExecutionException, InterruptedException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        // empty the cache
+        CacheService.instance.invalidateKeyCache();
+        assertEquals(0, CacheService.instance.keyCache.size());
+
+        for(Pair<String, String> entry : tables)
+        {
+            String keyspace = entry.left;
+            String cf = entry.right;
+            ColumnFamilyStore store = 
Keyspace.open(keyspace).getColumnFamilyStore(cf);
+
+            // insert data and force to disk
+            SchemaLoader.insertData(keyspace, cf, 0, numberOfRows);
+            store.forceBlockingFlush();
+        }
+        for(Pair<String, String> entry : tables)
+        {
+            String keyspace = entry.left;
+            String cf = entry.right;
+            // populate the cache
+            readData(keyspace, cf, 0, numberOfRows);
+            assertKeyCacheSize(numberOfRows, keyspace, cf);
+        }
+
+        // force the cache to disk
+        
CacheService.instance.keyCache.submitWrite(CacheService.instance.keyCache.size()).get();
+
+        CacheService.instance.invalidateKeyCache();
+        assertEquals(0, CacheService.instance.keyCache.size());
+    }
+
     private static void readData(String keyspace, String columnFamily, int 
startRow, int numberOfRows)
     {
         ColumnFamilyStore store = 
Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to