CASSANDRA-13418 Allow to skip overlapings checks patch by Romain GÉRARD; reviewed by Mick Semb Wever for CASSANDRA-13418
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/14d67d81 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/14d67d81 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/14d67d81 Branch: refs/heads/trunk Commit: 14d67d81c57d6387c77bd85c57b342d285880835 Parents: 37d6730 Author: Romain GÉRARD <r.ger...@criteo.com> Authored: Wed Aug 16 16:21:46 2017 +0200 Committer: Mick Semb Wever <m...@apache.org> Committed: Tue Sep 5 08:33:25 2017 +1000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionController.java | 67 ++++++++++++++++-- .../TimeWindowCompactionController.java | 49 +++++++++++++ .../TimeWindowCompactionStrategy.java | 10 +-- .../TimeWindowCompactionStrategyOptions.java | 22 ++++++ .../db/compaction/TimeWindowCompactionTask.java | 42 +++++++++++ .../db/compaction/CompactionControllerTest.java | 5 ++ .../TimeWindowCompactionStrategyTest.java | 74 +++++++++++++++++++- 8 files changed, 257 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d67d81/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1f63ced..9218d90 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.11.1 + * Add a compaction option to TWCS to ignore sstables overlapping checks (CASSANDRA-13418) * BTree.Builder memory leak (CASSANDRA-13754) * Revert CASSANDRA-10368 of supporting non-pk column filtering due to correctness (CASSANDRA-13798) * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938) http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d67d81/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index bf3647a..84aac09 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction; import java.util.*; import java.util.function.Predicate; +import org.apache.cassandra.config.Config; import org.apache.cassandra.db.Memtable; import org.apache.cassandra.db.rows.UnfilteredRowIterator; @@ -49,7 +50,8 @@ import static org.apache.cassandra.db.lifecycle.SSTableIntervalTree.buildInterva public class CompactionController implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(CompactionController.class); - static final boolean NEVER_PURGE_TOMBSTONES = Boolean.getBoolean("cassandra.never_purge_tombstones"); + private static final String NEVER_PURGE_TOMBSTONES_PROPERTY = Config.PROPERTY_PREFIX + "never_purge_tombstones"; + static final boolean NEVER_PURGE_TOMBSTONES = Boolean.getBoolean(NEVER_PURGE_TOMBSTONES_PROPERTY); public final ColumnFamilyStore cfs; private final boolean compactingRepaired; @@ -98,7 +100,14 @@ public class CompactionController implements AutoCloseable { if (NEVER_PURGE_TOMBSTONES) { - logger.debug("not refreshing overlaps - running with -Dcassandra.never_purge_tombstones=true"); + logger.debug("not refreshing overlaps - running with -D{}=true", + NEVER_PURGE_TOMBSTONES_PROPERTY); + return; + } + + if (ignoreOverlaps()) + { + logger.debug("not refreshing overlaps - running with ignoreOverlaps activated"); return; } @@ -120,7 +129,7 @@ public class CompactionController implements AutoCloseable if (this.overlappingSSTables != null) close(); - if (compacting == null) + if (compacting == null || ignoreOverlaps()) overlappingSSTables = Refs.tryRef(Collections.<SSTableReader>emptyList()); else overlappingSSTables = 
cfs.getAndReferenceOverlappingLiveSSTables(compacting); @@ -129,7 +138,7 @@ public class CompactionController implements AutoCloseable public Set<SSTableReader> getFullyExpiredSSTables() { - return getFullyExpiredSSTables(cfs, compacting, overlappingSSTables, gcBefore); + return getFullyExpiredSSTables(cfs, compacting, overlappingSSTables, gcBefore, ignoreOverlaps()); } /** @@ -146,20 +155,39 @@ public class CompactionController implements AutoCloseable * @param compacting we take the drop-candidates from this set, it is usually the sstables included in the compaction * @param overlapping the sstables that overlap the ones in compacting. * @param gcBefore + * @param ignoreOverlaps don't check if data shadows/overlaps any data in other sstables * @return */ - public static Set<SSTableReader> getFullyExpiredSSTables(ColumnFamilyStore cfStore, Iterable<SSTableReader> compacting, Iterable<SSTableReader> overlapping, int gcBefore) + public static Set<SSTableReader> getFullyExpiredSSTables(ColumnFamilyStore cfStore, + Iterable<SSTableReader> compacting, + Iterable<SSTableReader> overlapping, + int gcBefore, + boolean ignoreOverlaps) { logger.trace("Checking droppable sstables in {}", cfStore); if (NEVER_PURGE_TOMBSTONES || compacting == null) - return Collections.<SSTableReader>emptySet(); + return Collections.emptySet(); if (cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones() && !Iterables.all(compacting, SSTableReader::isRepaired)) return Collections.emptySet(); - List<SSTableReader> candidates = new ArrayList<>(); + if (ignoreOverlaps) + { + Set<SSTableReader> fullyExpired = new HashSet<>(); + for (SSTableReader candidate : compacting) + { + if (candidate.getSSTableMetadata().maxLocalDeletionTime < gcBefore) + { + fullyExpired.add(candidate); + logger.trace("Dropping overlap ignored expired SSTable {} (maxLocalDeletionTime={}, gcBefore={})", + candidate, candidate.getSSTableMetadata().maxLocalDeletionTime, gcBefore); + } + } + return fullyExpired; + } 
+ List<SSTableReader> candidates = new ArrayList<>(); long minTimestamp = Long.MAX_VALUE; for (SSTableReader sstable : overlapping) @@ -203,6 +231,14 @@ public class CompactionController implements AutoCloseable return new HashSet<>(candidates); } + public static Set<SSTableReader> getFullyExpiredSSTables(ColumnFamilyStore cfStore, + Iterable<SSTableReader> compacting, + Iterable<SSTableReader> overlapping, + int gcBefore) + { + return getFullyExpiredSSTables(cfStore, compacting, overlapping, gcBefore, false); + } + public String getKeyspace() { return cfs.keyspace.getName(); @@ -306,6 +342,23 @@ public class CompactionController implements AutoCloseable return reader.simpleIterator(dfile, key, position, tombstoneOnly); } + /** + * Is overlapped sstables ignored + * + * Control whether or not we are taking into account overlapping sstables when looking for fully expired sstables. + * In order to reduce the amount of work needed, we look for sstables that can be dropped instead of compacted. + * As a safeguard mechanism, for each time range of data in a sstable, we are checking globally to see if all data + * of this time range is fully expired before considering to drop the sstable. + * This strategy can retain for a long time a lot of sstables on disk (see CASSANDRA-13418) so this option + * control whether or not this check should be ignored. + * + * @return false by default + */ + protected boolean ignoreOverlaps() + { + return false; + } + private FileDataInput openDataFile(SSTableReader reader) { return limiter != null ? 
reader.openDataReader(limiter) : reader.openDataReader(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d67d81/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionController.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionController.java new file mode 100644 index 0000000..cf9e0e6 --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionController.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.compaction; + + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.io.sstable.format.SSTableReader; + +public class TimeWindowCompactionController extends CompactionController +{ + private static final Logger logger = LoggerFactory.getLogger(TimeWindowCompactionController.class); + + private final boolean ignoreOverlaps; + + public TimeWindowCompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting, int gcBefore, boolean ignoreOverlaps) + { + super(cfs, compacting, gcBefore); + this.ignoreOverlaps = ignoreOverlaps; + if (ignoreOverlaps) + logger.warn("You are running with sstables overlapping checks disabled, it can result in loss of data"); + } + + @Override + protected boolean ignoreOverlaps() + { + return ignoreOverlaps; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d67d81/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java index 595c46d..9532cc4 100644 --- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java @@ -66,7 +66,6 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy } else logger.debug("Enabling tombstone compactions for TWCS"); - } @Override @@ -82,7 +81,7 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION); if (modifier != null) - return new CompactionTask(cfs, modifier, gcBefore); + return new TimeWindowCompactionTask(cfs, modifier, gcBefore, 
options.ignoreOverlaps); } } @@ -104,7 +103,8 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy if (System.currentTimeMillis() - lastExpiredCheck > options.expiredSSTableCheckFrequency) { logger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables"); - expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingLiveSSTables(uncompacting), gcBefore); + expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, options.ignoreOverlaps ? Collections.emptySet() : cfs.getOverlappingLiveSSTables(uncompacting), + gcBefore, options.ignoreOverlaps); lastExpiredCheck = System.currentTimeMillis(); } else @@ -330,7 +330,7 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION); if (txn == null) return null; - return Collections.singleton(new CompactionTask(cfs, txn, gcBefore)); + return Collections.singleton(new TimeWindowCompactionTask(cfs, txn, gcBefore, options.ignoreOverlaps)); } @Override @@ -346,7 +346,7 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy return null; } - return new CompactionTask(cfs, modifier, gcBefore).setUserDefined(true); + return new TimeWindowCompactionTask(cfs, modifier, gcBefore, options.ignoreOverlaps).setUserDefined(true); } public int getEstimatedRemainingTasks() http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d67d81/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java index 07df606..24b4fe0 100644 --- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java +++ 
b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.Config; import org.apache.cassandra.exceptions.ConfigurationException; public final class TimeWindowCompactionStrategyOptions @@ -36,16 +37,21 @@ public final class TimeWindowCompactionStrategyOptions protected static final TimeUnit DEFAULT_COMPACTION_WINDOW_UNIT = TimeUnit.DAYS; protected static final int DEFAULT_COMPACTION_WINDOW_SIZE = 1; protected static final int DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS = 60 * 10; + protected static final Boolean DEFAULT_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION = false; protected static final String TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution"; protected static final String COMPACTION_WINDOW_UNIT_KEY = "compaction_window_unit"; protected static final String COMPACTION_WINDOW_SIZE_KEY = "compaction_window_size"; protected static final String EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY = "expired_sstable_check_frequency_seconds"; + protected static final String UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY = "unsafe_aggressive_sstable_expiration"; + + static final String UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY = Config.PROPERTY_PREFIX + "allow_unsafe_aggressive_sstable_expiration"; protected final int sstableWindowSize; protected final TimeUnit sstableWindowUnit; protected final TimeUnit timestampResolution; protected final long expiredSSTableCheckFrequency; + protected final boolean ignoreOverlaps; SizeTieredCompactionStrategyOptions stcsOptions; @@ -68,6 +74,9 @@ public final class TimeWindowCompactionStrategyOptions optionValue = options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY); expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(optionValue == null ? 
DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS : Long.parseLong(optionValue), TimeUnit.SECONDS); + optionValue = options.get(UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY); + ignoreOverlaps = optionValue == null ? DEFAULT_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION : (Boolean.getBoolean(UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY) && Boolean.parseBoolean(optionValue)); + stcsOptions = new SizeTieredCompactionStrategyOptions(options); } @@ -77,6 +86,7 @@ public final class TimeWindowCompactionStrategyOptions timestampResolution = DEFAULT_TIMESTAMP_RESOLUTION; sstableWindowSize = DEFAULT_COMPACTION_WINDOW_SIZE; expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS, TimeUnit.SECONDS); + ignoreOverlaps = DEFAULT_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION; stcsOptions = new SizeTieredCompactionStrategyOptions(); } @@ -136,10 +146,22 @@ public final class TimeWindowCompactionStrategyOptions throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY), e); } + + optionValue = options.get(UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY); + if (optionValue != null) + { + if (!(optionValue.equalsIgnoreCase("true") || optionValue.equalsIgnoreCase("false"))) + throw new ConfigurationException(String.format("%s is not 'true' or 'false' (%s)", UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY, optionValue)); + + if(optionValue.equalsIgnoreCase("true") && !Boolean.getBoolean(UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY)) + throw new ConfigurationException(String.format("%s is requested but not allowed, restart cassandra with -D%s=true to allow it", UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY, UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY)); + } + uncheckedOptions.remove(COMPACTION_WINDOW_SIZE_KEY); uncheckedOptions.remove(COMPACTION_WINDOW_UNIT_KEY); uncheckedOptions.remove(TIMESTAMP_RESOLUTION_KEY); uncheckedOptions.remove(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY); + 
uncheckedOptions.remove(UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY); uncheckedOptions = SizeTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions); http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d67d81/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionTask.java new file mode 100644 index 0000000..4f1fe6a --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionTask.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.compaction; + +import java.util.Set; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.format.SSTableReader; + +public class TimeWindowCompactionTask extends CompactionTask +{ + private final boolean ignoreOverlaps; + + public TimeWindowCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean ignoreOverlaps) + { + super(cfs, txn, gcBefore); + this.ignoreOverlaps = ignoreOverlaps; + } + + @Override + public CompactionController getCompactionController(Set<SSTableReader> toCompact) + { + return new TimeWindowCompactionController(cfs, toCompact, gcBefore, ignoreOverlaps); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d67d81/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java index 1b400e8..052206e 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java @@ -177,6 +177,11 @@ public class CompactionControllerTest extends SchemaLoader expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore); assertNotNull(expired); assertEquals(0, expired.size()); + + // Now if we explicitly ask to ignore overlaped sstables, we should get back our expired sstable + expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore, true); + assertNotNull(expired); + assertEquals(1, expired.size()); } private void applyMutation(CFMetaData cfm, DecoratedKey key, long timestamp) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d67d81/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java index 56d53bd..6fff279 100644 --- a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java @@ -60,6 +60,7 @@ public class TimeWindowCompactionStrategyTest extends SchemaLoader { // Disable tombstone histogram rounding for tests System.setProperty("cassandra.streaminghistogram.roundseconds", "1"); + System.setProperty(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY, "true"); SchemaLoader.prepareServer(); @@ -100,13 +101,24 @@ public class TimeWindowCompactionStrategyTest extends SchemaLoader { options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MONTHS"); validateOptions(options); - fail(String.format("Invalid time units should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY)); + fail(String.format("Invalid %s should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY)); } catch (ConfigurationException e) { options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MINUTES"); } + try + { + options.put(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY, "not-a-boolean"); + validateOptions(options); + fail(String.format("Invalid %s should be rejected", TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY)); + } + catch (ConfigurationException e) + { + options.put(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY, "true"); + } + options.put("bad_option", 
"1.0"); unvalidated = validateOptions(options); assertTrue(unvalidated.containsKey("bad_option")); @@ -272,4 +284,64 @@ public class TimeWindowCompactionStrategyTest extends SchemaLoader t.transaction.abort(); } + @Test + public void testDropOverlappingExpiredSSTables() throws InterruptedException + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + ByteBuffer value = ByteBuffer.wrap(new byte[100]); + + // create 2 sstables + DecoratedKey key = Util.dk(String.valueOf("expired")); + new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), 1, key.getKey()) + .clustering("column") + .add("val", value).build().applyUnsafe(); + + cfs.forceBlockingFlush(); + SSTableReader expiredSSTable = cfs.getLiveSSTables().iterator().next(); + Thread.sleep(10); + + new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis() - 1000, key.getKey()) + .clustering("column") + .add("val", value).build().applyUnsafe(); + key = Util.dk(String.valueOf("nonexpired")); + new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), key.getKey()) + .clustering("column") + .add("val", value).build().applyUnsafe(); + + cfs.forceBlockingFlush(); + assertEquals(cfs.getLiveSSTables().size(), 2); + + Map<String, String> options = new HashMap<>(); + + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30"); + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "SECONDS"); + options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS"); + options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, "0"); + TimeWindowCompactionStrategy twcs = new TimeWindowCompactionStrategy(cfs, options); + for (SSTableReader sstable : cfs.getLiveSSTables()) + twcs.addSSTable(sstable); + + twcs.startup(); + assertNull(twcs.getNextBackgroundTask((int) 
(System.currentTimeMillis() / 1000))); + Thread.sleep(2000); + assertNull(twcs.getNextBackgroundTask((int) (System.currentTimeMillis()/1000))); + + options.put(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY, "true"); + twcs = new TimeWindowCompactionStrategy(cfs, options); + for (SSTableReader sstable : cfs.getLiveSSTables()) + twcs.addSSTable(sstable); + + twcs.startup(); + AbstractCompactionTask t = twcs.getNextBackgroundTask((int) (System.currentTimeMillis()/1000)); + assertNotNull(t); + assertEquals(1, Iterables.size(t.transaction.originals())); + SSTableReader sstable = t.transaction.originals().iterator().next(); + assertEquals(sstable, expiredSSTable); + twcs.shutdown(); + t.transaction.abort(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org