Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 d856d3d7b -> 27ca4915e
Do size tiered compaction in date tiered compaction windows Patch by marcuse; reviewed by Jeff Jirsa for CASSANDRA-10276 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cedcf07c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cedcf07c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cedcf07c Branch: refs/heads/cassandra-3.0 Commit: cedcf07c542235815c023b66f151ad8c7aa9ba9a Parents: 78810f2 Author: Marcus Eriksson <marc...@apache.org> Authored: Mon Sep 7 10:39:15 2015 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Wed Oct 28 08:40:12 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../DateTieredCompactionStrategy.java | 46 ++++++++++------ .../DateTieredCompactionStrategyTest.java | 57 ++++++++++++++------ 3 files changed, 72 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cedcf07c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2ca3b43..5b46eac 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.12 + * Do STCS in DTCS windows (CASSANDRA-10276) * Don't try to get ancestors from half-renamed sstables (CASSANDRA-10501) * Avoid repetition of JVM_OPTS in debian package (CASSANDRA-10251) * Fix potential NPE from handling result of SIM.highestSelectivityIndex (CASSANDRA-10550) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cedcf07c/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java index a8e2aff..ece596f 100644 --- 
a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java @@ -18,7 +18,6 @@ package org.apache.cassandra.db.compaction; import java.util.*; -import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; @@ -40,6 +39,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy protected volatile int estimatedRemainingTasks; private final Set<SSTableReader> sstables = new HashSet<>(); private long lastExpiredCheck; + private final SizeTieredCompactionStrategyOptions stcsOptions; public DateTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options) { @@ -54,6 +54,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy else logger.debug("Enabling tombstone compactions for DTCS"); + this.stcsOptions = new SizeTieredCompactionStrategyOptions(options); } @Override @@ -137,7 +138,8 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy cfs.getMinimumCompactionThreshold(), cfs.getMaximumCompactionThreshold(), now, - options.baseTime); + options.baseTime, + stcsOptions); if (!mostInteresting.isEmpty()) return mostInteresting; return null; @@ -328,7 +330,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy for (List<SSTableReader> bucket : tasks) { if (bucket.size() >= cfs.getMinimumCompactionThreshold()) - n += Math.ceil((double)bucket.size() / cfs.getMaximumCompactionThreshold()); + n += getSTCSBuckets(bucket, stcsOptions).size(); } estimatedRemainingTasks = n; } @@ -341,7 +343,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy * @return a bucket (list) of sstables to compact. 
*/ @VisibleForTesting - static List<SSTableReader> newestBucket(List<List<SSTableReader>> buckets, int minThreshold, int maxThreshold, long now, long baseTime) + static List<SSTableReader> newestBucket(List<List<SSTableReader>> buckets, int minThreshold, int maxThreshold, long now, long baseTime, SizeTieredCompactionStrategyOptions stcsOptions) { // If the "incoming window" has at least minThreshold SSTables, choose that one. // For any other bucket, at least 2 SSTables is enough. @@ -349,23 +351,31 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy Target incomingWindow = getInitialTarget(now, baseTime); for (List<SSTableReader> bucket : buckets) { - if (bucket.size() >= minThreshold || - (bucket.size() >= 2 && !incomingWindow.onTarget(bucket.get(0).getMinTimestamp()))) - return trimToThreshold(bucket, maxThreshold); + boolean inFirstWindow = incomingWindow.onTarget(bucket.get(0).getMinTimestamp()); + if (bucket.size() >= minThreshold || (bucket.size() >= 2 && !inFirstWindow)) + { + List<SSTableReader> stcsSSTables = getSSTablesForSTCS(bucket, inFirstWindow ? minThreshold : 2, maxThreshold, stcsOptions); + if (!stcsSSTables.isEmpty()) + return stcsSSTables; + } } return Collections.emptyList(); } - /** - * @param bucket list of sstables, ordered from newest to oldest by getMinTimestamp(). - * @param maxThreshold maximum number of sstables in a single compaction task. - * @return A bucket trimmed to the <code>maxThreshold</code> newest sstables. 
- */ - @VisibleForTesting - static List<SSTableReader> trimToThreshold(List<SSTableReader> bucket, int maxThreshold) + private static List<SSTableReader> getSSTablesForSTCS(Collection<SSTableReader> sstables, int minThreshold, int maxThreshold, SizeTieredCompactionStrategyOptions stcsOptions) { - // Trim the oldest sstables off the end to meet the maxThreshold - return bucket.subList(0, Math.min(bucket.size(), maxThreshold)); + List<SSTableReader> s = SizeTieredCompactionStrategy.mostInterestingBucket(getSTCSBuckets(sstables, stcsOptions), minThreshold, maxThreshold); + logger.debug("Got sstables {} for STCS from {}", s, sstables); + return s; + } + + private static List<List<SSTableReader>> getSTCSBuckets(Collection<SSTableReader> sstables, SizeTieredCompactionStrategyOptions stcsOptions) + { + List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(AbstractCompactionStrategy.filterSuspectSSTables(sstables)); + return SizeTieredCompactionStrategy.getBuckets(pairs, + stcsOptions.bucketHigh, + stcsOptions.bucketLow, + stcsOptions.minSSTableSize); } @Override @@ -375,7 +385,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy if (sstables == null) return null; - return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, sstables, gcBefore, false)); + return Collections.<AbstractCompactionTask>singleton(new CompactionTask(cfs, sstables, gcBefore, false)); } @Override @@ -411,6 +421,8 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy uncheckedOptions.remove(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD); uncheckedOptions.remove(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD); + uncheckedOptions = SizeTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions); + return uncheckedOptions; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cedcf07c/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java index f05bf44..368101d 100644 --- a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java @@ -39,7 +39,6 @@ import org.apache.cassandra.utils.Pair; import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.getBuckets; import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.newestBucket; -import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.trimToThreshold; import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.filterOldSSTables; import static org.apache.cassandra.db.compaction.DateTieredCompactionStrategy.validateOptions; @@ -213,27 +212,15 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables()); - List<SSTableReader> newBucket = newestBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32, 9, 10); + List<SSTableReader> newBucket = newestBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32, 9, 10, new SizeTieredCompactionStrategyOptions()); assertTrue("incoming bucket should not be accepted when it has below the min threshold SSTables", newBucket.isEmpty()); - newBucket = newestBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32, 10, 10); + newBucket = newestBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32, 10, 10, new SizeTieredCompactionStrategyOptions()); assertFalse("non-incoming bucket should be accepted when it has at least 2 SSTables", newBucket.isEmpty()); assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(0).getMinTimestamp(), 
sstrs.get(0).getMaxTimestamp()); assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(1).getMinTimestamp(), sstrs.get(1).getMaxTimestamp()); assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(2).getMinTimestamp(), sstrs.get(2).getMaxTimestamp()); - - // if we have more than the max threshold, the oldest should be dropped - Collections.sort(sstrs, Collections.reverseOrder(new Comparator<SSTableReader>() { - public int compare(SSTableReader o1, SSTableReader o2) { - return Long.compare(o1.getMinTimestamp(), o2.getMinTimestamp()) ; - } - })); - - List<SSTableReader> bucket = trimToThreshold(sstrs, 2); - assertEquals("one bucket should have been dropped", 2, bucket.size()); - for (SSTableReader sstr : bucket) - assertFalse("the oldest sstable should be dropped", sstr.getMinTimestamp() == 0); } @Test @@ -320,4 +307,44 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader cfs.getDataTracker().unmarkCompacting(cfs.getSSTables()); } + @Test + public void testSTCSBigWindow() + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + ByteBuffer bigValue = ByteBuffer.wrap(new byte[10000]); + ByteBuffer value = ByteBuffer.wrap(new byte[100]); + int numSSTables = 40; + // create big sstabels out of half: + long timestamp = System.currentTimeMillis(); + for (int r = 0; r < numSSTables / 2; r++) + { + for (int i = 0; i < 10; i++) + { + DecoratedKey key = Util.dk(String.valueOf(r)); + Mutation rm = new Mutation(KEYSPACE1, key.getKey()); + rm.add(CF_STANDARD1, Util.cellname("column"), bigValue, timestamp); + rm.apply(); + } + cfs.forceBlockingFlush(); + } + // and small ones: + for (int r = 0; r < numSSTables / 2; r++) + { + DecoratedKey key = Util.dk(String.valueOf(r)); + Mutation rm = new Mutation(KEYSPACE1, key.getKey()); + rm.add(CF_STANDARD1, 
Util.cellname("column"), value, timestamp); + rm.apply(); + cfs.forceBlockingFlush(); + } + Map<String, String> options = new HashMap<>(); + options.put(SizeTieredCompactionStrategyOptions.MIN_SSTABLE_SIZE_KEY, "1"); + DateTieredCompactionStrategy dtcs = new DateTieredCompactionStrategy(cfs, options); + for (SSTableReader sstable : cfs.getSSTables()) + dtcs.addSSTable(sstable); + assertEquals(20, dtcs.getNextBackgroundTask(0).sstables.size()); + } + }