This is an automated email from the ASF dual-hosted git repository.

jolynch pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit e73d05bf858fa195ac2f2778027ef0d2ebcd3abd
Merge: 8cef32a 1fce84f
Author: Joseph Lynch <joe.e.ly...@gmail.com>
AuthorDate: Thu Dec 9 10:27:29 2021 -0500

    Merge branch 'cassandra-3.11' into cassandra-4.0

 CHANGES.txt                                         |   1 +
 conf/cassandra.yaml                                 |   5 +
 .../apache/cassandra/cache/AutoSavingCache.java     |   8 +-
 src/java/org/apache/cassandra/config/Config.java    |   2 +
 .../cassandra/config/DatabaseDescriptor.java        |  11 ++
 .../org/apache/cassandra/db/lifecycle/View.java     |   2 +-
 .../org/apache/cassandra/service/CacheService.java  |  41 ++++--
 .../test/microbench/CacheLoaderBench.java           | 137 ++++++++++++++++++++
 .../unit/org/apache/cassandra/db/KeyCacheTest.java  | 138 ++++++++++++++++++++-
 9 files changed, 332 insertions(+), 13 deletions(-)

diff --cc CHANGES.txt
index fb9dfaf,6c70235..055a46e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,31 -1,17 +1,32 @@@
 -3.11.12
 - * Upgrade snakeyaml to 1.26 in 3.11 (CASSANDRA-17028)
 +4.0.2
 + * Fixed broken classpath when multiple jars in build directory (CASSANDRA-17129)
 + * DebuggableThreadPoolExecutor does not propagate client warnings (CASSANDRA-17072)
 + * internode_send_buff_size_in_bytes and internode_recv_buff_size_in_bytes have new names. Backward compatibility with the old names added (CASSANDRA-17141)
 + * Remove unused configuration parameters from cassandra.yaml (CASSANDRA-17132)
 + * Queries performed with NODE_LOCAL consistency level do not update request metrics (CASSANDRA-17052)
 + * Fix multiple full sources can be selected unexpectedly for bootstrap streaming (CASSANDRA-16945)
 + * Fix cassandra.yaml formatting of parameters (CASSANDRA-17131)
 + * Add backward compatibility for CQLSSTableWriter Date fields (CASSANDRA-17117)
 + * Push initial client connection messages to trace (CASSANDRA-17038)
 + * Correct the internode message timestamp if sending node has wrapped (CASSANDRA-16997)
 + * Avoid race causing us to return null in RangesAtEndpoint (CASSANDRA-16965)
 + * Avoid rewriting all sstables during cleanup when transient replication is enabled (CASSANDRA-16966)
 + * Prevent CQLSH from failure on Python 3.10 (CASSANDRA-16987)
 + * Avoid trying to acquire 0 permits from the rate limiter when taking snapshot (CASSANDRA-16872)
 + * Upgrade Caffeine to 2.5.6 (CASSANDRA-15153)
 + * Include SASI components to snapshots (CASSANDRA-15134)
 + * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938)
 + * Remove all the state pollution between tests in SSTableReaderTest (CASSANDRA-16888)
 + * Delay auth setup until after gossip has settled to avoid unavailables on startup (CASSANDRA-16783)
 + * Fix clustering order logic in CREATE MATERIALIZED VIEW (CASSANDRA-16898)
 + * org.apache.cassandra.db.rows.ArrayCell#unsharedHeapSizeExcludingData includes data twice (CASSANDRA-16900)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 +Merged from 3.11:
   * Add key validation to sstablescrub (CASSANDRA-16969)
   * Update Jackson from 2.9.10 to 2.12.5 (CASSANDRA-16851)
 - * Include SASI components to snapshots (CASSANDRA-15134)
   * Make assassinate more resilient to missing tokens (CASSANDRA-16847)
 - * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 - * Validate SASI tokenizer options before adding index to schema (CASSANDRA-15135)
 - * Fixup scrub output when no data post-scrub and clear up old use of row, which really means partition (CASSANDRA-16835)
 - * Fix ant-junit dependency issue (CASSANDRA-16827)
 - * Reduce thread contention in CommitLogSegment and HintsBuffer (CASSANDRA-16072)
 - * Avoid sending CDC column if not enabled (CASSANDRA-16770)
  Merged from 3.0:
+  * Fix slow keycache load which blocks startup for tables with many sstables (CASSANDRA-14898)
   * Fix rare NPE caused by batchlog replay / node decommission races (CASSANDRA-17049)
   * Allow users to view permissions of the roles they created (CASSANDRA-16902)
   * Fix failure handling in inter-node communication (CASSANDRA-16334)
diff --cc conf/cassandra.yaml
index 451f9df,d00c3d9..1b0871f
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -382,22 -368,24 +382,27 @@@ counter_cache_save_period: 720
  # If not set, the default directory is $CASSANDRA_HOME/data/saved_caches.
  # saved_caches_directory: /var/lib/cassandra/saved_caches
 
 +# Number of seconds the server will wait for each cache (row, key, etc ...) to load while starting
 +# the Cassandra process. Setting this to a negative value is equivalent to disabling all cache loading on startup
 +# while still having the cache during runtime.
 +# cache_load_timeout_seconds: 30
 +
 -# commitlog_sync may be either "periodic" or "batch."
 +# commitlog_sync may be either "periodic", "group", or "batch."
  #
  # When in batch mode, Cassandra won't ack writes until the commit log
 -# has been fsynced to disk. It will wait
 -# commitlog_sync_batch_window_in_ms milliseconds between fsyncs.
 -# This window should be kept short because the writer threads will
 -# be unable to do extra work while waiting. (You may need to increase
 -# concurrent_writes for the same reason.)
 +# has been flushed to disk. Each incoming write will trigger the flush task.
 +# commitlog_sync_batch_window_in_ms is a deprecated value. Previously it had
 +# almost no value, and is being removed.
  #
 -# commitlog_sync: batch
  # commitlog_sync_batch_window_in_ms: 2
  #
 -# the other option is "periodic" where writes may be acked immediately
 +# group mode is similar to batch mode, where Cassandra will not ack writes
 +# until the commit log has been flushed to disk. The difference is group
 +# mode will wait up to commitlog_sync_group_window_in_ms between flushes.
 +#
 +# commitlog_sync_group_window_in_ms: 1000
 +#
 +# the default option is "periodic" where writes may be acked immediately
  # and the CommitLog is simply synced every commitlog_sync_period_in_ms
  # milliseconds.
  commitlog_sync: periodic
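For readers new to the "group" mode described in the hunk above, the following minimal sketch (illustrative only, not Cassandra's actual CommitLog implementation; the class and method names are invented) shows the core idea: writers are acked only after an fsync, but fsyncs are batched so one flush can ack many concurrent writers.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public class GroupSyncSketch
    {
        private final List<CompletableFuture<Void>> pending = new ArrayList<>();
        private final long windowMillis;

        GroupSyncSketch(long windowMillis) { this.windowMillis = windowMillis; }

        // Writer threads call this; the returned future completes after the next group fsync.
        synchronized CompletableFuture<Void> append(byte[] mutation)
        {
            CompletableFuture<Void> ack = new CompletableFuture<>();
            pending.add(ack); // real code would also append the mutation to the segment
            notifyAll();      // wake the syncer if it is idle
            return ack;
        }

        // Dedicated syncer thread: one fsync per window acks every write buffered since the last one.
        void syncLoop() throws InterruptedException
        {
            while (true)
            {
                List<CompletableFuture<Void>> acked;
                synchronized (this)
                {
                    while (pending.isEmpty())
                        wait();
                    acked = new ArrayList<>(pending);
                    pending.clear();
                }
                // an fsync of the commit log segment would happen here
                acked.forEach(f -> f.complete(null));
                Thread.sleep(windowMillis); // wait up to the group window before the next flush
            }
        }
    }

Batch mode is the degenerate case where every incoming write triggers the flush task immediately; group mode trades a little latency (up to the window) for fewer fsyncs under load.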
diff --cc src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 34f056a,e7cda16..430161f
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@@ -209,20 -206,17 +209,21 @@@ public class AutoSavingCache<K extends
                                                   + " does not match current schema version "
                                                   + Schema.instance.getVersion());
 
-                 ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<Future<Pair<K, V>>>();
-                 while (in.available() > 0)
+                 ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<>();
+                 long loadByNanos = start + TimeUnit.SECONDS.toNanos(DatabaseDescriptor.getCacheLoadTimeout());
+                 while (System.nanoTime() < loadByNanos && in.available() > 0)
                  {
-                     //ksname and cfname are serialized by the serializers in CacheService
+                     //tableId and indexName are serialized by the serializers in CacheService
                      //That is delegated there because there are serializer specific conditions
                      //where a cache key is skipped and not written
-                     String ksname = in.readUTF();
-                     String cfname = in.readUTF();
+                     TableId tableId = TableId.deserialize(in);
+                     String indexName = in.readUTF();
+                     if (indexName.isEmpty())
+                         indexName = null;
 
-                     ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(ksname, cfname));
+                     ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tableId);
+                     if (indexName != null && cfs != null)
+                         cfs = cfs.indexManager.getIndexByName(indexName).getBackingTable().orElse(null);
 
                      Future<Pair<K, V>> entryFuture = cacheLoader.deserialize(in, cfs);
                      // Key cache entry can return null, if the SSTable doesn't exist.
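The essential change in the hunk above is the deadline added to the read loop. A self-contained sketch of the same pattern follows (hypothetical names; the real loop hands each entry to cacheLoader.deserialize and collects futures):

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    public class DeadlineBoundedLoad
    {
        // Reads entries until the stream is drained or the timeout elapses. A negative
        // timeout puts the deadline in the past, so the loop body never executes;
        // that is how cache_load_timeout_seconds < 0 disables cache loading at startup
        // while leaving the cache itself enabled at runtime.
        static int loadWithDeadline(DataInputStream in, int timeoutSeconds) throws IOException
        {
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);
            int loaded = 0;
            while (System.nanoTime() < deadline && in.available() > 0)
            {
                in.readUTF(); // stand-in for deserializing one cache entry
                loaded++;
            }
            return loaded;
        }
    }

Note the check uses System.nanoTime() rather than wall-clock time, so the cutoff is immune to clock adjustments during startup.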
diff --cc src/java/org/apache/cassandra/service/CacheService.java
index a1225fb,434db54..0a281ad
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@@ -413,8 -443,17 +416,13 @@@ public class CacheService implements Ca
      public static class KeyCacheSerializer implements CacheSerializer<KeyCacheKey, RowIndexEntry>
      {
+         // For column families with many SSTables the linear nature of getSSTables slowed down KeyCache loading
+         // by orders of magnitude. So we cache the sstables once and rely on cleanupAfterDeserialize to clean up any
+         // cached state we may have accumulated during the load.
+         Map<Pair<String, String>, Map<Integer, SSTableReader>> cachedSSTableReaders = new ConcurrentHashMap<>();
+
          public void serialize(KeyCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException
          {
-             //Don't serialize old format entries since we didn't bother to implement serialization of both for simplicity
-             //https://issues.apache.org/jira/browse/CASSANDRA-10778
-             if (!key.desc.version.storeRows()) return;
-
              RowIndexEntry entry = CacheService.instance.keyCache.getInternal(key);
              if (entry == null)
                  return;
@@@ -443,31 -482,43 +453,44 @@@
              ByteBuffer key = ByteBufferUtil.read(input, keyLength);
              int generation = input.readInt();
              input.readBoolean(); // backwards compatibility for "promoted indexes" boolean
+             boolean skipEntry = cfs == null || !cfs.isKeyCacheEnabled();
-             SSTableReader reader;
-             if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = findDesc(generation, cfs.getSSTables(SSTableSet.CANONICAL))) == null)
+             SSTableReader reader = null;
+             if (!skipEntry)
+             {
-                 Pair<String, String> qualifiedName = Pair.create(cfs.metadata.ksName, cfs.metadata.cfName);
++                Pair<String, String> qualifiedName = Pair.create(cfs.metadata.keyspace, cfs.metadata.name);
+                 Map<Integer, SSTableReader> generationToSSTableReader = cachedSSTableReaders.get(qualifiedName);
+                 if (generationToSSTableReader == null)
+                 {
+                     generationToSSTableReader = new HashMap<>(cfs.getLiveSSTables().size());
+                     for (SSTableReader ssTableReader : cfs.getSSTables(SSTableSet.CANONICAL))
+                     {
+                         generationToSSTableReader.put(ssTableReader.descriptor.generation, ssTableReader);
+                     }
+
+                     cachedSSTableReaders.putIfAbsent(qualifiedName, generationToSSTableReader);
+                 }
+                 reader = generationToSSTableReader.get(generation);
+             }
+
+             if (skipEntry || reader == null)
              {
                  // The sstable doesn't exist anymore, so we can't be sure of the exact version and assume its the current version. The only case where we'll be
                  // wrong is during upgrade, in which case we fail at deserialization. This is not a huge deal however since 1) this is unlikely enough that
                  // this won't affect many users (if any) and only once, 2) this doesn't prevent the node from starting and 3) CASSANDRA-10219 shows that this
                  // part of the code has been broken for a while without anyone noticing (it is, btw, still broken until CASSANDRA-10219 is fixed).
-                 RowIndexEntry.Serializer.skipForCache(input, BigFormat.instance.getLatestVersion());
+                 RowIndexEntry.Serializer.skipForCache(input);
                  return null;
              }
-             RowIndexEntry.IndexSerializer<?> indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata,
++
+             RowIndexEntry.IndexSerializer<?> indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata(),
                                                                                                                  reader.descriptor.version,
                                                                                                                  reader.header);
              RowIndexEntry<?> entry = indexSerializer.deserializeForCache(input);
-             return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry));
+             return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata(), reader.descriptor, key), entry));
          }
 
-         private SSTableReader findDesc(int generation, Iterable<SSTableReader> collection)
+         public void cleanupAfterDeserialize()
          {
-             for (SSTableReader sstable : collection)
-             {
-                 if (sstable.descriptor.generation == generation)
-                     return sstable;
-             }
-             return null;
+             cachedSSTableReaders.clear();
          }
      }
  }
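This hunk is the heart of CASSANDRA-14898: the removed findDesc scanned every sstable for every cache entry, making the load O(entries x sstables), while the replacement builds a generation-to-reader map once per table and clears it when deserialization finishes. A generic sketch of the same memoization (class and method names here are illustrative, not Cassandra's API):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.ToIntFunction;

    class ReaderIndex<R>
    {
        private final Map<String, Map<Integer, R>> byTable = new ConcurrentHashMap<>();

        // First call per table pays the O(sstables) indexing cost; later lookups are O(1).
        R lookup(String table, int generation, Iterable<R> liveReaders, ToIntFunction<R> generationOf)
        {
            Map<Integer, R> index = byTable.computeIfAbsent(table, t -> {
                Map<Integer, R> m = new HashMap<>();
                for (R reader : liveReaders)
                    m.put(generationOf.applyAsInt(reader), reader);
                return m;
            });
            return index.get(generation);
        }

        // Mirrors cleanupAfterDeserialize(): drop the index after the load so the
        // serializer does not pin sstable readers past the startup phase.
        void cleanupAfterDeserialize() { byTable.clear(); }
    }

The patched code uses get-then-putIfAbsent rather than computeIfAbsent, which permits two threads to build the map redundantly but never to observe a partially built one; either way the map is only load-time scratch state.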
diff --cc test/microbench/org/apache/cassandra/test/microbench/CacheLoaderBench.java
index 0000000,b3a7136..d91db6e
mode 000000,100644..100644
--- a/test/microbench/org/apache/cassandra/test/microbench/CacheLoaderBench.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/CacheLoaderBench.java
@@@ -1,0 -1,137 +1,137 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements. See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership. The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License. You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+ package org.apache.cassandra.test.microbench;
+
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.Random;
+ import java.util.concurrent.TimeUnit;
+
+ import org.junit.Assert;
+
+ import org.apache.cassandra.Util;
+ import org.apache.cassandra.cache.AutoSavingCache;
+ import org.apache.cassandra.cache.KeyCacheKey;
-import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.RowIndexEntry;
+ import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.marshal.AsciiType;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
++import org.apache.cassandra.schema.ColumnMetadata;
+ import org.apache.cassandra.service.CacheService;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.openjdk.jmh.annotations.Benchmark;
+ import org.openjdk.jmh.annotations.BenchmarkMode;
+ import org.openjdk.jmh.annotations.Fork;
+ import org.openjdk.jmh.annotations.Level;
+ import org.openjdk.jmh.annotations.Measurement;
+ import org.openjdk.jmh.annotations.Mode;
+ import org.openjdk.jmh.annotations.OutputTimeUnit;
+ import org.openjdk.jmh.annotations.Scope;
+ import org.openjdk.jmh.annotations.Setup;
+ import org.openjdk.jmh.annotations.State;
+ import org.openjdk.jmh.annotations.TearDown;
+ import org.openjdk.jmh.annotations.Threads;
+ import org.openjdk.jmh.annotations.Warmup;
+
+ @SuppressWarnings("unused")
+ @BenchmarkMode(Mode.SampleTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ @Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS)
+ @Measurement(iterations = 2, time = 2, timeUnit = TimeUnit.SECONDS)
+ @Fork(value = 1)
+ @Threads(1)
+ @State(Scope.Benchmark)
+ public class CacheLoaderBench extends CQLTester
+ {
+     private static final int numSSTables = 1000;
+     private final Random random = new Random();
+
+     @Setup(Level.Trial)
+     public void setup() throws Throwable
+     {
+         CQLTester.prepareServer();
+         String keyspace = createKeyspace("CREATE KEYSPACE %s with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } and durable_writes = false");
+         String table1 = createTable(keyspace, "CREATE TABLE %s (key text PRIMARY KEY, val text)");
+         String table2 = createTable(keyspace, "CREATE TABLE %s (key text PRIMARY KEY, val text)");
+
+
+         Keyspace.system().forEach(k -> k.getColumnFamilyStores().forEach(ColumnFamilyStore::disableAutoCompaction));
+
+         ColumnFamilyStore cfs1 = Keyspace.open(keyspace).getColumnFamilyStore(table1);
+         ColumnFamilyStore cfs2 = Keyspace.open(keyspace).getColumnFamilyStore(table2);
+         cfs1.disableAutoCompaction();
+         cfs2.disableAutoCompaction();
+
+         // Write a bunch of sstables to both cfs1 and cfs2
+         List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(2);
+         columnFamilyStores.add(cfs1);
+         columnFamilyStores.add(cfs2);
+
+         logger.info("Creating {} sstables", numSSTables);
+         for (ColumnFamilyStore cfs: columnFamilyStores)
+         {
+             cfs.truncateBlocking();
+             for (int i = 0; i < numSSTables ; i++)
+             {
-                 ColumnDefinition colDef = ColumnDefinition.regularDef(cfs.metadata, ByteBufferUtil.bytes("val"), AsciiType.instance);
-                 RowUpdateBuilder rowBuilder = new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis() + random.nextInt(), "key");
++                ColumnMetadata colDef = ColumnMetadata.regularColumn(cfs.metadata(), ByteBufferUtil.bytes("val"), AsciiType.instance);
++                RowUpdateBuilder rowBuilder = new RowUpdateBuilder(cfs.metadata(), System.currentTimeMillis() + random.nextInt(), "key");
+                 rowBuilder.add(colDef, "val1");
+                 rowBuilder.build().apply();
+                 cfs.forceBlockingFlush();
+             }
+
+             Assert.assertEquals(numSSTables, cfs.getLiveSSTables().size());
+
+             // preheat key cache
+             for (SSTableReader sstable : cfs.getLiveSSTables())
+                 sstable.getPosition(Util.dk("key"), SSTableReader.Operator.EQ);
+         }
+
+         AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = CacheService.instance.keyCache;
+
+         // serialize to file
+         keyCache.submitWrite(keyCache.size()).get();
+     }
+
+     @Setup(Level.Invocation)
+     public void setupKeyCache()
+     {
+         AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = CacheService.instance.keyCache;
+         keyCache.clear();
+     }
+
+     @TearDown(Level.Trial)
+     public void teardown()
+     {
+         CQLTester.cleanup();
+         CQLTester.tearDownClass();
+     }
+
+     @Benchmark
+     public void keyCacheLoadTest() throws Throwable
+     {
+         AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = CacheService.instance.keyCache;
+
+         keyCache.loadSavedAsync().get();
+     }
+ }
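To run the new benchmark in isolation, one option is the standard JMH runner API (this launcher class is hypothetical and assumes it lives in the same package as CacheLoaderBench; in-tree, the build's microbench target is the usual route):

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    public class RunCacheLoaderBench
    {
        public static void main(String[] args) throws RunnerException
        {
            // JMH treats the include pattern as a regex over benchmark class names.
            Options opts = new OptionsBuilder()
                           .include(CacheLoaderBench.class.getSimpleName())
                           .forks(1)
                           .build();
            new Runner(opts).run();
        }
    }

With numSSTables = 1000 per table, the setup phase dominates wall-clock time; the measured keyCacheLoadTest() call is the path the CASSANDRA-14898 fix speeds up.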
diff --cc test/unit/org/apache/cassandra/db/KeyCacheTest.java
index 1819b18,f31df18..ee56d6e
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@@ -308,6 -338,112 +338,112 @@@ public class KeyCacheTes
          assertKeyCacheSize(noEarlyOpen ? 4 : 2, KEYSPACE1, cf);
      }
 
+     @Test
+     public void testKeyCacheLoadNegativeCacheLoadTime() throws Exception
+     {
+         DatabaseDescriptor.setCacheLoadTimeout(-1);
+         String cf = COLUMN_FAMILY7;
+
+         createAndInvalidateCache(Collections.singletonList(Pair.create(KEYSPACE1, cf)), 100);
+
+         CacheService.instance.keyCache.loadSaved();
+
+         // Here max time to load cache is negative which means no time left to load cache. So the keyCache size should
+         // be zero after loadSaved().
+         assertKeyCacheSize(0, KEYSPACE1, cf);
+         assertEquals(0, CacheService.instance.keyCache.size());
+     }
+
+     @Test
+     public void testKeyCacheLoadTwoTablesTime() throws Exception
+     {
+         DatabaseDescriptor.setCacheLoadTimeout(60);
+         String columnFamily1 = COLUMN_FAMILY8;
+         String columnFamily2 = COLUMN_FAMILY_K2_1;
+         int numberOfRows = 100;
+         List<Pair<String, String>> tables = new ArrayList<>(2);
+         tables.add(Pair.create(KEYSPACE1, columnFamily1));
+         tables.add(Pair.create(KEYSPACE2, columnFamily2));
+
+         createAndInvalidateCache(tables, numberOfRows);
+
+         CacheService.instance.keyCache.loadSaved();
+
+         // Here the max time to load the cache is generous (60 seconds), so both tables should load fully and the
+         // keyCache should contain every row from both tables after load.
+         assertKeyCacheSize(numberOfRows, KEYSPACE1, columnFamily1);
+         assertKeyCacheSize(numberOfRows, KEYSPACE2, columnFamily2);
+         assertEquals(numberOfRows * tables.size(), CacheService.instance.keyCache.size());
+     }
+
+     @SuppressWarnings({ "unchecked", "rawtypes" })
+     @Test
+     public void testKeyCacheLoadCacheLoadTimeExceedingLimit() throws Exception
+     {
+         DatabaseDescriptor.setCacheLoadTimeout(2);
+         int delayMillis = 1000;
+         int numberOfRows = 100;
+
+         String cf = COLUMN_FAMILY9;
+
+         createAndInvalidateCache(Collections.singletonList(Pair.create(KEYSPACE1, cf)), numberOfRows);
+
+         // Testing cache load. Here we use a custom-built AutoSavingCache instance because simulating a delay is not
+         // possible with 'CacheService.instance.keyCache'. 'AutoSavingCache.loadSaved()' returns the number of entries
+         // loaded, so we don't need to instantiate ICache.class.
+         CacheService.KeyCacheSerializer keyCacheSerializer = new CacheService.KeyCacheSerializer();
+         CacheService.KeyCacheSerializer keyCacheSerializerSpy = Mockito.spy(keyCacheSerializer);
+         AutoSavingCache autoSavingCache = new AutoSavingCache(mock(ICache.class),
+                                                               CacheService.CacheType.KEY_CACHE,
+                                                               keyCacheSerializerSpy);
+
+         doAnswer(new AnswersWithDelay(delayMillis, answer -> keyCacheSerializer.deserialize(answer.getArgument(0),
+                                                                                             answer.getArgument(1)) ))
+                .when(keyCacheSerializerSpy).deserialize(any(DataInputPlus.class), any(ColumnFamilyStore.class));
+
+         long maxExpectedKeyCache = Math.min(numberOfRows,
+                                             1 + TimeUnit.SECONDS.toMillis(DatabaseDescriptor.getCacheLoadTimeout()) / delayMillis);
+
+         long keysLoaded = autoSavingCache.loadSaved();
+         assertThat(keysLoaded, Matchers.lessThanOrEqualTo(maxExpectedKeyCache));
+         assertNotEquals(0, keysLoaded);
+         Mockito.verify(keyCacheSerializerSpy, Mockito.times(1)).cleanupAfterDeserialize();
+     }
+
+     private void createAndInvalidateCache(List<Pair<String, String>> tables, int numberOfRows) throws ExecutionException, InterruptedException
+     {
+         CompactionManager.instance.disableAutoCompaction();
+
+         // empty the cache
+         CacheService.instance.invalidateKeyCache();
+         assertEquals(0, CacheService.instance.keyCache.size());
+
+         for(Pair<String, String> entry : tables)
+         {
-             String keyspace = entry.left;
-             String cf = entry.right;
++            String keyspace = entry.left();
++            String cf = entry.right();
+             ColumnFamilyStore store = Keyspace.open(keyspace).getColumnFamilyStore(cf);
+
+             // insert data and force to disk
+             SchemaLoader.insertData(keyspace, cf, 0, numberOfRows);
+             store.forceBlockingFlush();
+         }
+
+         for(Pair<String, String> entry : tables)
+         {
-             String keyspace = entry.left;
-             String cf = entry.right;
++            String keyspace = entry.left();
++            String cf = entry.right();
+             // populate the cache
+             readData(keyspace, cf, 0, numberOfRows);
+             assertKeyCacheSize(numberOfRows, keyspace, cf);
+         }
+
+         // force the cache to disk
+         CacheService.instance.keyCache.submitWrite(CacheService.instance.keyCache.size()).get();
+
+         CacheService.instance.invalidateKeyCache();
+         assertEquals(0, CacheService.instance.keyCache.size());
+     }
+
      private static void readData(String keyspace, String columnFamily, int startRow, int numberOfRows)
      {
          ColumnFamilyStore store = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org