This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 446a9d1d011be89e33970def2eb25366d6a24122 Merge: 402e2f2f73 1053e3b475 Author: David Capwell <dcapw...@apache.org> AuthorDate: Thu May 18 10:34:49 2023 -0700 Merge branch 'cassandra-4.0' into cassandra-4.1 CHANGES.txt | 1 + .../db/compaction/CompactionController.java | 2 +- .../cassandra/db/compaction/CompactionTask.java | 25 ++- .../test/CompactionOverlappingSSTableTest.java | 115 ++++++++++++ .../db/compaction/PartialCompactionsTest.java | 207 +++++++++++++++++++++ 5 files changed, 341 insertions(+), 9 deletions(-) diff --cc CHANGES.txt index baf11ee6c0,6db0e3b084..6167e04416 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,11 -1,5 +1,12 @@@ -4.0.10 +4.1.2 + * Allow keystore and trustrore passwords to be nullable (CASSANDRA-18124) + * Return snapshots with dots in their name in nodetool listsnapshots (CASSANDRA-18371) + * Fix NPE when loading snapshots and data directory is one directory from root (CASSANDRA-18359) + * Do not submit hints when hinted_handoff_enabled=false (CASSANDRA-18304) + * Fix COPY ... TO STDOUT behavior in cqlsh (CASSANDRA-18353) + * Remove six and Py2SaferScanner merge cruft (CASSANDRA-18354) +Merged from 4.0: + * Partial compaction can resurrect deleted data (CASSANDRA-18507) * Allow internal address to change with reconnecting snitches (CASSANDRA-16718) * Fix quoting in toCqlString methods of UDTs and aggregates (CASSANDRA-17918) * NPE when deserializing malformed collections from client (CASSANDRA-18505) diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java index dc08f5ae01,90abac3fb2..5fc8031966 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@@ -88,13 -83,14 +88,14 @@@ public class CompactionTask extends Abs if (partialCompactionsAcceptable() && transaction.originals().size() > 1) { // Try again w/o the largest one. - logger.warn("insufficient space to compact all requested files. {}MiB required, {} for compaction {}", + SSTableReader removedSSTable = cfs.getMaxSizeFile(nonExpiredSSTables); - logger.warn("insufficient space to compact all requested files. {}MB required, {} for compaction {} - removing largest SSTable: {}", ++ logger.warn("insufficient space to compact all requested files. {}MiB required, {} for compaction {} - removing largest SSTable: {}", (float) expectedSize / 1024 / 1024, StringUtils.join(transaction.originals(), ", "), - transaction.opId()); + transaction.opId(), + removedSSTable); // Note that we have removed files that are still marked as compacting. // This suboptimal but ok since the caller will unmark all the sstables at the end. - SSTableReader removedSSTable = cfs.getMaxSizeFile(nonExpiredSSTables); transaction.cancel(removedSSTable); return true; } diff --cc test/distributed/org/apache/cassandra/distributed/test/CompactionOverlappingSSTableTest.java index 0000000000,54f8ad7a7a..6a65c91437 mode 000000,100644..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/CompactionOverlappingSSTableTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/CompactionOverlappingSSTableTest.java @@@ -1,0 -1,114 +1,115 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.distributed.test; + + import java.io.IOException; + import java.util.Arrays; + import java.util.Set; + import java.util.concurrent.TimeUnit; ++import java.util.concurrent.TimeoutException; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.stream.Collectors; + + import com.google.common.collect.Iterables; + import com.google.common.collect.Sets; + import com.google.common.util.concurrent.Uninterruptibles; + import org.junit.Test; + + import net.bytebuddy.ByteBuddy; + import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; + import net.bytebuddy.implementation.MethodDelegation; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.Directories; + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.db.lifecycle.SSTableSet; + import org.apache.cassandra.distributed.Cluster; + import org.apache.cassandra.distributed.api.ConsistencyLevel; + + import static net.bytebuddy.matcher.ElementMatchers.named; + import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + import static org.junit.Assert.assertEquals; + + public class CompactionOverlappingSSTableTest extends TestBaseImpl + { + @Test - public void partialCompactionOverlappingTest() throws IOException ++ public void partialCompactionOverlappingTest() throws IOException, TimeoutException + { + + try (Cluster cluster = init(builder().withNodes(1) + .withDataDirCount(1) + .withInstanceInitializer(BB::install) + .start())) + { + cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class': 'SimpleStrategy', 'replication_factor':3}")); + cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key) with compaction = {'class':'SizeTieredCompactionStrategy', 'enabled': 'false'} AND gc_grace_seconds=0")); + Set<Integer> expected = Sets.newHashSetWithExpectedSize(990); + for (int i = 0; i < 1000; i++) + { + cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (id) values (?)"), ConsistencyLevel.ONE, i); + if (i >= 10) + expected.add(i); + } + cluster.get(1).flush(KEYSPACE); + for (int i = 0; i < 10; i++) + { + cluster.coordinator(1).execute(withKeyspace("delete from %s.tbl where id = ?"), ConsistencyLevel.ONE, i); + cluster.get(1).flush(KEYSPACE); + } + assertEquals(expected, Arrays.stream(cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl"), ConsistencyLevel.ONE)) + .map(x -> x[0]) + .collect(Collectors.toSet())); + + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); // make sure tombstones are gc:able + + cluster.get(1).runOnInstance(() -> { + BB.enabled.set(true); + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl"); + cfs.forceMajorCompaction(); + assertEquals("We should have 2 sstables (not 1) after major compaction since we reduced the scope of the compaction", + 2, Iterables.size(cfs.getSSTables(SSTableSet.CANONICAL))); + }); + assertEquals(expected, Arrays.stream(cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl"), ConsistencyLevel.ONE)) + .map(x -> x[0]) + .collect(Collectors.toSet())); + } + } + + public static class BB + { + static AtomicBoolean enabled = new AtomicBoolean(); + public static void install(ClassLoader cl, Integer i) + { + new ByteBuddy().rebase(Directories.class) + .method(named("hasAvailableDiskSpace").and(takesArguments(2))) + .intercept(MethodDelegation.to(BB.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + + public static boolean hasAvailableDiskSpace(long ignore1, long ignore2) + { + if (enabled.get()) + { + enabled.set(false); + return false; + } + return true; + } + } + } diff --cc test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java index 0000000000,9cb3872f80..b922ca8057 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java @@@ -1,0 -1,207 +1,207 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.db.compaction; + + import java.util.Iterator; + + import org.junit.After; + import org.junit.Before; + import org.junit.BeforeClass; + import org.junit.Test; + + import org.apache.cassandra.SchemaLoader; + import org.apache.cassandra.Util; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.Directories; + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.db.RowUpdateBuilder; + import org.apache.cassandra.db.lifecycle.LifecycleTransaction; + import org.apache.cassandra.io.sstable.format.SSTableReader; + import org.apache.cassandra.schema.KeyspaceParams; + import org.apache.cassandra.schema.TableMetadataRef; + import org.apache.cassandra.utils.CloseableIterator; + import org.apache.cassandra.utils.FBUtilities; + + import static org.hamcrest.MatcherAssert.assertThat; + import static org.hamcrest.CoreMatchers.instanceOf; + import static org.junit.Assert.assertEquals; + + public class PartialCompactionsTest extends SchemaLoader + { + static final String KEYSPACE = PartialCompactionsTest.class.getSimpleName(); + static final String TABLE = "testtable"; + + @BeforeClass + public static void initSchema() + { + CompactionManager.instance.disableAutoCompaction(); + + SchemaLoader.createKeyspace(KEYSPACE, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE, TABLE)); + + LimitableDataDirectory.applyTo(KEYSPACE, TABLE); + } + + @Before + public void prepareCFS() + { + LimitableDataDirectory.setAvailableSpace(cfStore(), null); + } + + @After + public void truncateCF() + { + cfStore().truncateBlocking(); + LifecycleTransaction.waitForDeletions(); + } + + private static ColumnFamilyStore cfStore() + { + return Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE); + } + + @Test + public void shouldNotResurrectDataFromSSTableExcludedDueToInsufficientSpace() + { + // given + ColumnFamilyStore cfs = cfStore(); + int few = 10, many = 10 * few; + + // a large sstable as the oldest + createDataSSTable(cfs, 0, many); + // more inserts (to have more than one sstable to compact) + createDataSSTable(cfs, many, many + few); + // delete data that's in both of the prior sstables + createTombstonesSSTable(cfs, many - few / 2, many + few / 2); + + // emulate there not being enough space to compact all sstables + LimitableDataDirectory.setAvailableSpace(cfs, enoughSpaceForAllButTheLargestSSTable(cfs)); + + // when - run a compaction where all tombstones have timed out + FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, Integer.MAX_VALUE, false)); + + // then - the tombstones should not be removed + assertEquals("live sstables after compaction", 2, cfs.getLiveSSTables().size()); + assertEquals("remaining live rows after compaction", many, liveRows(cfs)); + } + + private static long enoughSpaceForAllButTheLargestSSTable(ColumnFamilyStore cfs) + { + long totalSize = 1, maxSize = 0; + for (SSTableReader ssTable : cfs.getLiveSSTables()) + { + long size = ssTable.onDiskLength(); + if (size > maxSize) maxSize = size; + totalSize += size; + } + return totalSize - maxSize; + } + + private static int liveRows(ColumnFamilyStore cfs) + { + return Util.getAll(Util.cmd(cfs, "key1").build()).stream() + .map(partition -> count(partition.rowIterator())) + .reduce(Integer::sum) + .orElse(0); + } + + private static int count(Iterator<?> iter) + { + try (CloseableIterator<?> unused = iter instanceof CloseableIterator ? (CloseableIterator<?>) iter : null) + { + int count = 0; + for (; iter.hasNext(); iter.next()) + { + count++; + } + return count; + } + } + + private static void createDataSSTable(ColumnFamilyStore cfs, int firstKey, int endKey) + { + for (int i = firstKey; i < endKey; i++) + { + new RowUpdateBuilder(cfs.metadata(), 0, "key1") + .clustering(String.valueOf(i)) + .add("val", String.valueOf(i)) + .build() + .applyUnsafe(); + } - cfs.forceBlockingFlush(); ++ cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); + } + + private static void createTombstonesSSTable(ColumnFamilyStore cfs, int firstKey, int endKey) + { + for (int i = firstKey; i < endKey; i++) + { + RowUpdateBuilder.deleteRow(cfs.metadata(), 1, "key1", String.valueOf(i)).applyUnsafe(); + } - cfs.forceBlockingFlush(); ++ cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); + } + + private static class LimitableDataDirectory extends Directories.DataDirectory + { + private Long availableSpace; + + LimitableDataDirectory(Directories.DataDirectory dataDirectory) + { + super(dataDirectory.location); + } + + @Override + public long getAvailableSpace() + { + if (availableSpace != null) + return availableSpace; + return super.getAvailableSpace(); + } + + public static void setAvailableSpace(ColumnFamilyStore cfs, Long availableSpace) + { + for (Directories.DataDirectory location : cfs.getDirectories().getWriteableLocations()) + { + assertThat("ColumnFamilyStore set up with ability to emulate limited disk space", + location, instanceOf(LimitableDataDirectory.class)); + ((LimitableDataDirectory) location).availableSpace = availableSpace; + } + } + + public static void applyTo(String ks, String cf) + { + Keyspace keyspace = Keyspace.open(ks); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(cf); + TableMetadataRef metadata = store.metadata; - keyspace.dropCf(metadata.id); ++ keyspace.dropCf(metadata.id, true); + ColumnFamilyStore cfs = ColumnFamilyStore.createColumnFamilyStore(keyspace, cf, metadata, wrapDirectoriesOf(store), false, false, true); + keyspace.initCfCustom(cfs); + } + + private static Directories wrapDirectoriesOf(ColumnFamilyStore cfs) + { + Directories.DataDirectory[] original = cfs.getDirectories().getWriteableLocations(); + Directories.DataDirectory[] wrapped = new Directories.DataDirectory[original.length]; + for (int i = 0; i < wrapped.length; i++) + { + wrapped[i] = new LimitableDataDirectory(original[i]); + } + return new Directories(cfs.metadata(), wrapped); + } + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org