Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 c9b83e0a8 -> 52191ebe3
More aggressive check for expired sstables in DTCS Patch by Björn Hegerfors; reviewed by marcuse for CASSANDRA-8359 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3d0c4e78 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3d0c4e78 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3d0c4e78 Branch: refs/heads/cassandra-2.1 Commit: 3d0c4e78c6bb3f8767aa0720b7de579908e2bf59 Parents: 74bfa77 Author: Björn Hegerfors <bj...@spotify.com> Authored: Wed Apr 1 12:26:00 2015 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Wed Apr 1 12:32:09 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../DateTieredCompactionStrategy.java | 31 ++++++++++---- .../DateTieredCompactionStrategyTest.java | 43 ++++++++++++++++++++ 3 files changed, 67 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d0c4e78/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a956eb6..1afe6fe 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.14: + * More aggressive check for expired sstables in DTCS (CASSANDRA-8359) * Don't set clientMode to true when bulk-loading sstables to avoid a NullPointerException (CASSANDRA-8238) * Fix ignored index_interval change in ALTER TABLE statements (CASSANDRA-7976) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d0c4e78/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java index 6b3e800..cfa9c8a 100644 --- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java @@ -53,7 +53,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy while (true) { - List<SSTableReader> latestBucket = getNextBackgroundSStables(gcBefore); + List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore); if (latestBucket.isEmpty()) return null; @@ -68,24 +68,39 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy * @param gcBefore * @return */ - private List<SSTableReader> getNextBackgroundSStables(final int gcBefore) + private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore) { if (!isEnabled() || cfs.getSSTables().isEmpty()) return Collections.emptyList(); + Set<SSTableReader> uncompacting = cfs.getUncompactingSSTables(); + + // Find fully expired SSTables. Those will be included no matter what. + Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingSSTables(uncompacting), gcBefore); + Set<SSTableReader> candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting)); + + List<SSTableReader> compactionCandidates = new ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), gcBefore)); + if (!expired.isEmpty()) + { + logger.debug("Including expired sstables: {}", expired); + compactionCandidates.addAll(expired); + } + return compactionCandidates; + } + + private List<SSTableReader> getNextNonExpiredSSTables(Iterable<SSTableReader> nonExpiringSSTables, final int gcBefore) + { int base = cfs.getMinimumCompactionThreshold(); long now = getNow(); - Iterable<SSTableReader> candidates = filterSuspectSSTables(cfs.getUncompactingSSTables()); - - List<SSTableReader> mostInteresting = getCompactionCandidates(candidates, now, base); + List<SSTableReader> mostInteresting = getCompactionCandidates(nonExpiringSSTables, now, base); if (mostInteresting != null) return mostInteresting; // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone // ratio is greater than threshold. List<SSTableReader> sstablesWithTombstones = Lists.newArrayList(); - for (SSTableReader sstable : candidates) + for (SSTableReader sstable : nonExpiringSSTables) { if (worthDroppingTombstones(sstable, gcBefore)) sstablesWithTombstones.add(sstable); @@ -106,8 +121,8 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy List<SSTableReader> mostInteresting = newestBucket(buckets, cfs.getMinimumCompactionThreshold(), cfs.getMaximumCompactionThreshold(), - options.baseTime, - now); + now, + options.baseTime); if (!mostInteresting.isEmpty()) return mostInteresting; return null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d0c4e78/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java index f98e372..1fa41a3 100644 --- a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java @@ -273,4 +273,47 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader filtered = filterOldSSTables(sstrs, 1, 4); assertEquals("no sstables should remain when all are too old", 0, Iterables.size(filtered)); } + + + @Test + public void testDropExpiredSSTables() throws InterruptedException + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + ByteBuffer value = ByteBuffer.wrap(new byte[100]); + + // create 2 sstables + DecoratedKey key = Util.dk(String.valueOf("expired")); + RowMutation rm = new RowMutation(KEYSPACE1, key.key); + rm.add(CF_STANDARD1, ByteBufferUtil.bytes("column"), value, System.currentTimeMillis(), 5); + rm.apply(); + cfs.forceBlockingFlush(); + SSTableReader expiredSSTable = cfs.getSSTables().iterator().next(); + Thread.sleep(10); + key = Util.dk(String.valueOf("nonexpired")); + rm = new RowMutation(KEYSPACE1, key.key); + rm.add(CF_STANDARD1, ByteBufferUtil.bytes("column"), value, System.currentTimeMillis()); + rm.apply(); + cfs.forceBlockingFlush(); + assertEquals(cfs.getSSTables().size(), 2); + + Map<String, String> options = new HashMap<>(); + + options.put(DateTieredCompactionStrategyOptions.BASE_TIME_KEY, "30"); + options.put(DateTieredCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS"); + options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, Double.toString((1d / (24 * 60 * 60)))); + DateTieredCompactionStrategy dtcs = new DateTieredCompactionStrategy(cfs, options); + dtcs.startup(); + assertNull(dtcs.getNextBackgroundTask((int) (System.currentTimeMillis() / 1000))); + Thread.sleep(7000); + AbstractCompactionTask t = dtcs.getNextBackgroundTask((int) (System.currentTimeMillis()/1000)); + assertNotNull(t); + assertEquals(1, Iterables.size(t.sstables)); + SSTableReader sstable = t.sstables.iterator().next(); + assertEquals(sstable, expiredSSTable); + } + }