Add TimeWindowCompactionStrategy Patch by Jeff Jirsa; reviewed by marcuse for CASSANDRA-9666
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c867f00 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c867f00 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c867f00 Branch: refs/heads/trunk Commit: 6c867f00309a61af12fa452020c45dc0f2748aa1 Parents: 040ac66 Author: Jeff Jirsa <j...@jeffjirsa.net> Authored: Tue May 24 20:21:22 2016 -0700 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue Jun 7 07:38:22 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 4 + NEWS.txt | 13 + doc/cql3/CQL.textile | 5 +- pylib/cqlshlib/cql3handling.py | 7 + pylib/cqlshlib/cqlhandling.py | 3 +- pylib/cqlshlib/test/test_cqlsh_completion.py | 11 +- .../DateTieredCompactionStrategy.java | 4 + .../TimeWindowCompactionStrategy.java | 380 +++++++++++++++++++ .../TimeWindowCompactionStrategyOptions.java | 148 ++++++++ .../db/compaction/CompactionsCQLTest.java | 13 + .../TimeWindowCompactionStrategyTest.java | 274 +++++++++++++ 11 files changed, 859 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 201a36c..cdbaebb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,7 @@ +3.0.8 + * Add TimeWindowCompactionStrategy (CASSANDRA-9666) + + 3.0.7 * Fix legacy serialization of Thrift-generated non-compound range tombstones when communicating with 2.x nodes (CASSANDRA-11930) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index ac1ef17..dbaece1 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -24,6 +24,19 @@ Upgrading value of native_transport_max_frame_size_in_mb. 
SSTables will be considered corrupt if they contain values whose size exceeds this limit. See CASSANDRA-9530 for more details. +Deprecation +----------- + - DateTieredCompactionStrategy has been deprecated - new tables should use + TimeWindowCompactionStrategy. Note that migrating an existing DTCS table to TWCS might + cause increased compaction load for a while after the migration, so make sure you run + tests before migrating. Read CASSANDRA-9666 for background on this. + +New features +------------ + - TimeWindowCompactionStrategy has been added. This has proven to be a better approach + to time series compaction and new tables should use this instead of DTCS. See + CASSANDRA-9666 for details. + 3.0.6 ===== http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/doc/cql3/CQL.textile ---------------------------------------------------------------------- diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile index 059b195..2a37452 100644 --- a/doc/cql3/CQL.textile +++ b/doc/cql3/CQL.textile @@ -346,7 +346,7 @@ Table creation supports the following other @<property>@: h4(#compactionOptions). Compaction options -The @compaction@ property must at least define the @'class'@ sub-option, that defines the compaction strategy class to use. The default supported class are @'SizeTieredCompactionStrategy'@, @'LeveledCompactionStrategy'@ and @'DateTieredCompactionStrategy'@. Custom strategy can be provided by specifying the full class name as a "string constant":#constants. The rest of the sub-options depends on the chosen class. The sub-options supported by the default classes are: +The @compaction@ property must at least define the @'class'@ sub-option, which defines the compaction strategy class to use. The default supported classes are @'SizeTieredCompactionStrategy'@, @'LeveledCompactionStrategy'@, @'DateTieredCompactionStrategy'@ and @'TimeWindowCompactionStrategy'@.
A custom strategy can be provided by specifying the full class name as a "string constant":#constants. The rest of the sub-options depend on the chosen class. The sub-options supported by the default classes are: |_. option |_. supported compaction strategy |_. default |_. description | | @enabled@ | _all_ | true | A boolean denoting whether compaction should be enabled or not.| @@ -362,6 +362,9 @@ The @compaction@ property must at least define the @'class'@ sub-option, that de | @timestamp_resolution@ | DateTieredCompactionStrategy | MICROSECONDS | The timestamp resolution used when inserting data, could be MILLISECONDS, MICROSECONDS etc (should be understandable by Java TimeUnit) - don't change this unless you do mutations with USING TIMESTAMP <non_microsecond_timestamps> (or equivalent directly in the client)| | @base_time_seconds@ | DateTieredCompactionStrategy | 60 | The base size of the time windows. | | @max_sstable_age_days@ | DateTieredCompactionStrategy | 365 | SSTables only containing data that is older than this will never be compacted. | +| @timestamp_resolution@ | TimeWindowCompactionStrategy | MICROSECONDS | The timestamp resolution used when inserting data. Must be one of SECONDS, MILLISECONDS, MICROSECONDS, NANOSECONDS - don't change this unless you do mutations with USING TIMESTAMP <non_microsecond_timestamps> (or equivalent directly in the client)| +| @compaction_window_unit@ | TimeWindowCompactionStrategy | DAYS | The Java TimeUnit used for the window size, set in conjunction with @compaction_window_size@. Must be one of DAYS, HOURS, MINUTES | +| @compaction_window_size@ | TimeWindowCompactionStrategy | 1 | The number of @compaction_window_unit@ units that make up a time window. Must be at least 1. | h4(#compressionOptions).
Compression options http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/cql3handling.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py index fd04f64..9008514 100644 --- a/pylib/cqlshlib/cql3handling.py +++ b/pylib/cqlshlib/cql3handling.py @@ -515,6 +515,13 @@ def cf_prop_val_mapkey_completer(ctxt, cass): opts.add('min_threshold') opts.add('max_window_size_seconds') opts.add('timestamp_resolution') + elif csc == 'TimeWindowCompactionStrategy': + opts.add('compaction_window_unit') + opts.add('compaction_window_size') + opts.add('min_threshold') + opts.add('max_threshold') + opts.add('timestamp_resolution') + return map(escape_value, opts) return () http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/cqlhandling.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/cqlhandling.py b/pylib/cqlshlib/cqlhandling.py index a8a0ba8..51d9726 100644 --- a/pylib/cqlshlib/cqlhandling.py +++ b/pylib/cqlshlib/cqlhandling.py @@ -35,7 +35,8 @@ class CqlParsingRuleSet(pylexotron.ParsingRuleSet): available_compaction_classes = ( 'LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', - 'DateTieredCompactionStrategy' + 'DateTieredCompactionStrategy', + 'TimeWindowCompactionStrategy' ) replication_strategies = ( http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/test/test_cqlsh_completion.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/test/test_cqlsh_completion.py b/pylib/cqlshlib/test/test_cqlsh_completion.py index 0f0cc4d..e736ea7 100644 --- a/pylib/cqlshlib/test/test_cqlsh_completion.py +++ b/pylib/cqlshlib/test/test_cqlsh_completion.py @@ -617,7 +617,8 @@ class TestCqlshCompletion(CqlshCompletionCase): + "{'class': '", choices=['SizeTieredCompactionStrategy', 'LeveledCompactionStrategy', - 
'DateTieredCompactionStrategy']) + 'DateTieredCompactionStrategy', + 'TimeWindowCompactionStrategy']) self.trycompletions(prefix + " new_table (col_a int PRIMARY KEY) WITH compaction = " + "{'class': 'S", immediate="izeTieredCompactionStrategy'") @@ -660,6 +661,14 @@ class TestCqlshCompletion(CqlshCompletionCase): 'tombstone_compaction_interval', 'tombstone_threshold', 'enabled', 'unchecked_tombstone_compaction', 'max_window_size_seconds', 'only_purge_repaired_tombstones']) + self.trycompletions(prefix + " new_table (col_a int PRIMARY KEY) WITH compaction = " + + "{'class': 'TimeWindowCompactionStrategy', '", + choices=['compaction_window_unit', 'compaction_window_size', + 'timestamp_resolution', 'min_threshold', 'class', 'max_threshold', + 'tombstone_compaction_interval', 'tombstone_threshold', + 'enabled', 'unchecked_tombstone_compaction', + 'only_purge_repaired_tombstones']) + def test_complete_in_create_columnfamily(self): self.trycompletions('CREATE C', choices=['COLUMNFAMILY', 'CUSTOM']) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java index 1addd0d..8571906 100644 --- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java @@ -35,6 +35,10 @@ import org.apache.cassandra.utils.Pair; import static com.google.common.collect.Iterables.filter; +/** + * @deprecated in favour of {@link TimeWindowCompactionStrategy} + */ +@Deprecated public class DateTieredCompactionStrategy extends AbstractCompactionStrategy { private static final Logger logger = LoggerFactory.getLogger(DateTieredCompactionStrategy.class); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java new file mode 100644 index 0000000..d1630c5 --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.compaction; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.CompactionParams; +import org.apache.cassandra.utils.Pair; + +import static com.google.common.collect.Iterables.filter; + +public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy +{ + private static final Logger logger = LoggerFactory.getLogger(TimeWindowCompactionStrategy.class); + + private final TimeWindowCompactionStrategyOptions options; + protected volatile int estimatedRemainingTasks; + private final Set<SSTableReader> sstables = new HashSet<>(); + private long lastExpiredCheck; + private long highestWindowSeen; + + public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options) + { + super(cfs, options); + this.estimatedRemainingTasks = 0; + this.options = new TimeWindowCompactionStrategyOptions(options); + if (!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION)) + { + disableTombstoneCompactions = true; + logger.debug("Disabling tombstone compactions for TWCS"); + } + else + logger.debug("Enabling tombstone compactions for TWCS"); + + } + + @Override + public
AbstractCompactionTask getNextBackgroundTask(int gcBefore) + { + while (true) + { + List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore); + + if (latestBucket.isEmpty()) + return null; + + LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION); + if (modifier != null) + return new CompactionTask(cfs, modifier, gcBefore); + } + } + + /** + * Picks the sstables for the next background compaction: the best candidate window of non-expired sstables plus any fully expired sstables (the expiry check runs at most once per expired_sstable_check_frequency_seconds). + * @param gcBefore gc-before time used by the fully-expired and droppable-tombstone checks + * @return the sstables to compact, or an empty list if there is nothing to do + */ + private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore) + { + if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE))) + return Collections.emptyList(); + + Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains)); + + // Find fully expired SSTables. Those will be included no matter what. + Set<SSTableReader> expired = Collections.emptySet(); + + if (System.currentTimeMillis() - lastExpiredCheck > options.expiredSSTableCheckFrequency) + { + logger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables"); + expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingSSTables(SSTableSet.CANONICAL, uncompacting), gcBefore); + lastExpiredCheck = System.currentTimeMillis(); + } + else + { + logger.debug("TWCS skipping check for fully expired SSTables"); + } + + Set<SSTableReader> candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting)); + + List<SSTableReader> compactionCandidates = new ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), gcBefore)); + if (!expired.isEmpty()) + { + logger.debug("Including expired sstables: {}", expired); + compactionCandidates.addAll(expired); + } + + return compactionCandidates; + } + + private List<SSTableReader> getNextNonExpiredSSTables(Iterable<SSTableReader> nonExpiringSSTables, final int gcBefore) + { + List<SSTableReader> mostInteresting = getCompactionCandidates(nonExpiringSSTables); + + if (mostInteresting
!= null) + { + return mostInteresting; + } + + // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone + // ratio is greater than threshold. + List<SSTableReader> sstablesWithTombstones = new ArrayList<>(); + for (SSTableReader sstable : nonExpiringSSTables) + { + if (worthDroppingTombstones(sstable, gcBefore)) + sstablesWithTombstones.add(sstable); + } + if (sstablesWithTombstones.isEmpty()) + return Collections.emptyList(); + + return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator())); + } + + private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables) + { + Pair<HashMultimap<Long, SSTableReader>, Long> buckets = getBuckets(candidateSSTables, options.sstableWindowUnit, options.sstableWindowSize, options.timestampResolution); + // Update the highest window seen, if necessary + if(buckets.right > this.highestWindowSeen) + this.highestWindowSeen = buckets.right; + + updateEstimatedCompactionsByTasks(buckets.left); + List<SSTableReader> mostInteresting = newestBucket(buckets.left, + cfs.getMinimumCompactionThreshold(), + cfs.getMaximumCompactionThreshold(), + options.sstableWindowUnit, + options.sstableWindowSize, + options.stcsOptions, + this.highestWindowSeen); + if (!mostInteresting.isEmpty()) + return mostInteresting; + return null; + } + + @Override + public void addSSTable(SSTableReader sstable) + { + sstables.add(sstable); + } + + @Override + public void removeSSTable(SSTableReader sstable) + { + sstables.remove(sstable); + } + + /** + * Find the lower and upper bounds of the window (windowTimeSize windowTimeUnits long, aligned to the epoch) that contains timestampInMillis. + * Returns milliseconds, caller should adjust accordingly; units other than MINUTES and HOURS are treated as DAYS. + */ + public static Pair<Long,Long> getWindowBoundsInMillis(TimeUnit windowTimeUnit, int windowTimeSize, long timestampInMillis) + { + long lowerTimestamp; + long upperTimestamp; + long timestampInSeconds = TimeUnit.SECONDS.convert(timestampInMillis,
TimeUnit.MILLISECONDS); + + switch(windowTimeUnit) + { + case MINUTES: + lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (60L * windowTimeSize)); + upperTimestamp = (lowerTimestamp + (60L * (windowTimeSize - 1L))) + 59L; + break; + case HOURS: + lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (3600L * windowTimeSize)); + upperTimestamp = (lowerTimestamp + (3600L * (windowTimeSize - 1L))) + 3599L; + break; + case DAYS: + default: + lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (86400L * windowTimeSize)); + upperTimestamp = (lowerTimestamp + (86400L * (windowTimeSize - 1L))) + 86399L; + break; + } + + return Pair.create(TimeUnit.MILLISECONDS.convert(lowerTimestamp, TimeUnit.SECONDS), + TimeUnit.MILLISECONDS.convert(upperTimestamp, TimeUnit.SECONDS)); + + } + + /** + * Group files with similar max timestamp into buckets. + * + * @param files sstables to group; each is placed in the window containing its max timestamp + * @param sstableWindowUnit time unit of a window (MINUTES, HOURS or DAYS) + * @param sstableWindowSize number of sstableWindowUnit units per window + * @param timestampResolution unit of the sstables' max timestamps (asserted to be a valid timestamp unit) + * @return A pair, where the left element is the bucket representation (map of window start in milliseconds to sstablereader), and the right is the start (in milliseconds) of the newest window seen, or 0 if there were no files + */ + @VisibleForTesting + static Pair<HashMultimap<Long, SSTableReader>, Long> getBuckets(Iterable<SSTableReader> files, TimeUnit sstableWindowUnit, int sstableWindowSize, TimeUnit timestampResolution) + { + HashMultimap<Long, SSTableReader> buckets = HashMultimap.create(); + + long maxTimestamp = 0; + // Create hash map to represent buckets + // For each sstable, add sstable to the time bucket + // Where the bucket is the file's max timestamp rounded down to the start of its window + for (SSTableReader f : files) + { + assert TimeWindowCompactionStrategyOptions.validTimestampTimeUnits.contains(timestampResolution); + long tStamp = TimeUnit.MILLISECONDS.convert(f.getMaxTimestamp(), timestampResolution); + Pair<Long,Long> bounds = getWindowBoundsInMillis(sstableWindowUnit, sstableWindowSize, tStamp); +
buckets.put(bounds.left, f); + if (bounds.left > maxTimestamp) + maxTimestamp = bounds.left; + } + + logger.trace("buckets {}, max timestamp {}", buckets, maxTimestamp); + return Pair.create(buckets, maxTimestamp); + } + + private void updateEstimatedCompactionsByTasks(HashMultimap<Long, SSTableReader> tasks) + { + int n = 0; + long now = this.highestWindowSeen; + + for(Long key : tasks.keySet()) + { + // For current window, make sure it's compactable + if (key.compareTo(now) >= 0 && tasks.get(key).size() >= cfs.getMinimumCompactionThreshold()) + n++; + else if (key.compareTo(now) < 0 && tasks.get(key).size() >= 2) + n++; + } + this.estimatedRemainingTasks = n; + } + + + /** + * @param buckets map from window start (ms) to the sstables in that window; windows are examined from newest to oldest and the first one that qualifies is returned. + * @param minThreshold minimum number of sstables the current window (key >= now) needs to qualify; older windows need only 2. + * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this). + * @return a bucket (list) of sstables to compact, or an empty list if no window qualifies. + */ + @VisibleForTesting + static List<SSTableReader> newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, TimeUnit sstableWindowUnit, int sstableWindowSize, SizeTieredCompactionStrategyOptions stcsOptions, long now) + { + // If the current bucket has at least minThreshold SSTables, choose that one. + // For any other bucket, at least 2 SSTables is enough. + // In any case, limit to maxThreshold SSTables.
+ + TreeSet<Long> allKeys = new TreeSet<>(buckets.keySet()); + + Iterator<Long> it = allKeys.descendingIterator(); + while(it.hasNext()) + { + Long key = it.next(); + Set<SSTableReader> bucket = buckets.get(key); + logger.trace("Key {}, now {}", key, now); + if (bucket.size() >= minThreshold && key >= now) + { + // If we're in the newest bucket, we'll use STCS to prioritize sstables + List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(bucket); + List<List<SSTableReader>> stcsBuckets = SizeTieredCompactionStrategy.getBuckets(pairs, stcsOptions.bucketHigh, stcsOptions.bucketLow, stcsOptions.minSSTableSize); + logger.debug("Using STCS compaction for first window of bucket: data files {} , options {}", pairs, stcsOptions); + List<SSTableReader> stcsInterestingBucket = SizeTieredCompactionStrategy.mostInterestingBucket(stcsBuckets, minThreshold, maxThreshold); + + // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets + if (!stcsInterestingBucket.isEmpty()) + return stcsInterestingBucket; + } + else if (bucket.size() >= 2 && key < now) + { + logger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here: {}", bucket.size(), bucket); + return trimToThreshold(bucket, maxThreshold); + } + else + { + logger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now); + } + } + return Collections.<SSTableReader>emptyList(); + } + + /** + * @param bucket set of sstables + * @param maxThreshold maximum number of sstables in a single compaction task. + * @return A bucket trimmed to the maxThreshold smallest sstables.
+ */ + @VisibleForTesting + static List<SSTableReader> trimToThreshold(Set<SSTableReader> bucket, int maxThreshold) + { + List<SSTableReader> ssTableReaders = new ArrayList<>(bucket); + + // Trim the largest sstables off the end to meet the maxThreshold + Collections.sort(ssTableReaders, new SSTableReader.SizeComparator()); + + return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold)); + } + + @Override + public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput) + { + Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables); + if (Iterables.isEmpty(filteredSSTables)) + return null; + LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION); + if (txn == null) + return null; + return Collections.singleton(new CompactionTask(cfs, txn, gcBefore)); + } + + @Override + public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore) + { + assert !sstables.isEmpty(); // checked for by CM.submitUserDefined + + LifecycleTransaction modifier = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION); + if (modifier == null) + { + logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first.
You can disable background compactions temporarily if this is a problem", sstables); + return null; + } + + return new CompactionTask(cfs, modifier, gcBefore).setUserDefined(true); + } + + public int getEstimatedRemainingTasks() + { + return this.estimatedRemainingTasks; + } + + public long getMaxSSTableBytes() + { + return Long.MAX_VALUE; + } + + + public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException + { + Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options); + uncheckedOptions = TimeWindowCompactionStrategyOptions.validateOptions(options, uncheckedOptions); + + uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString()); + uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString()); + + return uncheckedOptions; + } + + public String toString() + { + return String.format("TimeWindowCompactionStrategy[%s/%s]", + cfs.getMinimumCompactionThreshold(), + cfs.getMaximumCompactionThreshold()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java new file mode 100644 index 0000000..bcbdab6 --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import com.google.common.collect.ImmutableList; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.exceptions.ConfigurationException; + +public final class TimeWindowCompactionStrategyOptions +{ + private static final Logger logger = LoggerFactory.getLogger(TimeWindowCompactionStrategyOptions.class); + + protected static final TimeUnit DEFAULT_TIMESTAMP_RESOLUTION = TimeUnit.MICROSECONDS; + protected static final TimeUnit DEFAULT_COMPACTION_WINDOW_UNIT = TimeUnit.DAYS; + protected static final int DEFAULT_COMPACTION_WINDOW_SIZE = 1; + protected static final int DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS = 60 * 10; + + protected static final String TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution"; + protected static final String COMPACTION_WINDOW_UNIT_KEY = "compaction_window_unit"; + protected static final String COMPACTION_WINDOW_SIZE_KEY = "compaction_window_size"; + protected static final String EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY = "expired_sstable_check_frequency_seconds"; + + protected final int sstableWindowSize; + protected final TimeUnit sstableWindowUnit; + protected final TimeUnit timestampResolution; + protected final long expiredSSTableCheckFrequency; + + SizeTieredCompactionStrategyOptions stcsOptions; + + protected final static ImmutableList<TimeUnit> validTimestampTimeUnits = ImmutableList.of(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, TimeUnit.MICROSECONDS,
TimeUnit.NANOSECONDS); + protected final static ImmutableList<TimeUnit> validWindowTimeUnits = ImmutableList.of(TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS); + + public TimeWindowCompactionStrategyOptions(Map<String, String> options) + { + String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY); + timestampResolution = optionValue == null ? DEFAULT_TIMESTAMP_RESOLUTION : TimeUnit.valueOf(optionValue); + if (timestampResolution != DEFAULT_TIMESTAMP_RESOLUTION) + logger.warn("Using a non-default timestamp_resolution {} - are you really doing inserts with USING TIMESTAMP <non_microsecond_timestamp> (or driver equivalent)?", timestampResolution.toString()); + + optionValue = options.get(COMPACTION_WINDOW_UNIT_KEY); + sstableWindowUnit = optionValue == null ? DEFAULT_COMPACTION_WINDOW_UNIT : TimeUnit.valueOf(optionValue); + + optionValue = options.get(COMPACTION_WINDOW_SIZE_KEY); + sstableWindowSize = optionValue == null ? DEFAULT_COMPACTION_WINDOW_SIZE : Integer.parseInt(optionValue); + + optionValue = options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY); + expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(optionValue == null ?
DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS : Long.parseLong(optionValue), TimeUnit.SECONDS); + + stcsOptions = new SizeTieredCompactionStrategyOptions(options); + } + + public TimeWindowCompactionStrategyOptions() + { + sstableWindowUnit = DEFAULT_COMPACTION_WINDOW_UNIT; + timestampResolution = DEFAULT_TIMESTAMP_RESOLUTION; + sstableWindowSize = DEFAULT_COMPACTION_WINDOW_SIZE; + expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS, TimeUnit.SECONDS); + stcsOptions = new SizeTieredCompactionStrategyOptions(); + } + + public static Map<String, String> validateOptions(Map<String, String> options, Map<String, String> uncheckedOptions) throws ConfigurationException + { + String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY); + try + { + if (optionValue != null) + if (!validTimestampTimeUnits.contains(TimeUnit.valueOf(optionValue))) + throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, TIMESTAMP_RESOLUTION_KEY)); + } + catch (IllegalArgumentException e) + { + throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, TIMESTAMP_RESOLUTION_KEY), e); + } + + + optionValue = options.get(COMPACTION_WINDOW_UNIT_KEY); + try + { + if (optionValue != null) + if (!validWindowTimeUnits.contains(TimeUnit.valueOf(optionValue))) + throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, COMPACTION_WINDOW_UNIT_KEY)); + + } + catch (IllegalArgumentException e) + { + throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, COMPACTION_WINDOW_UNIT_KEY), e); + } + + optionValue = options.get(COMPACTION_WINDOW_SIZE_KEY); + try + { + int sstableWindowSize = optionValue == null ?
DEFAULT_COMPACTION_WINDOW_SIZE : Integer.parseInt(optionValue); + if (sstableWindowSize < 1) + { + throw new ConfigurationException(String.format("%s must be greater than or equal to 1, but was %d", COMPACTION_WINDOW_SIZE_KEY, sstableWindowSize)); + } + } + catch (NumberFormatException e) + { + throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, COMPACTION_WINDOW_SIZE_KEY), e); + } + + optionValue = options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY); + try + { + long expiredCheckFrequency = optionValue == null ? DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS : Long.parseLong(optionValue); + if (expiredCheckFrequency < 0) + { + throw new ConfigurationException(String.format("%s must not be negative, but was %d", EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, expiredCheckFrequency)); + } + } + catch (NumberFormatException e) + { + throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY), e); + } + + uncheckedOptions.remove(COMPACTION_WINDOW_SIZE_KEY); + uncheckedOptions.remove(COMPACTION_WINDOW_UNIT_KEY); + uncheckedOptions.remove(TIMESTAMP_RESOLUTION_KEY); + uncheckedOptions.remove(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY); + + uncheckedOptions = SizeTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions); + + return uncheckedOptions; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java index 5d42aae..afbfee1 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java @@ -72,6 +72,19 @@ public class
CompactionsCQLTest extends CQLTester } @Test + public void testTriggerMinorCompactionTWCS() throws Throwable + { + createTable("CREATE TABLE %s (id text PRIMARY KEY) WITH compaction = {'class':'TimeWindowCompactionStrategy', 'min_threshold':2};"); + assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled()); + execute("insert into %s (id) values ('1')"); + flush(); + execute("insert into %s (id) values ('1')"); + flush(); + waitForMinor(KEYSPACE, currentTable(), SLEEP_TIME, true); + } + + + @Test public void testTriggerNoMinorCompactionSTCSDisabled() throws Throwable { createTable("CREATE TABLE %s (id text PRIMARY KEY) WITH compaction = {'class':'SizeTieredCompactionStrategy', 'min_threshold':2, 'enabled':false};"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java new file mode 100644 index 0000000..3238170 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.compaction; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + + +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.Pair; + +import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.getWindowBoundsInMillis; +import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.newestBucket; +import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.validateOptions; + +public class TimeWindowCompactionStrategyTest extends SchemaLoader +{ + public static final String KEYSPACE1 = 
"Keyspace1"; + private static final String CF_STANDARD1 = "Standard1"; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1)); + } + + @Test + public void testOptionsValidation() throws ConfigurationException + { + Map<String, String> options = new HashMap<>(); + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30"); + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MINUTES"); + Map<String, String> unvalidated = validateOptions(options); + assertTrue(unvalidated.isEmpty()); + + try + { + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "0"); + validateOptions(options); + fail(String.format("%s == 0 should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY)); + } + catch (ConfigurationException e) {} + + try + { + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "-1337"); + validateOptions(options); + fail(String.format("Negative %s should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY)); + } + catch (ConfigurationException e) + { + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "1"); + } + + try + { + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MONTHS"); + validateOptions(options); + fail(String.format("Invalid time units should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY)); + } + catch (ConfigurationException e) + { + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MINUTES"); + } + + options.put("bad_option", "1.0"); + unvalidated = validateOptions(options); + assertTrue(unvalidated.containsKey("bad_option")); + } + + + @Test + public void testTimeWindows() + { + Long tstamp1 = 1451001601000L; 
// 2015-12-25 @ 00:00:01, in milliseconds + Long tstamp2 = 1451088001000L; // 2015-12-26 @ 00:00:01, in milliseconds + Long lowHour = 1451001600000L; // 2015-12-25 @ 00:00:00, in milliseconds + + // A 1 hour window should round down to the beginning of the hour + assertTrue(getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp1).left.compareTo(lowHour) == 0); + + // A 1 minute window should round down to the beginning of the hour + assertTrue(getWindowBoundsInMillis(TimeUnit.MINUTES, 1, tstamp1).left.compareTo(lowHour) == 0); + + // A 1 day window should round down to the beginning of the hour + assertTrue(getWindowBoundsInMillis(TimeUnit.DAYS, 1, tstamp1).left.compareTo(lowHour) == 0 ); + + // The 2 day window of 2015-12-25 + 2015-12-26 should round down to the beginning of 2015-12-25 + assertTrue(getWindowBoundsInMillis(TimeUnit.DAYS, 2, tstamp2).left.compareTo(lowHour) == 0); + + + return; + } + + @Test + public void testPrepBucket() + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + ByteBuffer value = ByteBuffer.wrap(new byte[100]); + Long tstamp = System.currentTimeMillis(); + Long tstamp2 = tstamp - (2L * 3600L * 1000L); + + // create 5 sstables + for (int r = 0; r < 3; r++) + { + DecoratedKey key = Util.dk(String.valueOf(r)); + new RowUpdateBuilder(cfs.metadata, r, key.getKey()) + .clustering("column") + .add("val", value).build().applyUnsafe(); + + cfs.forceBlockingFlush(); + } + // Decrement the timestamp to simulate a timestamp in the past hour + for (int r = 3; r < 5; r++) + { + // And add progressively more cells into each sstable + DecoratedKey key = Util.dk(String.valueOf(r)); + new RowUpdateBuilder(cfs.metadata, r, key.getKey()) + .clustering("column") + .add("val", value).build().applyUnsafe(); + cfs.forceBlockingFlush(); + } + + cfs.forceBlockingFlush(); + + HashMultimap<Long, SSTableReader> buckets = 
HashMultimap.create(); + List<SSTableReader> sstrs = new ArrayList<>(cfs.getLiveSSTables()); + + // We'll put 3 sstables into the newest bucket + for (int i = 0 ; i < 3; i++) + { + Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp ); + buckets.put(bounds.left, sstrs.get(i)); + } + List<SSTableReader> newBucket = newestBucket(buckets, 4, 32, TimeUnit.HOURS, 1, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left ); + assertTrue("incoming bucket should not be accepted when it has below the min threshold SSTables", newBucket.isEmpty()); + + newBucket = newestBucket(buckets, 2, 32, TimeUnit.HOURS, 1, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left); + assertTrue("incoming bucket should be accepted when it is larger than the min threshold SSTables", !newBucket.isEmpty()); + + // And 2 into the second bucket (1 hour back) + for (int i = 3 ; i < 5; i++) + { + Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp2 ); + buckets.put(bounds.left, sstrs.get(i)); + } + + assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(0).getMinTimestamp(), sstrs.get(0).getMaxTimestamp()); + assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(1).getMinTimestamp(), sstrs.get(1).getMaxTimestamp()); + assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(2).getMinTimestamp(), sstrs.get(2).getMaxTimestamp()); + + // Test trim + int numSSTables = 40; + for (int r = 5; r < numSSTables; r++) + { + DecoratedKey key = Util.dk(String.valueOf(r)); + for(int i = 0 ; i < r ; i++) + { + new RowUpdateBuilder(cfs.metadata, tstamp + r, key.getKey()) + .clustering("column") + .add("val", value).build().applyUnsafe(); + } + cfs.forceBlockingFlush(); + } + + // Reset the buckets, overfill it now + sstrs = new 
ArrayList<>(cfs.getLiveSSTables()); + for (int i = 0 ; i < 40; i++) + { + Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, sstrs.get(i).getMaxTimestamp()); + buckets.put(bounds.left, sstrs.get(i)); + } + + newBucket = newestBucket(buckets, 4, 32, TimeUnit.DAYS, 1, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left); + assertEquals("new bucket should be trimmed to max threshold of 32", newBucket.size(), 32); + } + + + @Test + public void testDropExpiredSSTables() throws InterruptedException + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + ByteBuffer value = ByteBuffer.wrap(new byte[100]); + + // create 2 sstables + DecoratedKey key = Util.dk(String.valueOf("expired")); + new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), 1, key.getKey()) + .clustering("column") + .add("val", value).build().applyUnsafe(); + + cfs.forceBlockingFlush(); + SSTableReader expiredSSTable = cfs.getLiveSSTables().iterator().next(); + Thread.sleep(10); + + key = Util.dk(String.valueOf("nonexpired")); + new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), key.getKey()) + .clustering("column") + .add("val", value).build().applyUnsafe(); + + cfs.forceBlockingFlush(); + assertEquals(cfs.getLiveSSTables().size(), 2); + + Map<String, String> options = new HashMap<>(); + + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30"); + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "SECONDS"); + options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS"); + options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, "0"); + TimeWindowCompactionStrategy twcs = new TimeWindowCompactionStrategy(cfs, options); + for (SSTableReader sstable : 
cfs.getLiveSSTables()) + twcs.addSSTable(sstable); + twcs.startup(); + assertNull(twcs.getNextBackgroundTask((int) (System.currentTimeMillis() / 1000))); + Thread.sleep(2000); + AbstractCompactionTask t = twcs.getNextBackgroundTask((int) (System.currentTimeMillis()/1000)); + assertNotNull(t); + assertEquals(1, Iterables.size(t.transaction.originals())); + SSTableReader sstable = t.transaction.originals().iterator().next(); + assertEquals(sstable, expiredSSTable); + t.transaction.abort(); + } + +}