This is an automated email from the ASF dual-hosted git repository. blambov pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit db6641fbb6fd0c439e14f94caecdeee999311c62 Merge: 1acab45e4f c6385ac3dd Author: Branimir Lambov <branimir.lam...@datastax.com> AuthorDate: Thu Sep 21 16:03:41 2023 +0300 Merge branch 'cassandra-4.0' into cassandra-4.1 CHANGES.txt | 1 + .../db/compaction/CompactionController.java | 12 +- .../db/compaction/CompactionControllerTest.java | 140 +++++++++++++++++++++ 3 files changed, 146 insertions(+), 7 deletions(-) diff --cc CHANGES.txt index bae7f3c6d8,6c4e0ef6b0..bb5cc728c9 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -9,8 -4,10 +9,9 @@@ Merged from 4.0 * Fix BulkLoader ignoring cipher suites options (CASSANDRA-18582) * Migrate Python optparse to argparse (CASSANDRA-17914) Merged from 3.11: + * Fix delayed SSTable release with unsafe_aggressive_sstable_expiration (CASSANDRA-18756) * Revert CASSANDRA-18543 (CASSANDRA-18854) * Fix NPE when using udfContext in UDF after a restart of a node (CASSANDRA-18739) - * Moved jflex from runtime to build dependencies (CASSANDRA-18664) Merged from 3.0: * Add cqlshrc.sample and credentials.sample into Debian package (CASSANDRA-18818) * Refactor validation logic in StorageService.rebuild (CASSANDRA-18803) diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java index 9d81b61ed3,86546bb9f6..d1a8d01b92 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java @@@ -182,6 -199,124 +199,124 @@@ public class CompactionControllerTest e assertEquals(1, expired.size()); } + @Test + @BMRules(rules = { + @BMRule(name = "Pause compaction", + targetClass = "CompactionTask", + targetMethod = "runMayThrow", + targetLocation = "INVOKE getCompactionAwareWriter", + condition = "Thread.currentThread().getName().equals(\"compaction1\")", + action = "org.apache.cassandra.db.compaction.CompactionControllerTest.createCompactionControllerLatch.countDown();" + + "com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" + + "(org.apache.cassandra.db.compaction.CompactionControllerTest.compaction2FinishLatch);"), + @BMRule(name = "Check overlaps", + targetClass = "CompactionTask", + targetMethod = "runMayThrow", + targetLocation = "INVOKE finish", + condition = "Thread.currentThread().getName().equals(\"compaction1\")", + action = "org.apache.cassandra.db.compaction.CompactionControllerTest.compaction1RefreshLatch.countDown();" + + "com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" + + "(org.apache.cassandra.db.compaction.CompactionControllerTest.refreshCheckLatch);"), + @BMRule(name = "Increment overlap refresh counter", + targetClass = "ColumnFamilyStore", + targetMethod = "getAndReferenceOverlappingLiveSSTables", + condition = "Thread.currentThread().getName().equals(\"compaction1\")", + action = "org.apache.cassandra.db.compaction.CompactionControllerTest.incrementOverlapRefreshCounter();") + }) + public void testIgnoreOverlaps() throws Exception + { + testOverlapIterator(true); + overlapRefreshCounter = 0; + compaction2FinishLatch = new CountDownLatch(1); + createCompactionControllerLatch = new CountDownLatch(1); + compaction1RefreshLatch = new CountDownLatch(1); + refreshCheckLatch = new CountDownLatch(1); + testOverlapIterator(false); + } + + public void testOverlapIterator(boolean ignoreOverlaps) throws Exception + { + + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + //create 2 overlapping sstables + DecoratedKey key = Util.dk("k1"); + long timestamp1 = FBUtilities.timestampMicros(); + long timestamp2 = timestamp1 - 5; + applyMutation(cfs.metadata(), key, timestamp1); - cfs.forceBlockingFlush(); ++ cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); + assertEquals(cfs.getLiveSSTables().size(), 1); + Set<SSTableReader> sstables = cfs.getLiveSSTables(); + + applyMutation(cfs.metadata(), key, timestamp2); - cfs.forceBlockingFlush(); ++ cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); + assertEquals(cfs.getLiveSSTables().size(), 2); + String sstable2 = cfs.getLiveSSTables().iterator().next().getFilename(); + + System.setProperty(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY, "true"); + Map<String, String> options = new HashMap<>(); + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30"); + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "SECONDS"); + options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS"); + options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, "0"); + options.put(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY, Boolean.toString(ignoreOverlaps)); + TimeWindowCompactionStrategy twcs = new TimeWindowCompactionStrategy(cfs, options); + for (SSTableReader sstable : cfs.getLiveSSTables()) + twcs.addSSTable(sstable); + + twcs.startup(); + + CompactionTask task = (CompactionTask)twcs.getUserDefinedTask(sstables, 0); + + assertNotNull(task); + assertEquals(1, Iterables.size(task.transaction.originals())); + + //start a compaction for the first sstable (compaction1) + //the overlap iterator should contain sstable2 + //this compaction will be paused by the BMRule + Thread t = new Thread(() -> { + task.execute(null); + }); + + //start a compaction for the second sstable (compaction2) + //the overlap iterator should contain sstable1 + //this compaction should complete as normal + Thread t2 = new Thread(() -> { + Uninterruptibles.awaitUninterruptibly(createCompactionControllerLatch); + assertEquals(1, overlapRefreshCounter); + CompactionManager.instance.forceUserDefinedCompaction(sstable2); + + //after compaction2 is finished, wait 1 minute and then resume compaction1 (this gives enough time for the overlapIterator to be refreshed) + //after resuming, the overlap iterator for compaction1 should be updated to include the new sstable created by compaction2, + //and it should not contain sstable2 + try + { + TimeUnit.MINUTES.sleep(1); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + compaction2FinishLatch.countDown(); + }); + + t.setName("compaction1"); + t.start(); + t2.start(); + + compaction1RefreshLatch.await(); + //at this point, the overlap iterator for compaction1 should be refreshed + + //verify that the overlap iterator for compaction1 is refreshed twice, (once during the constructor, and again after compaction2 finishes) + assertEquals(2, overlapRefreshCounter); + + refreshCheckLatch.countDown(); + t.join(); + } + private void applyMutation(TableMetadata cfm, DecoratedKey key, long timestamp) { ByteBuffer val = ByteBufferUtil.bytes(1L); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org