This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 0767c834162d74bdc857f47615b9e8a5c7e76d5b Merge: baa9d0327f 730b898b74 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Fri Dec 23 14:41:50 2022 +0100 Merge branch 'cassandra-3.0' into cassandra-3.11 CHANGES.txt | 1 + .../compaction/TimeWindowCompactionStrategy.java | 14 ++++++++++++ test/unit/org/apache/cassandra/MockSchema.java | 19 ++++++++++++---- .../TimeWindowCompactionStrategyTest.java | 25 +++++++++++++++++++++- 4 files changed, 54 insertions(+), 5 deletions(-) diff --cc CHANGES.txt index ad25802839,3900ab5b58..ee21b34ebc --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,7 -1,5 +1,8 @@@ -3.0.29 +3.11.15 + * Fix Splitter sometimes creating more splits than requested (CASSANDRA-18013) + +Merged from 3.0: + * Avoid anticompaction mixing data from two different time windows with TWCS (CASSANDRA-17970) * Do not spam the logs with MigrationCoordinator not being able to pull schemas (CASSANDRA-18096) * Fix incorrect resource name in LIST PERMISSION output (CASSANDRA-17848) * Suppress CVE-2022-41854 and similar (CASSANDRA-18083) diff --cc src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java index 74e5f9d95a,5ae1cc784c..bbc9cdf8ee --- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java @@@ -376,9 -333,23 +376,23 @@@ public class TimeWindowCompactionStrate LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION); if (txn == null) return null; - return Collections.singleton(new CompactionTask(cfs, txn, gcBefore)); + return Collections.singleton(new TimeWindowCompactionTask(cfs, txn, gcBefore, options.ignoreOverlaps)); } + /** + * TWCS should not group sstables for anticompaction - this can mix new and old data + */ + @Override + public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup) + { + Collection<Collection<SSTableReader>> groups = new ArrayList<>(sstablesToGroup.size()); + for (SSTableReader sstable : sstablesToGroup) + { + groups.add(Collections.singleton(sstable)); + } + return groups; + } + @Override @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore) diff --cc test/unit/org/apache/cassandra/MockSchema.java index 2b480d8e3c,5f3198dad0..90c8e4c705 --- a/test/unit/org/apache/cassandra/MockSchema.java +++ b/test/unit/org/apache/cassandra/MockSchema.java @@@ -85,27 -86,18 +85,37 @@@ public class MockSchem return sstable(generation, size, false, cfs); } + public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs) + { + return sstable(generation, size, keepRef, generation, generation, cfs); + } + + public static SSTableReader sstableWithLevel(int generation, long firstToken, long lastToken, int level, ColumnFamilyStore cfs) + { + return sstable(generation, 0, false, firstToken, lastToken, level, cfs); + } + + public static SSTableReader sstableWithLevel(int generation, int size, int level, ColumnFamilyStore cfs) + { + return sstable(generation, size, false, generation, generation, level, cfs); + } + + public static SSTableReader sstableWithTimestamp(int generation, long timestamp, ColumnFamilyStore cfs) + { - return sstable(generation, 0, false, timestamp, cfs); ++ return sstable(generation, 0, false, 0, 1000, 0, Integer.MAX_VALUE, timestamp, cfs); + } + - public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs) + public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, ColumnFamilyStore cfs) { - return sstable(generation, size, keepRef, System.currentTimeMillis() * 1000, cfs); + return sstable(generation, size, keepRef, firstToken, lastToken, 0, cfs); } + public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, ColumnFamilyStore cfs) ++ { ++ return sstable(generation, size, keepRef, firstToken, lastToken, level, Integer.MAX_VALUE, System.currentTimeMillis() * 1000, cfs); ++ } + - public static SSTableReader sstable(int generation, int size, boolean keepRef, long timestamp, ColumnFamilyStore cfs) ++ public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, int minLocalDeletionTime, long timestamp, ColumnFamilyStore cfs) { Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(), cfs.keyspace.getName(), @@@ -123,40 -115,37 +133,41 @@@ { } } - SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(tempFile), RandomAccessReader.DEFAULT_BUFFER_SIZE, size); - if (size > 0) + // .complete() with size to make sstable.onDiskLength work + try (FileHandle.Builder builder = new FileHandle.Builder(new ChannelProxy(tempFile)).bufferSize(size); + FileHandle fileHandle = builder.complete(size)) { - try + if (size > 0) { - File file = new File(descriptor.filenameFor(Component.DATA)); - try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) + try { - raf.setLength(size); + File file = new File(descriptor.filenameFor(Component.DATA)); + try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) + { + raf.setLength(size); + } + } + catch (IOException e) + { + throw new RuntimeException(e); } } - catch (IOException e) - { - throw new RuntimeException(e); - } + SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList()); - StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator) - .sstableLevel(level) - .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, UNREPAIRED_SSTABLE, header) - .get(MetadataType.STATS); ++ MetadataCollector collector = new MetadataCollector(cfs.metadata.comparator); ++ collector.update(new DeletionTime(timestamp, minLocalDeletionTime)); ++ StatsMetadata metadata = (StatsMetadata) collector.sstableLevel(level) ++ .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, UNREPAIRED_SSTABLE, header) ++ .get(MetadataType.STATS); + SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata, + fileHandle.sharedCopy(), fileHandle.sharedCopy(), indexSummary.sharedCopy(), + new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header); + reader.first = readerBounds(firstToken); + reader.last = readerBounds(lastToken); + if (!keepRef) + reader.selfRef().release(); + return reader; } - SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList()); - MetadataCollector collector = new MetadataCollector(cfs.metadata.comparator); - collector.update(new DeletionTime(timestamp, (int) (System.currentTimeMillis() / 1000))); - StatsMetadata metadata = (StatsMetadata) collector.finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), - 0.01f, - -1, - header).get(MetadataType.STATS); - - SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata, - segmentedFile.sharedCopy(), segmentedFile.sharedCopy(), indexSummary.sharedCopy(), - new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header); - reader.first = reader.last = readerBounds(generation); - if (!keepRef) - reader.selfRef().release(); - return reader; + } public static ColumnFamilyStore newCFS() diff --cc test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java index ee7952bde4,9bed7c1c94..15d2a2e705 --- a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java @@@ -25,8 -26,10 +26,9 @@@ import java.util.Map import java.util.concurrent.TimeUnit; import com.google.common.collect.HashMultimap; + import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; - import org.junit.BeforeClass; import org.junit.Test; import static org.junit.Assert.assertEquals; @@@ -46,7 -48,8 +47,8 @@@ import org.apache.cassandra.db.RowUpdat import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.utils.Pair; + import org.apache.cassandra.MockSchema; +import org.apache.cassandra.utils.Pair; import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.getWindowBoundsInMillis; import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.newestBucket; @@@ -307,67 -282,24 +309,88 @@@ public class TimeWindowCompactionStrate t.transaction.abort(); } + @Test + public void testDropOverlappingExpiredSSTables() throws InterruptedException + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + long timestamp = System.currentTimeMillis(); + ByteBuffer value = ByteBuffer.wrap(new byte[100]); + + // Create a expiring sstable with a TTL + DecoratedKey key = Util.dk("expired"); + new RowUpdateBuilder(cfs.metadata, timestamp, TTL_SECONDS, key.getKey()) + .clustering("column") + .add("val", value).build().applyUnsafe(); + + cfs.forceBlockingFlush(); + SSTableReader expiredSSTable = cfs.getLiveSSTables().iterator().next(); + Thread.sleep(10); + + // Create a second sstable without TTL and with a row superceded by the expiring row + new RowUpdateBuilder(cfs.metadata, timestamp - 1000, key.getKey()) + .clustering("column") + .add("val", value).build().applyUnsafe(); + key = Util.dk("nonexpired"); + new RowUpdateBuilder(cfs.metadata, timestamp, key.getKey()) + .clustering("column") + .add("val", value).build().applyUnsafe(); + + cfs.forceBlockingFlush(); + assertEquals(cfs.getLiveSSTables().size(), 2); + + Map<String, String> options = new HashMap<>(); + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30"); + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "SECONDS"); + options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS"); + options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, "0"); + TimeWindowCompactionStrategy twcs = new TimeWindowCompactionStrategy(cfs, options); + for (SSTableReader sstable : cfs.getLiveSSTables()) + twcs.addSSTable(sstable); + + twcs.startup(); + assertNull(twcs.getNextBackgroundTask(nowInSeconds())); + + // Wait for the expiration of the first sstable + Thread.sleep(TimeUnit.SECONDS.toMillis(TTL_SECONDS + 1)); + assertNull(twcs.getNextBackgroundTask(nowInSeconds())); + + options.put(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY, "true"); + twcs = new TimeWindowCompactionStrategy(cfs, options); + for (SSTableReader sstable : cfs.getLiveSSTables()) + twcs.addSSTable(sstable); + + twcs.startup(); + AbstractCompactionTask t = twcs.getNextBackgroundTask(nowInSeconds()); + assertNotNull(t); + assertEquals(1, Iterables.size(t.transaction.originals())); + SSTableReader sstable = t.transaction.originals().iterator().next(); + assertEquals(sstable, expiredSSTable); + twcs.shutdown(); + t.transaction.abort(); + } ++ + @Test + public void testGroupForAntiCompaction() + { + ColumnFamilyStore cfs = MockSchema.newCFS("test_group_for_anticompaction"); + cfs.setCompactionParameters(ImmutableMap.of("class", "TimeWindowCompactionStrategy", + "timestamp_resolution", "MILLISECONDS", + "compaction_window_size", "1", + "compaction_window_unit", "MINUTES")); + + List<SSTableReader> sstables = new ArrayList<>(10); + long curr = System.currentTimeMillis(); + for (int i = 0; i < 10; i++) + sstables.add(MockSchema.sstableWithTimestamp(i, curr + TimeUnit.MILLISECONDS.convert(i, TimeUnit.MINUTES), cfs)); + + cfs.addSSTables(sstables); - Collection<Collection<SSTableReader>> groups = cfs.getCompactionStrategyManager().getStrategies().get(1).groupSSTablesForAntiCompaction(sstables); ++ Collection<Collection<SSTableReader>> groups = cfs.getCompactionStrategyManager().getCompactionStrategyFor(sstables.get(0)).groupSSTablesForAntiCompaction(sstables); + assertTrue(groups.size() > 0); + for (Collection<SSTableReader> group : groups) + assertEquals(1, group.size()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org