This is an automated email from the ASF dual-hosted git repository.

jolynch pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit e73d05bf858fa195ac2f2778027ef0d2ebcd3abd
Merge: 8cef32a 1fce84f
Author: Joseph Lynch <joe.e.ly...@gmail.com>
AuthorDate: Thu Dec 9 10:27:29 2021 -0500

    Merge branch 'cassandra-3.11' into cassandra-4.0

 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |   5 +
 .../apache/cassandra/cache/AutoSavingCache.java    |   8 +-
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  11 ++
 .../org/apache/cassandra/db/lifecycle/View.java    |   2 +-
 .../org/apache/cassandra/service/CacheService.java |  41 ++++--
 .../test/microbench/CacheLoaderBench.java          | 137 ++++++++++++++++++++
 .../unit/org/apache/cassandra/db/KeyCacheTest.java | 138 ++++++++++++++++++++-
 9 files changed, 332 insertions(+), 13 deletions(-)

diff --cc CHANGES.txt
index fb9dfaf,6c70235..055a46e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,31 -1,17 +1,32 @@@
 -3.11.12
 - * Upgrade snakeyaml to 1.26 in 3.11 (CASSANDRA-17028)
 +4.0.2
 + * Fixed broken classpath when multiple jars in build directory (CASSANDRA-17129)
 + * DebuggableThreadPoolExecutor does not propagate client warnings (CASSANDRA-17072)
 + * internode_send_buff_size_in_bytes and internode_recv_buff_size_in_bytes have new names. Backward compatibility with the old names added (CASSANDRA-17141)
 + * Remove unused configuration parameters from cassandra.yaml (CASSANDRA-17132)
 + * Queries performed with NODE_LOCAL consistency level do not update request metrics (CASSANDRA-17052)
 + * Fix multiple full sources can be selected unexpectedly for bootstrap streaming (CASSANDRA-16945)
 + * Fix cassandra.yaml formatting of parameters (CASSANDRA-17131)
 + * Add backward compatibility for CQLSSTableWriter Date fields (CASSANDRA-17117)
 + * Push initial client connection messages to trace (CASSANDRA-17038)
 + * Correct the internode message timestamp if sending node has wrapped (CASSANDRA-16997)
 + * Avoid race causing us to return null in RangesAtEndpoint (CASSANDRA-16965)
 + * Avoid rewriting all sstables during cleanup when transient replication is enabled (CASSANDRA-16966)
 + * Prevent CQLSH from failure on Python 3.10 (CASSANDRA-16987)
 + * Avoid trying to acquire 0 permits from the rate limiter when taking snapshot (CASSANDRA-16872)
 + * Upgrade Caffeine to 2.5.6 (CASSANDRA-15153)
 + * Include SASI components in snapshots (CASSANDRA-15134)
 + * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938)
 + * Remove all the state pollution between tests in SSTableReaderTest (CASSANDRA-16888)
 + * Delay auth setup until after gossip has settled to avoid unavailables on startup (CASSANDRA-16783)
 + * Fix clustering order logic in CREATE MATERIALIZED VIEW (CASSANDRA-16898)
 + * org.apache.cassandra.db.rows.ArrayCell#unsharedHeapSizeExcludingData includes data twice (CASSANDRA-16900)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 +Merged from 3.11:
   * Add key validation to sstablescrub (CASSANDRA-16969)
   * Update Jackson from 2.9.10 to 2.12.5 (CASSANDRA-16851)
 - * Include SASI components in snapshots (CASSANDRA-15134)
   * Make assassinate more resilient to missing tokens (CASSANDRA-16847)
 - * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 - * Validate SASI tokenizer options before adding index to schema (CASSANDRA-15135)
 - * Fixup scrub output when no data post-scrub and clear up old use of row, which really means partition (CASSANDRA-16835)
 - * Fix ant-junit dependency issue (CASSANDRA-16827)
 - * Reduce thread contention in CommitLogSegment and HintsBuffer (CASSANDRA-16072)
 - * Avoid sending CDC column if not enabled (CASSANDRA-16770)
  Merged from 3.0:
+  * Fix slow keycache load which blocks startup for tables with many sstables (CASSANDRA-14898)
   * Fix rare NPE caused by batchlog replay / node decommission races (CASSANDRA-17049)
   * Allow users to view permissions of the roles they created (CASSANDRA-16902)
   * Fix failure handling in inter-node communication (CASSANDRA-16334)
diff --cc conf/cassandra.yaml
index 451f9df,d00c3d9..1b0871f
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -382,22 -368,24 +382,27 @@@ counter_cache_save_period: 720
  # If not set, the default directory is $CASSANDRA_HOME/data/saved_caches.
  # saved_caches_directory: /var/lib/cassandra/saved_caches
  
+ # Number of seconds the server will wait for each cache (row, key, etc ...) to load while starting
+ # the Cassandra process. Setting this to a negative value is equivalent to disabling all cache loading on startup
+ # while still having the cache during runtime.
+ # cache_load_timeout_seconds: 30
+ 
 -# commitlog_sync may be either "periodic" or "batch."
 +# commitlog_sync may be either "periodic", "group", or "batch." 
  # 
  # When in batch mode, Cassandra won't ack writes until the commit log
 -# has been fsynced to disk.  It will wait
 -# commitlog_sync_batch_window_in_ms milliseconds between fsyncs.
 -# This window should be kept short because the writer threads will
 -# be unable to do extra work while waiting.  (You may need to increase
 -# concurrent_writes for the same reason.)
 +# has been flushed to disk.  Each incoming write will trigger the flush task.
 +# commitlog_sync_batch_window_in_ms is a deprecated value. Previously it had
 +# almost no value, and is being removed.
  #
 -# commitlog_sync: batch
  # commitlog_sync_batch_window_in_ms: 2
  #
 -# the other option is "periodic" where writes may be acked immediately
 +# group mode is similar to batch mode, where Cassandra will not ack writes
 +# until the commit log has been flushed to disk. The difference is group
 +# mode will wait up to commitlog_sync_group_window_in_ms between flushes.
 +#
 +# commitlog_sync_group_window_in_ms: 1000
 +#
 +# the default option is "periodic" where writes may be acked immediately
  # and the CommitLog is simply synced every commitlog_sync_period_in_ms
  # milliseconds.
  commitlog_sync: periodic
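
For context, the new cache_load_timeout_seconds setting reaches the code through Config and DatabaseDescriptor (both touched per the diffstat above, though their hunks are not shown in this excerpt). A minimal sketch of that plumbing, assuming the field name and default from the yaml comment and the accessor names used by the hunks below:

    // Sketch only: the Config/DatabaseDescriptor hunks are not part of this excerpt,
    // so the field name and default are assumed from the yaml comment above.
    public class Config
    {
        public int cache_load_timeout_seconds = 30;
    }

    public class DatabaseDescriptor
    {
        private static Config conf = new Config();

        // Read by AutoSavingCache.loadSaved() to compute its load deadline.
        public static int getCacheLoadTimeout()
        {
            return conf.cache_load_timeout_seconds;
        }

        // Used by KeyCacheTest to shrink or disable the load window.
        public static void setCacheLoadTimeout(int seconds)
        {
            conf.cache_load_timeout_seconds = seconds;
        }
    }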
diff --cc src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 34f056a,e7cda16..430161f
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@@ -209,20 -206,17 +209,21 @@@ public class AutoSavingCache<K extends 
                                                + " does not match current 
schema version "
                                                + Schema.instance.getVersion());
  
-                 ArrayDeque<Future<Pair<K, V>>> futures = new 
ArrayDeque<Future<Pair<K, V>>>();
-                 while (in.available() > 0)
+                 ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<>();
+                 long loadByNanos = start + 
TimeUnit.SECONDS.toNanos(DatabaseDescriptor.getCacheLoadTimeout());
+                 while (System.nanoTime() < loadByNanos && in.available() > 0)
                  {
 -                    //ksname and cfname are serialized by the serializers in 
CacheService
 +                    //tableId and indexName are serialized by the serializers 
in CacheService
                      //That is delegated there because there are serializer 
specific conditions
                      //where a cache key is skipped and not written
 -                    String ksname = in.readUTF();
 -                    String cfname = in.readUTF();
 +                    TableId tableId = TableId.deserialize(in);
 +                    String indexName = in.readUTF();
 +                    if (indexName.isEmpty())
 +                        indexName = null;
  
 -                    ColumnFamilyStore cfs = 
Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(ksname, 
cfname));
 +                    ColumnFamilyStore cfs = 
Schema.instance.getColumnFamilyStoreInstance(tableId);
 +                    if (indexName != null && cfs != null)
 +                        cfs = 
cfs.indexManager.getIndexByName(indexName).getBackingTable().orElse(null);
  
                      Future<Pair<K, V>> entryFuture = 
cacheLoader.deserialize(in, cfs);
                      // Key cache entry can return null, if the SSTable 
doesn't exist.
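
The new deadline check above also explains why a negative cache_load_timeout_seconds disables loading entirely: the deadline is already in the past, so the loop body never runs. A tiny self-contained sketch of the same pattern (names here are illustrative, not from the patch):

    import java.util.concurrent.TimeUnit;

    public class DeadlineLoopDemo
    {
        public static void main(String[] args)
        {
            int timeoutSeconds = -1; // negative => deadline in the past => zero iterations
            long loadByNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);

            int loaded = 0;
            // Mirrors the loop condition in AutoSavingCache: stop once the deadline passes.
            while (System.nanoTime() < loadByNanos && loaded < 100)
                loaded++;

            System.out.println("entries loaded: " + loaded); // prints 0 for any negative timeout
        }
    }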
diff --cc src/java/org/apache/cassandra/service/CacheService.java
index a1225fb,434db54..0a281ad
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@@ -413,8 -443,17 +416,13 @@@ public class CacheService implements Ca
  
     public static class KeyCacheSerializer implements CacheSerializer<KeyCacheKey, RowIndexEntry>
     {
+         // For column families with many SSTables the linear nature of getSSTables slowed down KeyCache loading
+         // by orders of magnitude. So we cache the sstables once and rely on cleanupAfterDeserialize to clean up any
+         // cached state we may have accumulated during the load.
+         Map<Pair<String, String>, Map<Integer, SSTableReader>> cachedSSTableReaders = new ConcurrentHashMap<>();
+ 
          public void serialize(KeyCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException
          {
 -            //Don't serialize old format entries since we didn't bother to implement serialization of both for simplicity
 -            //https://issues.apache.org/jira/browse/CASSANDRA-10778
 -            if (!key.desc.version.storeRows()) return;
 -
              RowIndexEntry entry = CacheService.instance.keyCache.getInternal(key);
              if (entry == null)
                  return;
@@@ -443,31 -482,43 +453,44 @@@
              ByteBuffer key = ByteBufferUtil.read(input, keyLength);
              int generation = input.readInt();
             input.readBoolean(); // backwards compatibility for "promoted indexes" boolean
-             SSTableReader reader;
-             if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = findDesc(generation, cfs.getSSTables(SSTableSet.CANONICAL))) == null)
+             SSTableReader reader = null;
+             if (!skipEntry)
+             {
 -                Pair<String, String> qualifiedName = Pair.create(cfs.metadata.ksName, cfs.metadata.cfName);
++                Pair<String, String> qualifiedName = Pair.create(cfs.metadata.keyspace, cfs.metadata.name);
+                 Map<Integer, SSTableReader> generationToSSTableReader = cachedSSTableReaders.get(qualifiedName);
+                 if (generationToSSTableReader == null)
+                 {
+                     generationToSSTableReader = new HashMap<>(cfs.getLiveSSTables().size());
+                     for (SSTableReader ssTableReader : cfs.getSSTables(SSTableSet.CANONICAL))
+                     {
+                         generationToSSTableReader.put(ssTableReader.descriptor.generation, ssTableReader);
+                     }
+ 
+                     cachedSSTableReaders.putIfAbsent(qualifiedName, generationToSSTableReader);
+                 }
+                 reader = generationToSSTableReader.get(generation);
+             }
+ 
+             if (skipEntry || reader == null)
              {
                // The sstable doesn't exist anymore, so we can't be sure of the exact version and assume it's the current version. The only case where we'll be
                // wrong is during upgrade, in which case we fail at deserialization. This is not a huge deal however since 1) this is unlikely enough that
                // this won't affect many users (if any) and only once, 2) this doesn't prevent the node from starting and 3) CASSANDRA-10219 shows that this
                // part of the code has been broken for a while without anyone noticing (it is, btw, still broken until CASSANDRA-10219 is fixed).
 -                RowIndexEntry.Serializer.skipForCache(input, BigFormat.instance.getLatestVersion());
 +                RowIndexEntry.Serializer.skipForCache(input);
                 return null;
             }
 -            RowIndexEntry.IndexSerializer<?> indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata,
++
 +            RowIndexEntry.IndexSerializer<?> indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata(),
                                                                                                                  reader.descriptor.version,
                                                                                                                  reader.header);
             RowIndexEntry<?> entry = indexSerializer.deserializeForCache(input);
 -            return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry));
 +            return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata(), reader.descriptor, key), entry));
          }
 
-         private SSTableReader findDesc(int generation, Iterable<SSTableReader> collection)
+         public void cleanupAfterDeserialize()
          {
-             for (SSTableReader sstable : collection)
-             {
-                 if (sstable.descriptor.generation == generation)
-                     return sstable;
-             }
-             return null;
+             cachedSSTableReaders.clear();
          }
      }
  }
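
The change above replaces a linear scan over all live sstables per cache entry (the removed findDesc) with a generation index built once per table and reused for the whole load. A compact sketch of the same idea, here phrased with computeIfAbsent; the patch spells it out with get/putIfAbsent instead, which tolerates the same benign race of two threads building the map concurrently:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class GenerationIndexDemo
    {
        // Illustrative stand-in for SSTableReader: only the generation matters here.
        static class Reader
        {
            final int generation;
            Reader(int generation) { this.generation = generation; }
        }

        static final Map<String, Map<Integer, Reader>> cached = new ConcurrentHashMap<>();

        // Build the generation -> reader map once per table, then answer lookups in O(1).
        static Reader lookup(String table, Iterable<Reader> liveReaders, int generation)
        {
            Map<Integer, Reader> byGeneration = cached.computeIfAbsent(table, ignored -> {
                Map<Integer, Reader> m = new HashMap<>();
                for (Reader r : liveReaders)
                    m.put(r.generation, r);
                return m;
            });
            return byGeneration.get(generation);
        }

        // Analogue of cleanupAfterDeserialize(): drop the cached state once the load ends.
        static void cleanup()
        {
            cached.clear();
        }
    }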
diff --cc test/microbench/org/apache/cassandra/test/microbench/CacheLoaderBench.java
index 0000000,b3a7136..d91db6e
mode 000000,100644..100644
--- a/test/microbench/org/apache/cassandra/test/microbench/CacheLoaderBench.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/CacheLoaderBench.java
@@@ -1,0 -1,137 +1,137 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.test.microbench;
+ 
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.Random;
+ import java.util.concurrent.TimeUnit;
+ 
+ import org.junit.Assert;
+ 
+ import org.apache.cassandra.Util;
+ import org.apache.cassandra.cache.AutoSavingCache;
+ import org.apache.cassandra.cache.KeyCacheKey;
 -import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.RowIndexEntry;
+ import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.marshal.AsciiType;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
++import org.apache.cassandra.schema.ColumnMetadata;
+ import org.apache.cassandra.service.CacheService;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.openjdk.jmh.annotations.Benchmark;
+ import org.openjdk.jmh.annotations.BenchmarkMode;
+ import org.openjdk.jmh.annotations.Fork;
+ import org.openjdk.jmh.annotations.Level;
+ import org.openjdk.jmh.annotations.Measurement;
+ import org.openjdk.jmh.annotations.Mode;
+ import org.openjdk.jmh.annotations.OutputTimeUnit;
+ import org.openjdk.jmh.annotations.Scope;
+ import org.openjdk.jmh.annotations.Setup;
+ import org.openjdk.jmh.annotations.State;
+ import org.openjdk.jmh.annotations.TearDown;
+ import org.openjdk.jmh.annotations.Threads;
+ import org.openjdk.jmh.annotations.Warmup;
+ 
+ @SuppressWarnings("unused")
+ @BenchmarkMode(Mode.SampleTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ @Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS)
+ @Measurement(iterations = 2, time = 2, timeUnit = TimeUnit.SECONDS)
+ @Fork(value = 1)
+ @Threads(1)
+ @State(Scope.Benchmark)
+ public class CacheLoaderBench extends CQLTester
+ {
+     private static final int numSSTables = 1000;
+     private final Random random = new Random();
+ 
+     @Setup(Level.Trial)
+     public void setup() throws Throwable
+     {
+         CQLTester.prepareServer();
+         String keyspace = createKeyspace("CREATE KEYSPACE %s with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } and durable_writes = false");
+         String table1 = createTable(keyspace, "CREATE TABLE %s (key text PRIMARY KEY, val text)");
+         String table2 = createTable(keyspace, "CREATE TABLE %s (key text PRIMARY KEY, val text)");
+ 
+ 
+         Keyspace.system().forEach(k -> k.getColumnFamilyStores().forEach(ColumnFamilyStore::disableAutoCompaction));
+ 
+         ColumnFamilyStore cfs1 = Keyspace.open(keyspace).getColumnFamilyStore(table1);
+         ColumnFamilyStore cfs2 = Keyspace.open(keyspace).getColumnFamilyStore(table2);
+         cfs1.disableAutoCompaction();
+         cfs2.disableAutoCompaction();
+ 
+         // Write a bunch of sstables to both cfs1 and cfs2
+ 
+         List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(2);
+         columnFamilyStores.add(cfs1);
+         columnFamilyStores.add(cfs2);
+ 
+         logger.info("Creating {} sstables", numSSTables);
+         for (ColumnFamilyStore cfs: columnFamilyStores)
+         {
+             cfs.truncateBlocking();
+             for (int i = 0; i < numSSTables ; i++)
+             {
 -                ColumnDefinition colDef = ColumnDefinition.regularDef(cfs.metadata, ByteBufferUtil.bytes("val"), AsciiType.instance);
 -                RowUpdateBuilder rowBuilder = new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis() + random.nextInt(), "key");
++                ColumnMetadata colDef = ColumnMetadata.regularColumn(cfs.metadata(), ByteBufferUtil.bytes("val"), AsciiType.instance);
++                RowUpdateBuilder rowBuilder = new RowUpdateBuilder(cfs.metadata(), System.currentTimeMillis() + random.nextInt(), "key");
+                 rowBuilder.add(colDef, "val1");
+                 rowBuilder.build().apply();
+                 cfs.forceBlockingFlush();
+             }
+ 
+             Assert.assertEquals(numSSTables, cfs.getLiveSSTables().size());
+ 
+             // preheat key cache
+             for (SSTableReader sstable : cfs.getLiveSSTables())
+                 sstable.getPosition(Util.dk("key"), SSTableReader.Operator.EQ);
+         }
+ 
+         AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = CacheService.instance.keyCache;
+ 
+         // serialize to file
+         keyCache.submitWrite(keyCache.size()).get();
+     }
+ 
+     @Setup(Level.Invocation)
+     public void setupKeyCache()
+     {
+         AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = CacheService.instance.keyCache;
+         keyCache.clear();
+     }
+ 
+     @TearDown(Level.Trial)
+     public void teardown()
+     {
+         CQLTester.cleanup();
+         CQLTester.tearDownClass();
+     }
+ 
+     @Benchmark
+     public void keyCacheLoadTest() throws Throwable
+     {
+         AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = CacheService.instance.keyCache;
+ 
+         keyCache.loadSavedAsync().get();
+     }
+ }
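
Since CacheLoaderBench is a standard JMH @Benchmark class, it can be driven through the usual JMH Runner API; the launcher below is a sketch for illustration, not part of the patch (Cassandra's build normally runs microbenchmarks through its own ant targets):

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    public class CacheLoaderBenchRunner
    {
        public static void main(String[] args) throws RunnerException
        {
            Options opts = new OptionsBuilder()
                           .include("CacheLoaderBench") // regex matched against benchmark names
                           .build();
            new Runner(opts).run();
        }
    }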
diff --cc test/unit/org/apache/cassandra/db/KeyCacheTest.java
index 1819b18,f31df18..ee56d6e
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@@ -308,6 -338,112 +338,112 @@@ public class KeyCacheTes
          assertKeyCacheSize(noEarlyOpen ? 4 : 2, KEYSPACE1, cf);
      }
  
+     @Test
+     public void testKeyCacheLoadNegativeCacheLoadTime() throws Exception
+     {
+         DatabaseDescriptor.setCacheLoadTimeout(-1);
+         String cf = COLUMN_FAMILY7;
+ 
+         createAndInvalidateCache(Collections.singletonList(Pair.create(KEYSPACE1, cf)), 100);
+ 
+         CacheService.instance.keyCache.loadSaved();
+ 
+         // Here max time to load cache is negative which means no time left to load cache. So the keyCache size
+         // should be zero after loadSaved().
+         assertKeyCacheSize(0, KEYSPACE1, cf);
+         assertEquals(0, CacheService.instance.keyCache.size());
+     }
+ 
+     @Test
+     public void testKeyCacheLoadTwoTablesTime() throws Exception
+     {
+         DatabaseDescriptor.setCacheLoadTimeout(60);
+         String columnFamily1 = COLUMN_FAMILY8;
+         String columnFamily2 = COLUMN_FAMILY_K2_1;
+         int numberOfRows = 100;
+         List<Pair<String, String>> tables = new ArrayList<>(2);
+         tables.add(Pair.create(KEYSPACE1, columnFamily1));
+         tables.add(Pair.create(KEYSPACE2, columnFamily2));
+ 
+         createAndInvalidateCache(tables, numberOfRows);
+ 
+         CacheService.instance.keyCache.loadSaved();
+ 
+         // Here the max time to load the cache is 60 seconds, which is ample for two small tables, so every key
+         // of both tables should be in the cache after load.
+         assertKeyCacheSize(numberOfRows, KEYSPACE1, columnFamily1);
+         assertKeyCacheSize(numberOfRows, KEYSPACE2, columnFamily2);
+         assertEquals(numberOfRows * tables.size(), CacheService.instance.keyCache.size());
+     }
+ 
+     @SuppressWarnings({ "unchecked", "rawtypes" })
+     @Test
+     public void testKeyCacheLoadCacheLoadTimeExceedingLimit() throws Exception
+     {
+         DatabaseDescriptor.setCacheLoadTimeout(2);
+         int delayMillis = 1000;
+         int numberOfRows = 100;
+ 
+         String cf = COLUMN_FAMILY9;
+ 
+         createAndInvalidateCache(Collections.singletonList(Pair.create(KEYSPACE1, cf)), numberOfRows);
+ 
+         // Testing cache load. A custom-built AutoSavingCache instance is used here because simulating a delay is
+         // not possible with 'CacheService.instance.keyCache'. 'AutoSavingCache.loadSaved()' returns the number of
+         // entries loaded, so we don't need a real ICache and can simply mock it.
+         CacheService.KeyCacheSerializer keyCacheSerializer = new CacheService.KeyCacheSerializer();
+         CacheService.KeyCacheSerializer keyCacheSerializerSpy = Mockito.spy(keyCacheSerializer);
+         AutoSavingCache autoSavingCache = new AutoSavingCache(mock(ICache.class),
+                                                               CacheService.CacheType.KEY_CACHE,
+                                                               keyCacheSerializerSpy);
+ 
+         doAnswer(new AnswersWithDelay(delayMillis, answer -> keyCacheSerializer.deserialize(answer.getArgument(0),
+                                                                                             answer.getArgument(1)) ))
+                .when(keyCacheSerializerSpy).deserialize(any(DataInputPlus.class), any(ColumnFamilyStore.class));
+ 
+         long maxExpectedKeyCache = Math.min(numberOfRows,
+                                             1 + TimeUnit.SECONDS.toMillis(DatabaseDescriptor.getCacheLoadTimeout()) / delayMillis);
+ 
+         long keysLoaded = autoSavingCache.loadSaved();
+         assertThat(keysLoaded, Matchers.lessThanOrEqualTo(maxExpectedKeyCache));
+         assertNotEquals(0, keysLoaded);
+         Mockito.verify(keyCacheSerializerSpy, Mockito.times(1)).cleanupAfterDeserialize();
+     }
+ 
+     private void createAndInvalidateCache(List<Pair<String, String>> tables, int numberOfRows) throws ExecutionException, InterruptedException
+     {
+         CompactionManager.instance.disableAutoCompaction();
+ 
+         // empty the cache
+         CacheService.instance.invalidateKeyCache();
+         assertEquals(0, CacheService.instance.keyCache.size());
+ 
+         for(Pair<String, String> entry : tables)
+         {
 -            String keyspace = entry.left;
 -            String cf = entry.right;
++            String keyspace = entry.left();
++            String cf = entry.right();
+             ColumnFamilyStore store = Keyspace.open(keyspace).getColumnFamilyStore(cf);
+ 
+             // insert data and force to disk
+             SchemaLoader.insertData(keyspace, cf, 0, numberOfRows);
+             store.forceBlockingFlush();
+         }
+         for(Pair<String, String> entry : tables)
+         {
 -            String keyspace = entry.left;
 -            String cf = entry.right;
++            String keyspace = entry.left();
++            String cf = entry.right();
+             // populate the cache
+             readData(keyspace, cf, 0, numberOfRows);
+             assertKeyCacheSize(numberOfRows, keyspace, cf);
+         }
+ 
+         // force the cache to disk
+         CacheService.instance.keyCache.submitWrite(CacheService.instance.keyCache.size()).get();
+ 
+         CacheService.instance.invalidateKeyCache();
+         assertEquals(0, CacheService.instance.keyCache.size());
+     }
+ 
      private static void readData(String keyspace, String columnFamily, int startRow, int numberOfRows)
      {
          ColumnFamilyStore store = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
