Repository: cassandra Updated Branches: refs/heads/cassandra-2.2 95feab663 -> 073f06262
Limit size of windows with DTCS Patch by marcuse; reviewed by Branimir Lambov for CASSANDRA-10280 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/99617a52 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/99617a52 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/99617a52 Branch: refs/heads/cassandra-2.2 Commit: 99617a529378f00cb86ab733959c7be9966860c9 Parents: 4f2337f Author: Marcus Eriksson <marc...@apache.org> Authored: Tue Sep 8 13:50:16 2015 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Fri Nov 20 10:59:46 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + pylib/cqlshlib/cql3handling.py | 1 + .../DateTieredCompactionStrategy.java | 33 +++++++++-------- .../DateTieredCompactionStrategyOptions.java | 30 ++++++++++++++-- .../DateTieredCompactionStrategyTest.java | 38 +++++++++++++------- 5 files changed, 73 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/99617a52/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 94a9ae2..66423c7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.12 + * Limit window size in DTCS (CASSANDRA-10280) * sstableloader does not use MAX_HEAP_SIZE env parameter (CASSANDRA-10188) * (cqlsh) Improve COPY TO performance and error handling (CASSANDRA-9304) * Don't remove level info when running upgradesstables (CASSANDRA-10692) http://git-wip-us.apache.org/repos/asf/cassandra/blob/99617a52/pylib/cqlshlib/cql3handling.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py index 49970e4..38f118f 100644 --- a/pylib/cqlshlib/cql3handling.py +++ b/pylib/cqlshlib/cql3handling.py @@ -472,6 +472,7 @@ def cf_prop_val_mapkey_completer(ctxt, cass): opts.add('max_sstable_age_days') opts.add('timestamp_resolution') opts.add('min_threshold') + opts.add('max_window_size_seconds') return map(escape_value, opts) return () http://git-wip-us.apache.org/repos/asf/cassandra/blob/99617a52/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java index ece596f..ae684ec 100644 --- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java @@ -131,7 +131,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy { Iterable<SSTableReader> candidates = filterOldSSTables(Lists.newArrayList(candidateSSTables), options.maxSSTableAge, now); - List<List<SSTableReader>> buckets = getBuckets(createSSTableAndMinTimestampPairs(candidates), options.baseTime, base, now); + List<List<SSTableReader>> buckets = getBuckets(createSSTableAndMinTimestampPairs(candidates), options.baseTime, base, now, options.maxWindowSize); logger.debug("Compaction buckets are {}", buckets); updateEstimatedCompactionsByTasks(buckets); List<SSTableReader> mostInteresting = newestBucket(buckets, @@ -139,6 +139,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy cfs.getMaximumCompactionThreshold(), now, options.baseTime, + options.maxWindowSize, stcsOptions); if (!mostInteresting.isEmpty()) return mostInteresting; @@ -217,10 +218,13 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy // A timestamp t hits the target iff t / size == divPosition. public final long divPosition; - public Target(long size, long divPosition) + public final long maxWindowSize; + + public Target(long size, long divPosition, long maxWindowSize) { this.size = size; this.divPosition = divPosition; + this.maxWindowSize = maxWindowSize; } /** @@ -250,10 +254,10 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy */ public Target nextTarget(int base) { - if (divPosition % base > 0) - return new Target(size, divPosition - 1); + if (divPosition % base > 0 || size * base > maxWindowSize) + return new Target(size, divPosition - 1, maxWindowSize); else - return new Target(size * base, divPosition / base - 1); + return new Target(size * base, divPosition / base - 1, maxWindowSize); } } @@ -270,7 +274,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy * Each bucket is also a list of files ordered from newest to oldest. */ @VisibleForTesting - static <T> List<List<T>> getBuckets(Collection<Pair<T, Long>> files, long timeUnit, int base, long now) + static <T> List<List<T>> getBuckets(Collection<Pair<T, Long>> files, long timeUnit, int base, long now, long maxWindowSize) { // Sort files by age. Newest first. final List<Pair<T, Long>> sortedFiles = Lists.newArrayList(files); @@ -283,7 +287,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy })); List<List<T>> buckets = Lists.newArrayList(); - Target target = getInitialTarget(now, timeUnit); + Target target = getInitialTarget(now, timeUnit, maxWindowSize); PeekingIterator<Pair<T, Long>> it = Iterators.peekingIterator(sortedFiles.iterator()); outerLoop: @@ -302,7 +306,6 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy else // If the file is too old for the target, switch targets. target = target.nextTarget(base); } - List<T> bucket = Lists.newArrayList(); while (target.onTarget(it.peek().right)) { @@ -318,9 +321,9 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy } @VisibleForTesting - static Target getInitialTarget(long now, long timeUnit) + static Target getInitialTarget(long now, long timeUnit, long maxWindowSize) { - return new Target(timeUnit, now / timeUnit); + return new Target(timeUnit, now / timeUnit, maxWindowSize); } @@ -329,8 +332,9 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy int n = 0; for (List<SSTableReader> bucket : tasks) { - if (bucket.size() >= cfs.getMinimumCompactionThreshold()) - n += getSTCSBuckets(bucket, stcsOptions).size(); + for (List<SSTableReader> stcsBucket : getSTCSBuckets(bucket, stcsOptions)) + if (stcsBucket.size() >= cfs.getMinimumCompactionThreshold()) + n += Math.ceil((double)stcsBucket.size() / cfs.getMaximumCompactionThreshold()); } estimatedRemainingTasks = n; } @@ -343,12 +347,12 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy * @return a bucket (list) of sstables to compact. */ @VisibleForTesting - static List<SSTableReader> newestBucket(List<List<SSTableReader>> buckets, int minThreshold, int maxThreshold, long now, long baseTime, SizeTieredCompactionStrategyOptions stcsOptions) + static List<SSTableReader> newestBucket(List<List<SSTableReader>> buckets, int minThreshold, int maxThreshold, long now, long baseTime, long maxWindowSize, SizeTieredCompactionStrategyOptions stcsOptions) { // If the "incoming window" has at least minThreshold SSTables, choose that one. // For any other bucket, at least 2 SSTables is enough. // In any case, limit to maxThreshold SSTables. - Target incomingWindow = getInitialTarget(now, baseTime); + Target incomingWindow = getInitialTarget(now, baseTime, maxWindowSize); for (List<SSTableReader> bucket : buckets) { boolean inFirstWindow = incomingWindow.onTarget(bucket.get(0).getMinTimestamp()); @@ -412,7 +416,6 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy return Long.MAX_VALUE; } - public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException { Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options); http://git-wip-us.apache.org/repos/asf/cassandra/blob/99617a52/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java index 0cbf90e..5803115 100644 --- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java @@ -25,17 +25,24 @@ import org.apache.cassandra.exceptions.ConfigurationException; public final class DateTieredCompactionStrategyOptions { protected static final TimeUnit DEFAULT_TIMESTAMP_RESOLUTION = TimeUnit.MICROSECONDS; - protected static final double DEFAULT_MAX_SSTABLE_AGE_DAYS = 365; + @Deprecated + protected static final double DEFAULT_MAX_SSTABLE_AGE_DAYS = 365*1000; protected static final long DEFAULT_BASE_TIME_SECONDS = 60; + protected static final long DEFAULT_MAX_WINDOW_SIZE_SECONDS = TimeUnit.SECONDS.convert(1, TimeUnit.DAYS); + protected static final int DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS = 60 * 10; protected static final String TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution"; + @Deprecated protected static final String MAX_SSTABLE_AGE_KEY = "max_sstable_age_days"; protected static final String BASE_TIME_KEY = "base_time_seconds"; protected static final String EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY = "expired_sstable_check_frequency_seconds"; + protected static final String MAX_WINDOW_SIZE_KEY = "max_window_size_seconds"; + @Deprecated protected final long maxSSTableAge; protected final long baseTime; protected final long expiredSSTableCheckFrequency; + protected final long maxWindowSize; public DateTieredCompactionStrategyOptions(Map<String, String> options) { @@ -48,13 +55,16 @@ public final class DateTieredCompactionStrategyOptions baseTime = timestampResolution.convert(optionValue == null ? DEFAULT_BASE_TIME_SECONDS : Long.parseLong(optionValue), TimeUnit.SECONDS); optionValue = options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY); expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(optionValue == null ? DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS : Long.parseLong(optionValue), TimeUnit.SECONDS); + optionValue = options.get(MAX_WINDOW_SIZE_KEY); + maxWindowSize = timestampResolution.convert(optionValue == null ? DEFAULT_MAX_WINDOW_SIZE_SECONDS : Long.parseLong(optionValue), TimeUnit.SECONDS); } public DateTieredCompactionStrategyOptions() { - maxSSTableAge = Math.round(DEFAULT_MAX_SSTABLE_AGE_DAYS * DEFAULT_TIMESTAMP_RESOLUTION.convert(1, TimeUnit.DAYS)); + maxSSTableAge = Math.round(DEFAULT_MAX_SSTABLE_AGE_DAYS * DEFAULT_TIMESTAMP_RESOLUTION.convert((long) DEFAULT_MAX_SSTABLE_AGE_DAYS, TimeUnit.DAYS)); baseTime = DEFAULT_TIMESTAMP_RESOLUTION.convert(DEFAULT_BASE_TIME_SECONDS, TimeUnit.SECONDS); expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS, TimeUnit.SECONDS); + maxWindowSize = DEFAULT_TIMESTAMP_RESOLUTION.convert(1, TimeUnit.DAYS); } public static Map<String, String> validateOptions(Map<String, String> options, Map<String, String> uncheckedOptions) throws ConfigurationException @@ -112,10 +122,26 @@ public final class DateTieredCompactionStrategyOptions throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY), e); } + optionValue = options.get(MAX_WINDOW_SIZE_KEY); + try + { + long maxWindowSize = optionValue == null ? DEFAULT_MAX_WINDOW_SIZE_SECONDS : Long.parseLong(optionValue); + if (maxWindowSize < 0) + { + throw new ConfigurationException(String.format("%s must not be negative, but was %d", MAX_WINDOW_SIZE_KEY, maxWindowSize)); + } + } + catch (NumberFormatException e) + { + throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, MAX_WINDOW_SIZE_KEY), e); + } + + uncheckedOptions.remove(MAX_SSTABLE_AGE_KEY); uncheckedOptions.remove(BASE_TIME_KEY); uncheckedOptions.remove(TIMESTAMP_RESOLUTION_KEY); uncheckedOptions.remove(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY); + uncheckedOptions.remove(MAX_WINDOW_SIZE_KEY); return uncheckedOptions; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/99617a52/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java index 368101d..5afd575 100644 --- a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java @@ -88,6 +88,17 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, "0"); } + try + { + options.put(DateTieredCompactionStrategyOptions.MAX_WINDOW_SIZE_KEY, "-1"); + validateOptions(options); + fail(String.format("Negative %s should be rejected", DateTieredCompactionStrategyOptions.MAX_WINDOW_SIZE_KEY)); + } + catch (ConfigurationException e) + { + options.put(DateTieredCompactionStrategyOptions.MAX_WINDOW_SIZE_KEY, "0"); + } + options.put("bad_option", "1.0"); unvalidated = validateOptions(options); assertTrue(unvalidated.containsKey("bad_option")); @@ -101,11 +112,11 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader options.put(DateTieredCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "SECONDS"); DateTieredCompactionStrategyOptions opts = new DateTieredCompactionStrategyOptions(options); - assertEquals(opts.maxSSTableAge, TimeUnit.SECONDS.convert(365, TimeUnit.DAYS)); + assertEquals(opts.maxSSTableAge, TimeUnit.SECONDS.convert(365*1000, TimeUnit.DAYS)); options.put(DateTieredCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS"); opts = new DateTieredCompactionStrategyOptions(options); - assertEquals(opts.maxSSTableAge, TimeUnit.MILLISECONDS.convert(365, TimeUnit.DAYS)); + assertEquals(opts.maxSSTableAge, TimeUnit.MILLISECONDS.convert(365*1000, TimeUnit.DAYS)); options.put(DateTieredCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MICROSECONDS"); options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, "10"); @@ -132,7 +143,7 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader Pair.create("a", 1L), Pair.create("b", 201L) ); - List<List<String>> buckets = getBuckets(pairs, 100L, 2, 200L); + List<List<String>> buckets = getBuckets(pairs, 100L, 2, 200L, Long.MAX_VALUE); assertEquals(2, buckets.size()); for (List<String> bucket : buckets) @@ -151,7 +162,7 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader Pair.create("b", 3899L), Pair.create("c", 3900L) ); - buckets = getBuckets(pairs, 100L, 3, 4050L); + buckets = getBuckets(pairs, 100L, 3, 4050L, Long.MAX_VALUE); // targets (divPosition, size): (40, 100), (39, 100), (12, 300), (3, 900), (0, 2700) // in other words: 0 - 2699, 2700 - 3599, 3600 - 3899, 3900 - 3999, 4000 - 4099 assertEquals(3, buckets.size()); @@ -177,7 +188,7 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader Pair.create("e", 3950L), Pair.create("too new", 4125L) ); - buckets = getBuckets(pairs, 100L, 1, 4050L); + buckets = getBuckets(pairs, 100L, 1, 4050L, Long.MAX_VALUE); assertEquals(5, buckets.size()); @@ -193,7 +204,6 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); - cfs.truncateBlocking(); cfs.disableAutoCompaction(); ByteBuffer value = ByteBuffer.wrap(new byte[100]); @@ -212,15 +222,16 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables()); - List<SSTableReader> newBucket = newestBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32, 9, 10, new SizeTieredCompactionStrategyOptions()); + List<SSTableReader> newBucket = newestBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32, 9, 10, Long.MAX_VALUE, new SizeTieredCompactionStrategyOptions()); assertTrue("incoming bucket should not be accepted when it has below the min threshold SSTables", newBucket.isEmpty()); - newBucket = newestBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32, 10, 10, new SizeTieredCompactionStrategyOptions()); + newBucket = newestBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32, 10, 10, Long.MAX_VALUE, new SizeTieredCompactionStrategyOptions()); assertFalse("non-incoming bucket should be accepted when it has at least 2 SSTables", newBucket.isEmpty()); assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(0).getMinTimestamp(), sstrs.get(0).getMaxTimestamp()); assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(1).getMinTimestamp(), sstrs.get(1).getMaxTimestamp()); assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(2).getMinTimestamp(), sstrs.get(2).getMaxTimestamp()); + cfs.truncateBlocking(); } @Test @@ -228,7 +239,6 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); - cfs.truncateBlocking(); cfs.disableAutoCompaction(); ByteBuffer value = ByteBuffer.wrap(new byte[100]); @@ -259,6 +269,7 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader filtered = filterOldSSTables(sstrs, 1, 4); assertEquals("no sstables should remain when all are too old", 0, Iterables.size(filtered)); + cfs.truncateBlocking(); } @@ -267,7 +278,6 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); - cfs.truncateBlocking(); cfs.disableAutoCompaction(); ByteBuffer value = ByteBuffer.wrap(new byte[100]); @@ -305,6 +315,7 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader SSTableReader sstable = t.sstables.iterator().next(); assertEquals(sstable, expiredSSTable); cfs.getDataTracker().unmarkCompacting(cfs.getSSTables()); + cfs.truncateBlocking(); } @Test @@ -312,7 +323,6 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); - cfs.truncateBlocking(); cfs.disableAutoCompaction(); ByteBuffer bigValue = ByteBuffer.wrap(new byte[10000]); ByteBuffer value = ByteBuffer.wrap(new byte[100]); @@ -344,7 +354,9 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader DateTieredCompactionStrategy dtcs = new DateTieredCompactionStrategy(cfs, options); for (SSTableReader sstable : cfs.getSSTables()) dtcs.addSSTable(sstable); - assertEquals(20, dtcs.getNextBackgroundTask(0).sstables.size()); + AbstractCompactionTask task = dtcs.getNextBackgroundTask(0); + assertEquals(20, task.sstables.size()); + cfs.getDataTracker().unmarkCompacting(task.sstables); + cfs.truncateBlocking(); } - }