Add TimeWindowCompactionStrategy

Patch by Jeff Jirsa; reviewed by marcuse for CASSANDRA-9666


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c867f00
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c867f00
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c867f00

Branch: refs/heads/trunk
Commit: 6c867f00309a61af12fa452020c45dc0f2748aa1
Parents: 040ac66
Author: Jeff Jirsa <j...@jeffjirsa.net>
Authored: Tue May 24 20:21:22 2016 -0700
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Tue Jun 7 07:38:22 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +
 NEWS.txt                                        |  13 +
 doc/cql3/CQL.textile                            |   5 +-
 pylib/cqlshlib/cql3handling.py                  |   7 +
 pylib/cqlshlib/cqlhandling.py                   |   3 +-
 pylib/cqlshlib/test/test_cqlsh_completion.py    |  11 +-
 .../DateTieredCompactionStrategy.java           |   4 +
 .../TimeWindowCompactionStrategy.java           | 380 +++++++++++++++++++
 .../TimeWindowCompactionStrategyOptions.java    | 148 ++++++++
 .../db/compaction/CompactionsCQLTest.java       |  13 +
 .../TimeWindowCompactionStrategyTest.java       | 274 +++++++++++++
 11 files changed, 859 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 201a36c..cdbaebb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+3.0.8
+ * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
+
+
 3.0.7
  * Fix legacy serialization of Thrift-generated non-compound range tombstones
    when communicating with 2.x nodes (CASSANDRA-11930)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index ac1ef17..dbaece1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -24,6 +24,19 @@ Upgrading
      value of native_transport_max_frame_size_in_mb. SSTables will be 
considered corrupt if
      they contain values whose size exceeds this limit. See CASSANDRA-9530 for 
more details.
 
+Deprecation
+-----------
+   - DateTieredCompactionStrategy has been deprecated - new tables should use
+     TimeWindowCompactionStrategy. Note that migrating an existing DTCS-table 
to TWCS might
+     cause increased compaction load for a while after the migration so make 
sure you run
+     tests before migrating. Read CASSANDRA-9666 for background on this.
+
+New features
+------------
+   - TimeWindowCompactionStrategy has been added. This has proven to be a 
better approach
+     to time series compaction and new tables should use this instead of DTCS. 
See
+     CASSANDRA-9666 for details.
+
 3.0.6
 =====
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/doc/cql3/CQL.textile
----------------------------------------------------------------------
diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile
index 059b195..2a37452 100644
--- a/doc/cql3/CQL.textile
+++ b/doc/cql3/CQL.textile
@@ -346,7 +346,7 @@ Table creation supports the following other @<property>@:
 
 h4(#compactionOptions). Compaction options
 
-The @compaction@ property must at least define the @'class'@ sub-option, that 
defines the compaction strategy class to use. The default supported class are 
@'SizeTieredCompactionStrategy'@, @'LeveledCompactionStrategy'@ and 
@'DateTieredCompactionStrategy'@. Custom strategy can be provided by specifying 
the full class name as a "string constant":#constants. The rest of the 
sub-options depends on the chosen class. The sub-options supported by the 
default classes are:
+The @compaction@ property must at least define the @'class'@ sub-option, that 
defines the compaction strategy class to use. The default supported class are 
@'SizeTieredCompactionStrategy'@, @'LeveledCompactionStrategy'@, 
@'DateTieredCompactionStrategy'@ and @'TimeWindowCompactionStrategy'@. Custom 
strategy can be provided by specifying the full class name as a "string 
constant":#constants. The rest of the sub-options depends on the chosen class. 
The sub-options supported by the default classes are:
 
 |_. option                         |_. supported compaction strategy |_. 
default    |_. description |
 | @enabled@                        | _all_                           | true    
     | A boolean denoting whether compaction should be enabled or not.|
@@ -362,6 +362,9 @@ The @compaction@ property must at least define the 
@'class'@ sub-option, that de
 | @timestamp_resolution@           | DateTieredCompactionStrategy    | 
MICROSECONDS | The timestamp resolution used when inserting data, could be 
MILLISECONDS, MICROSECONDS etc (should be understandable by Java TimeUnit) - 
don't change this unless you do mutations with USING TIMESTAMP 
<non_microsecond_timestamps> (or equivalent directly in the client)|
 | @base_time_seconds@              | DateTieredCompactionStrategy    | 60      
     | The base size of the time windows. |
 | @max_sstable_age_days@           | DateTieredCompactionStrategy    | 365     
     | SSTables only containing data that is older than this will never be 
compacted. |
+| @timestamp_resolution@           | TimeWindowCompactionStrategy    | 
MICROSECONDS | The timestamp resolution used when inserting data, could be 
MILLISECONDS, MICROSECONDS etc (should be understandable by Java TimeUnit) - 
don't change this unless you do mutations with USING TIMESTAMP 
<non_microsecond_timestamps> (or equivalent directly in the client)|
+| @compaction_window_unit@         | TimeWindowCompactionStrategy    | DAYS    
     | The Java TimeUnit used for the window size, set in conjunction with 
@compaction_window_size@. Must be one of DAYS, HOURS, MINUTES |
+| @compaction_window_size@         | TimeWindowCompactionStrategy    | 1       
     | The number of @compaction_window_unit@ units that make up a time window. 
|
 
 
 h4(#compressionOptions). Compression options

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index fd04f64..9008514 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -515,6 +515,13 @@ def cf_prop_val_mapkey_completer(ctxt, cass):
             opts.add('min_threshold')
             opts.add('max_window_size_seconds')
             opts.add('timestamp_resolution')
+        elif csc == 'TimeWindowCompactionStrategy':
+            opts.add('compaction_window_unit')
+            opts.add('compaction_window_size')
+            opts.add('min_threshold')
+            opts.add('max_threshold')
+            opts.add('timestamp_resolution')
+
         return map(escape_value, opts)
     return ()
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/cqlhandling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cqlhandling.py b/pylib/cqlshlib/cqlhandling.py
index a8a0ba8..51d9726 100644
--- a/pylib/cqlshlib/cqlhandling.py
+++ b/pylib/cqlshlib/cqlhandling.py
@@ -35,7 +35,8 @@ class CqlParsingRuleSet(pylexotron.ParsingRuleSet):
     available_compaction_classes = (
         'LeveledCompactionStrategy',
         'SizeTieredCompactionStrategy',
-        'DateTieredCompactionStrategy'
+        'DateTieredCompactionStrategy',
+        'TimeWindowCompactionStrategy'
     )
 
     replication_strategies = (

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/test/test_cqlsh_completion.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/test/test_cqlsh_completion.py 
b/pylib/cqlshlib/test/test_cqlsh_completion.py
index 0f0cc4d..e736ea7 100644
--- a/pylib/cqlshlib/test/test_cqlsh_completion.py
+++ b/pylib/cqlshlib/test/test_cqlsh_completion.py
@@ -617,7 +617,8 @@ class TestCqlshCompletion(CqlshCompletionCase):
                             + "{'class': '",
                             choices=['SizeTieredCompactionStrategy',
                                      'LeveledCompactionStrategy',
-                                     'DateTieredCompactionStrategy'])
+                                     'DateTieredCompactionStrategy',
+                                     'TimeWindowCompactionStrategy'])
         self.trycompletions(prefix + " new_table (col_a int PRIMARY KEY) WITH 
compaction = "
                             + "{'class': 'S",
                             immediate="izeTieredCompactionStrategy'")
@@ -660,6 +661,14 @@ class TestCqlshCompletion(CqlshCompletionCase):
                                      'tombstone_compaction_interval', 
'tombstone_threshold',
                                      'enabled', 
'unchecked_tombstone_compaction',
                                      'max_window_size_seconds', 
'only_purge_repaired_tombstones'])
+        self.trycompletions(prefix + " new_table (col_a int PRIMARY KEY) WITH 
compaction = "
+                            + "{'class': 'TimeWindowCompactionStrategy', '",
+                            choices=['compaction_window_unit', 
'compaction_window_size',
+                                     'timestamp_resolution', 'min_threshold', 
'class', 'max_threshold',
+                                     'tombstone_compaction_interval', 
'tombstone_threshold',
+                                     'enabled', 
'unchecked_tombstone_compaction',
+                                     'only_purge_repaired_tombstones'])
+
 
     def test_complete_in_create_columnfamily(self):
         self.trycompletions('CREATE C', choices=['COLUMNFAMILY', 'CUSTOM'])

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 1addd0d..8571906 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -35,6 +35,10 @@ import org.apache.cassandra.utils.Pair;
 
 import static com.google.common.collect.Iterables.filter;
 
+/**
+ * @deprecated in favour of {@link TimeWindowCompactionStrategy}
+ */
+@Deprecated
 public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
 {
     private static final Logger logger = 
LoggerFactory.getLogger(DateTieredCompactionStrategy.class);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
new file mode 100644
index 0000000..d1630c5
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.utils.Pair;
+
+import static com.google.common.collect.Iterables.filter;
+
+public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(TimeWindowCompactionStrategy.class);
+
+    private final TimeWindowCompactionStrategyOptions options;
+    protected volatile int estimatedRemainingTasks;
+    private final Set<SSTableReader> sstables = new HashSet<>();
+    private long lastExpiredCheck;
+    private long highestWindowSeen;
+
+    public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String, 
String> options)
+    {
+        super(cfs, options);
+        this.estimatedRemainingTasks = 0;
+        this.options = new TimeWindowCompactionStrategyOptions(options);
+        if 
(!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION)
 && !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION))
+        {
+            disableTombstoneCompactions = true;
+            logger.debug("Disabling tombstone compactions for TWCS");
+        }
+        else
+            logger.debug("Enabling tombstone compactions for TWCS");
+
+    }
+
+    @Override
+    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+    {
+        while (true)
+        {
+            List<SSTableReader> latestBucket = 
getNextBackgroundSSTables(gcBefore);
+
+            if (latestBucket.isEmpty())
+                return null;
+
+            LifecycleTransaction modifier = 
cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
+            if (modifier != null)
+                return new CompactionTask(cfs, modifier, gcBefore);
+        }
+    }
+
+    /**
+     *
+     * @param gcBefore
+     * @return
+     */
+    private synchronized List<SSTableReader> getNextBackgroundSSTables(final 
int gcBefore)
+    {
+        if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
+            return Collections.emptyList();
+
+        Set<SSTableReader> uncompacting = 
ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
+
+        // Find fully expired SSTables. Those will be included no matter what.
+        Set<SSTableReader> expired = Collections.emptySet();
+
+        if (System.currentTimeMillis() - lastExpiredCheck > 
options.expiredSSTableCheckFrequency)
+        {
+            logger.debug("TWCS expired check sufficiently far in the past, 
checking for fully expired SSTables");
+            expired = CompactionController.getFullyExpiredSSTables(cfs, 
uncompacting, cfs.getOverlappingSSTables(SSTableSet.CANONICAL, uncompacting), 
gcBefore);
+            lastExpiredCheck = System.currentTimeMillis();
+        }
+        else
+        {
+            logger.debug("TWCS skipping check for fully expired SSTables");
+        }
+
+        Set<SSTableReader> candidates = 
Sets.newHashSet(filterSuspectSSTables(uncompacting));
+
+        List<SSTableReader> compactionCandidates = new 
ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), 
gcBefore));
+        if (!expired.isEmpty())
+        {
+            logger.debug("Including expired sstables: {}", expired);
+            compactionCandidates.addAll(expired);
+        }
+
+        return compactionCandidates;
+    }
+
+    private List<SSTableReader> 
getNextNonExpiredSSTables(Iterable<SSTableReader> nonExpiringSSTables, final 
int gcBefore)
+    {
+        List<SSTableReader> mostInteresting = 
getCompactionCandidates(nonExpiringSSTables);
+
+        if (mostInteresting != null)
+        {
+            return mostInteresting;
+        }
+
+        // if there is no sstable to compact in standard way, try compacting 
single sstable whose droppable tombstone
+        // ratio is greater than threshold.
+        List<SSTableReader> sstablesWithTombstones = new ArrayList<>();
+        for (SSTableReader sstable : nonExpiringSSTables)
+        {
+            if (worthDroppingTombstones(sstable, gcBefore))
+                sstablesWithTombstones.add(sstable);
+        }
+        if (sstablesWithTombstones.isEmpty())
+            return Collections.emptyList();
+
+        return 
Collections.singletonList(Collections.min(sstablesWithTombstones, new 
SSTableReader.SizeComparator()));
+    }
+
+    private List<SSTableReader> 
getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
+    {
+        Pair<HashMultimap<Long, SSTableReader>, Long> buckets = 
getBuckets(candidateSSTables, options.sstableWindowUnit, 
options.sstableWindowSize, options.timestampResolution);
+        // Update the highest window seen, if necessary
+        if(buckets.right > this.highestWindowSeen)
+            this.highestWindowSeen = buckets.right;
+
+        updateEstimatedCompactionsByTasks(buckets.left);
+        List<SSTableReader> mostInteresting = newestBucket(buckets.left,
+                                                           
cfs.getMinimumCompactionThreshold(),
+                                                           
cfs.getMaximumCompactionThreshold(),
+                                                           
options.sstableWindowUnit,
+                                                           
options.sstableWindowSize,
+                                                           options.stcsOptions,
+                                                           
this.highestWindowSeen);
+        if (!mostInteresting.isEmpty())
+            return mostInteresting;
+        return null;
+    }
+
+    @Override
+    public void addSSTable(SSTableReader sstable)
+    {
+        sstables.add(sstable);
+    }
+
+    @Override
+    public void removeSSTable(SSTableReader sstable)
+    {
+        sstables.remove(sstable);
+    }
+
+    /**
+     * Find the lowest and highest timestamps in a given timestamp/unit pair
+     * Returns milliseconds, caller should adjust accordingly
+     */
+    public static Pair<Long,Long> getWindowBoundsInMillis(TimeUnit 
windowTimeUnit, int windowTimeSize, long timestampInMillis)
+    {
+        long lowerTimestamp;
+        long upperTimestamp;
+        long timestampInSeconds = TimeUnit.SECONDS.convert(timestampInMillis, 
TimeUnit.MILLISECONDS);
+
+        switch(windowTimeUnit)
+        {
+            case MINUTES:
+                lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % 
(60 * windowTimeSize));
+                upperTimestamp = (lowerTimestamp + (60L * (windowTimeSize - 
1L))) + 59L;
+                break;
+            case HOURS:
+                lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % 
(3600 * windowTimeSize));
+                upperTimestamp = (lowerTimestamp + (3600L * (windowTimeSize - 
1L))) + 3599L;
+                break;
+            case DAYS:
+            default:
+                lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % 
(86400 * windowTimeSize));
+                upperTimestamp = (lowerTimestamp + (86400L * (windowTimeSize - 
1L))) + 86399L;
+                break;
+        }
+
+        return Pair.create(TimeUnit.MILLISECONDS.convert(lowerTimestamp, 
TimeUnit.SECONDS),
+                           TimeUnit.MILLISECONDS.convert(upperTimestamp, 
TimeUnit.SECONDS));
+
+    }
+
+    /**
+     * Group files with similar max timestamp into buckets.
+     *
+     * @param files pairs consisting of a file and its min timestamp
+     * @param sstableWindowUnit
+     * @param sstableWindowSize
+     * @param timestampResolution
+     * @return A pair, where the left element is the bucket representation 
(map of timestamp to sstablereader), and the right is the highest timestamp seen
+     */
+    @VisibleForTesting
+    static Pair<HashMultimap<Long, SSTableReader>, Long> 
getBuckets(Iterable<SSTableReader> files, TimeUnit sstableWindowUnit, int 
sstableWindowSize, TimeUnit timestampResolution)
+    {
+        HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();
+
+        long maxTimestamp = 0;
+        // Create hash map to represent buckets
+        // For each sstable, add sstable to the time bucket
+        // Where the bucket is the file's max timestamp rounded to the nearest 
window bucket
+        for (SSTableReader f : files)
+        {
+            assert 
TimeWindowCompactionStrategyOptions.validTimestampTimeUnits.contains(timestampResolution);
+            long tStamp = TimeUnit.MILLISECONDS.convert(f.getMaxTimestamp(), 
timestampResolution);
+            Pair<Long,Long> bounds = 
getWindowBoundsInMillis(sstableWindowUnit, sstableWindowSize, tStamp);
+            buckets.put(bounds.left, f);
+            if (bounds.left > maxTimestamp)
+                maxTimestamp = bounds.left;
+        }
+
+        logger.trace("buckets {}, max timestamp", buckets, maxTimestamp);
+        return Pair.create(buckets, maxTimestamp);
+    }
+
+    private void updateEstimatedCompactionsByTasks(HashMultimap<Long, 
SSTableReader> tasks)
+    {
+        int n = 0;
+        long now = this.highestWindowSeen;
+
+        for(Long key : tasks.keySet())
+        {
+            // For current window, make sure it's compactable
+            if (key.compareTo(now) >= 0 && tasks.get(key).size() >= 
cfs.getMinimumCompactionThreshold())
+                n++;
+            else if (key.compareTo(now) < 0 && tasks.get(key).size() >= 2)
+                n++;
+        }
+        this.estimatedRemainingTasks = n;
+    }
+
+
+    /**
+     * @param buckets list of buckets, sorted from newest to oldest, from 
which to return the newest bucket within thresholds.
+     * @param minThreshold minimum number of sstables in a bucket to qualify.
+     * @param maxThreshold maximum number of sstables to compact at once (the 
returned bucket will be trimmed down to this).
+     * @return a bucket (list) of sstables to compact.
+     */
+    @VisibleForTesting
+    static List<SSTableReader> newestBucket(HashMultimap<Long, SSTableReader> 
buckets, int minThreshold, int maxThreshold, TimeUnit sstableWindowUnit, int 
sstableWindowSize, SizeTieredCompactionStrategyOptions stcsOptions, long now)
+    {
+        // If the current bucket has at least minThreshold SSTables, choose 
that one.
+        // For any other bucket, at least 2 SSTables is enough.
+        // In any case, limit to maxThreshold SSTables.
+
+        TreeSet<Long> allKeys = new TreeSet<>(buckets.keySet());
+
+        Iterator<Long> it = allKeys.descendingIterator();
+        while(it.hasNext())
+        {
+            Long key = it.next();
+            Set<SSTableReader> bucket = buckets.get(key);
+            logger.trace("Key {}, now {}", key, now);
+            if (bucket.size() >= minThreshold && key >= now)
+            {
+                // If we're in the newest bucket, we'll use STCS to prioritize 
sstables
+                List<Pair<SSTableReader,Long>> pairs = 
SizeTieredCompactionStrategy.createSSTableAndLengthPairs(bucket);
+                List<List<SSTableReader>> stcsBuckets = 
SizeTieredCompactionStrategy.getBuckets(pairs, stcsOptions.bucketHigh, 
stcsOptions.bucketLow, stcsOptions.minSSTableSize);
+                logger.debug("Using STCS compaction for first window of 
bucket: data files {} , options {}", pairs, stcsOptions);
+                List<SSTableReader> stcsInterestingBucket = 
SizeTieredCompactionStrategy.mostInterestingBucket(stcsBuckets, minThreshold, 
maxThreshold);
+
+                // If the tables in the current bucket aren't eligible in the 
STCS strategy, we'll skip it and look for other buckets
+                if (!stcsInterestingBucket.isEmpty())
+                    return stcsInterestingBucket;
+            }
+            else if (bucket.size() >= 2 && key < now)
+            {
+                logger.debug("bucket size {} >= 2 and not in current bucket, 
compacting what's here: {}", bucket.size(), bucket);
+                return trimToThreshold(bucket, maxThreshold);
+            }
+            else
+            {
+                logger.debug("No compaction necessary for bucket size {} , key 
{}, now {}", bucket.size(), key, now);
+            }
+        }
+        return Collections.<SSTableReader>emptyList();
+    }
+
+    /**
+     * @param bucket set of sstables
+     * @param maxThreshold maximum number of sstables in a single compaction 
task.
+     * @return A bucket trimmed to the maxThreshold newest sstables.
+     */
+    @VisibleForTesting
+    static List<SSTableReader> trimToThreshold(Set<SSTableReader> bucket, int 
maxThreshold)
+    {
+        List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);
+
+        // Trim the largest sstables off the end to meet the maxThreshold
+        Collections.sort(ssTableReaders, new SSTableReader.SizeComparator());
+
+        return ImmutableList.copyOf(Iterables.limit(ssTableReaders, 
maxThreshold));
+    }
+
+    @Override
+    public synchronized Collection<AbstractCompactionTask> getMaximalTask(int 
gcBefore, boolean splitOutput)
+    {
+        Iterable<SSTableReader> filteredSSTables = 
filterSuspectSSTables(sstables);
+        if (Iterables.isEmpty(filteredSSTables))
+            return null;
+        LifecycleTransaction txn = 
cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
+        if (txn == null)
+            return null;
+        return Collections.singleton(new CompactionTask(cfs, txn, gcBefore));
+    }
+
+    @Override
+    public synchronized AbstractCompactionTask 
getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+    {
+        assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+        LifecycleTransaction modifier = cfs.getTracker().tryModify(sstables, 
OperationType.COMPACTION);
+        if (modifier == null)
+        {
+            logger.debug("Unable to mark {} for compaction; probably a 
background compaction got to it first.  You can disable background compactions 
temporarily if this is a problem", sstables);
+            return null;
+        }
+
+        return new CompactionTask(cfs, modifier, 
gcBefore).setUserDefined(true);
+    }
+
+    public int getEstimatedRemainingTasks()
+    {
+        return this.estimatedRemainingTasks;
+    }
+
+    public long getMaxSSTableBytes()
+    {
+        return Long.MAX_VALUE;
+    }
+
+
+    public static Map<String, String> validateOptions(Map<String, String> 
options) throws ConfigurationException
+    {
+        Map<String, String> uncheckedOptions = 
AbstractCompactionStrategy.validateOptions(options);
+        uncheckedOptions = 
TimeWindowCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
+
+        
uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
+        
uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());
+
+        return uncheckedOptions;
+    }
+
+    public String toString()
+    {
+        return String.format("TimeWindowCompactionStrategy[%s/%s]",
+                cfs.getMinimumCompactionThreshold(),
+                cfs.getMaximumCompactionThreshold());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
 
b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
new file mode 100644
index 0000000..bcbdab6
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+public final class TimeWindowCompactionStrategyOptions
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(TimeWindowCompactionStrategyOptions.class);
+
+    protected static final TimeUnit DEFAULT_TIMESTAMP_RESOLUTION = 
TimeUnit.MICROSECONDS;
+    protected static final TimeUnit DEFAULT_COMPACTION_WINDOW_UNIT = 
TimeUnit.DAYS;
+    protected static final int DEFAULT_COMPACTION_WINDOW_SIZE = 1;
+    protected static final int DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS 
= 60 * 10;
+
+    protected static final String TIMESTAMP_RESOLUTION_KEY = 
"timestamp_resolution";
+    protected static final String COMPACTION_WINDOW_UNIT_KEY = 
"compaction_window_unit";
+    protected static final String COMPACTION_WINDOW_SIZE_KEY = 
"compaction_window_size";
+    protected static final String EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY 
= "expired_sstable_check_frequency_seconds";
+
+    protected final int sstableWindowSize;
+    protected final TimeUnit sstableWindowUnit;
+    protected final TimeUnit timestampResolution;
+    protected final long expiredSSTableCheckFrequency;
+
+    SizeTieredCompactionStrategyOptions stcsOptions;
+
+    protected final static ImmutableList<TimeUnit> validTimestampTimeUnits = 
ImmutableList.of(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, 
TimeUnit.MICROSECONDS, TimeUnit.NANOSECONDS);
+    protected final static ImmutableList<TimeUnit> validWindowTimeUnits = 
ImmutableList.of(TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS);
+
+    public TimeWindowCompactionStrategyOptions(Map<String, String> options)
+    {
+        String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY);
+        timestampResolution = optionValue == null ? 
DEFAULT_TIMESTAMP_RESOLUTION : TimeUnit.valueOf(optionValue);
+        if (timestampResolution != DEFAULT_TIMESTAMP_RESOLUTION)
+            logger.warn("Using a non-default timestamp_resolution {} - are you 
really doing inserts with USING TIMESTAMP <non_microsecond_timestamp> (or 
driver equivalent)?", timestampResolution.toString());
+
+        optionValue = options.get(COMPACTION_WINDOW_UNIT_KEY);
+        sstableWindowUnit = optionValue == null ? 
DEFAULT_COMPACTION_WINDOW_UNIT : TimeUnit.valueOf(optionValue);
+
+        optionValue = options.get(COMPACTION_WINDOW_SIZE_KEY);
+        sstableWindowSize = optionValue == null ? 
DEFAULT_COMPACTION_WINDOW_SIZE : Integer.parseInt(optionValue);
+
+        optionValue = options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+        expiredSSTableCheckFrequency = 
TimeUnit.MILLISECONDS.convert(optionValue == null ? 
DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS : Long.parseLong(optionValue), 
TimeUnit.SECONDS);
+
+        stcsOptions = new SizeTieredCompactionStrategyOptions(options);
+    }
+
+    public TimeWindowCompactionStrategyOptions()
+    {
+        sstableWindowUnit = DEFAULT_COMPACTION_WINDOW_UNIT;
+        timestampResolution = DEFAULT_TIMESTAMP_RESOLUTION;
+        sstableWindowSize = DEFAULT_COMPACTION_WINDOW_SIZE;
+        expiredSSTableCheckFrequency = 
TimeUnit.MILLISECONDS.convert(DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS, 
TimeUnit.SECONDS);
+        stcsOptions = new SizeTieredCompactionStrategyOptions();
+    }
+
+    public static Map<String, String> validateOptions(Map<String, String> 
options, Map<String, String> uncheckedOptions) throws  ConfigurationException
+    {
+        String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY);
+        try
+        {
+            if (optionValue != null)
+                if 
(!validTimestampTimeUnits.contains(TimeUnit.valueOf(optionValue)))
+                    throw new ConfigurationException(String.format("%s is not 
valid for %s", optionValue, TIMESTAMP_RESOLUTION_KEY));
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new ConfigurationException(String.format("%s is not valid 
for %s", optionValue, TIMESTAMP_RESOLUTION_KEY));
+        }
+
+
+        optionValue = options.get(COMPACTION_WINDOW_UNIT_KEY);
+        try
+        {
+            if (optionValue != null)
+                if 
(!validWindowTimeUnits.contains(TimeUnit.valueOf(optionValue)))
+                    throw new ConfigurationException(String.format("%s is not 
valid for %s", optionValue, COMPACTION_WINDOW_UNIT_KEY));
+
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new ConfigurationException(String.format("%s is not valid 
for %s", optionValue, COMPACTION_WINDOW_UNIT_KEY), e);
+        }
+
+        optionValue = options.get(COMPACTION_WINDOW_SIZE_KEY);
+        try
+        {
+            int sstableWindowSize = optionValue == null ? 
DEFAULT_COMPACTION_WINDOW_SIZE : Integer.parseInt(optionValue);
+            if (sstableWindowSize < 1)
+            {
+                throw new ConfigurationException(String.format("%s must be 
greater than 1", DEFAULT_COMPACTION_WINDOW_SIZE, sstableWindowSize));
+            }
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException(String.format("%s is not a 
parsable int (base10) for %s", optionValue, DEFAULT_COMPACTION_WINDOW_SIZE), e);
+        }
+
+        optionValue = options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+        try
+        {
+            long expiredCheckFrequency = optionValue == null ? 
DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS : Long.parseLong(optionValue);
+            if (expiredCheckFrequency < 0)
+            {
+                throw new ConfigurationException(String.format("%s must not be 
negative, but was %d", EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, 
expiredCheckFrequency));
+             }
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException(String.format("%s is not a 
parsable int (base10) for %s", optionValue, 
EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY), e);
+        }
+
+        uncheckedOptions.remove(COMPACTION_WINDOW_SIZE_KEY);
+        uncheckedOptions.remove(COMPACTION_WINDOW_UNIT_KEY);
+        uncheckedOptions.remove(TIMESTAMP_RESOLUTION_KEY);
+        uncheckedOptions.remove(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+
+        uncheckedOptions = 
SizeTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
+
+        return uncheckedOptions;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java 
b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 5d42aae..afbfee1 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -72,6 +72,19 @@ public class CompactionsCQLTest extends CQLTester
     }
 
     @Test
+    public void testTriggerMinorCompactionTWCS() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id text PRIMARY KEY) WITH compaction = 
{'class':'TimeWindowCompactionStrategy', 'min_threshold':2};");
+        
assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+        execute("insert into %s (id) values ('1')");
+        flush();
+        execute("insert into %s (id) values ('1')");
+        flush();
+        waitForMinor(KEYSPACE, currentTable(), SLEEP_TIME, true);
+    }
+
+
+    @Test
     public void testTriggerNoMinorCompactionSTCSDisabled() throws Throwable
     {
         createTable("CREATE TABLE %s (id text PRIMARY KEY)  WITH compaction = 
{'class':'SizeTieredCompactionStrategy', 'min_threshold':2, 'enabled':false};");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
new file mode 100644
index 0000000..3238170
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.Pair;
+
+import static 
org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.getWindowBoundsInMillis;
+import static 
org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.newestBucket;
+import static 
org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.validateOptions;
+
+public class TimeWindowCompactionStrategyTest extends SchemaLoader
+{
+    public static final String KEYSPACE1 = "Keyspace1";
+    private static final String CF_STANDARD1 = "Standard1";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, 
CF_STANDARD1));
+    }
+
+    @Test
+    public void testOptionsValidation() throws ConfigurationException
+    {
+        Map<String, String> options = new HashMap<>();
+        
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, 
"30");
+        
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, 
"MINUTES");
+        Map<String, String> unvalidated = validateOptions(options);
+        assertTrue(unvalidated.isEmpty());
+
+        try
+        {
+            
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, 
"0");
+            validateOptions(options);
+            fail(String.format("%s == 0 should be rejected", 
TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY));
+        }
+        catch (ConfigurationException e) {}
+
+        try
+        {
+            
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, 
"-1337");
+            validateOptions(options);
+            fail(String.format("Negative %s should be rejected", 
TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY));
+        }
+        catch (ConfigurationException e)
+        {
+            
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, 
"1");
+        }
+
+        try
+        {
+            
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, 
"MONTHS");
+            validateOptions(options);
+            fail(String.format("Invalid time units should be rejected", 
TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY));
+        }
+        catch (ConfigurationException e)
+        {
+            
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, 
"MINUTES");
+        }
+
+        options.put("bad_option", "1.0");
+        unvalidated = validateOptions(options);
+        assertTrue(unvalidated.containsKey("bad_option"));
+    }
+
+
+    @Test
+    public void testTimeWindows()
+    {
+        Long tstamp1 = 1451001601000L; // 2015-12-25 @ 00:00:01, in 
milliseconds
+        Long tstamp2 = 1451088001000L; // 2015-12-26 @ 00:00:01, in 
milliseconds
+        Long lowHour = 1451001600000L; // 2015-12-25 @ 00:00:00, in 
milliseconds
+
+        // A 1 hour window should round down to the beginning of the hour
+        assertTrue(getWindowBoundsInMillis(TimeUnit.HOURS, 1, 
tstamp1).left.compareTo(lowHour) == 0);
+
+        // A 1 minute window should round down to the beginning of the hour
+        assertTrue(getWindowBoundsInMillis(TimeUnit.MINUTES, 1, 
tstamp1).left.compareTo(lowHour) == 0);
+
+        // A 1 day window should round down to the beginning of the hour
+        assertTrue(getWindowBoundsInMillis(TimeUnit.DAYS, 1, 
tstamp1).left.compareTo(lowHour) == 0 );
+
+        // The 2 day window of 2015-12-25 + 2015-12-26 should round down to 
the beginning of 2015-12-25
+        assertTrue(getWindowBoundsInMillis(TimeUnit.DAYS, 2, 
tstamp2).left.compareTo(lowHour) == 0);
+
+
+        return;
+    }
+
+    @Test
+    public void testPrepBucket()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+        Long tstamp = System.currentTimeMillis();
+        Long tstamp2 =  tstamp - (2L * 3600L * 1000L);
+
+        // create 5 sstables
+        for (int r = 0; r < 3; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            new RowUpdateBuilder(cfs.metadata, r, key.getKey())
+                .clustering("column")
+                .add("val", value).build().applyUnsafe();
+
+            cfs.forceBlockingFlush();
+        }
+        // Decrement the timestamp to simulate a timestamp in the past hour
+        for (int r = 3; r < 5; r++)
+        {
+            // And add progressively more cells into each sstable
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            new RowUpdateBuilder(cfs.metadata, r, key.getKey())
+                .clustering("column")
+                .add("val", value).build().applyUnsafe();
+            cfs.forceBlockingFlush();
+        }
+
+        cfs.forceBlockingFlush();
+
+        HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();
+        List<SSTableReader> sstrs = new ArrayList<>(cfs.getLiveSSTables());
+
+        // We'll put 3 sstables into the newest bucket
+        for (int i = 0 ; i < 3; i++)
+        {
+            Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 
1, tstamp );
+            buckets.put(bounds.left, sstrs.get(i));
+        }
+        List<SSTableReader> newBucket = newestBucket(buckets, 4, 32, 
TimeUnit.HOURS, 1, new SizeTieredCompactionStrategyOptions(), 
getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left );
+        assertTrue("incoming bucket should not be accepted when it has below 
the min threshold SSTables", newBucket.isEmpty());
+
+        newBucket = newestBucket(buckets, 2, 32, TimeUnit.HOURS, 1, new 
SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 
1, System.currentTimeMillis()).left);
+        assertTrue("incoming bucket should be accepted when it is larger than 
the min threshold SSTables", !newBucket.isEmpty());
+
+        // And 2 into the second bucket (1 hour back)
+        for (int i = 3 ; i < 5; i++)
+        {
+            Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 
1, tstamp2 );
+            buckets.put(bounds.left, sstrs.get(i));
+        }
+
+        assertEquals("an sstable with a single value should have equal min/max 
timestamps", sstrs.get(0).getMinTimestamp(), sstrs.get(0).getMaxTimestamp());
+        assertEquals("an sstable with a single value should have equal min/max 
timestamps", sstrs.get(1).getMinTimestamp(), sstrs.get(1).getMaxTimestamp());
+        assertEquals("an sstable with a single value should have equal min/max 
timestamps", sstrs.get(2).getMinTimestamp(), sstrs.get(2).getMaxTimestamp());
+
+        // Test trim
+        int numSSTables = 40;
+        for (int r = 5; r < numSSTables; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            for(int i = 0 ; i < r ; i++)
+            {
+                new RowUpdateBuilder(cfs.metadata, tstamp + r, key.getKey())
+                    .clustering("column")
+                    .add("val", value).build().applyUnsafe();
+            }
+            cfs.forceBlockingFlush();
+        }
+
+        // Reset the buckets, overfill it now
+        sstrs = new ArrayList<>(cfs.getLiveSSTables());
+        for (int i = 0 ; i < 40; i++)
+        {
+            Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 
1, sstrs.get(i).getMaxTimestamp());
+            buckets.put(bounds.left, sstrs.get(i));
+        }
+
+        newBucket = newestBucket(buckets, 4, 32, TimeUnit.DAYS, 1, new 
SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 
1, System.currentTimeMillis()).left);
+        assertEquals("new bucket should be trimmed to max threshold of 32", 
newBucket.size(),  32);
+    }
+
+
+    @Test
+    public void testDropExpiredSSTables() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        // create 2 sstables
+        DecoratedKey key = Util.dk(String.valueOf("expired"));
+        new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), 1, 
key.getKey())
+            .clustering("column")
+            .add("val", value).build().applyUnsafe();
+
+        cfs.forceBlockingFlush();
+        SSTableReader expiredSSTable = cfs.getLiveSSTables().iterator().next();
+        Thread.sleep(10);
+
+        key = Util.dk(String.valueOf("nonexpired"));
+        new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), 
key.getKey())
+            .clustering("column")
+            .add("val", value).build().applyUnsafe();
+
+        cfs.forceBlockingFlush();
+        assertEquals(cfs.getLiveSSTables().size(), 2);
+
+        Map<String, String> options = new HashMap<>();
+
+        
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, 
"30");
+        
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, 
"SECONDS");
+        
options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, 
"MILLISECONDS");
+        
options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY,
 "0");
+        TimeWindowCompactionStrategy twcs = new 
TimeWindowCompactionStrategy(cfs, options);
+        for (SSTableReader sstable : cfs.getLiveSSTables())
+            twcs.addSSTable(sstable);
+        twcs.startup();
+        assertNull(twcs.getNextBackgroundTask((int) 
(System.currentTimeMillis() / 1000)));
+        Thread.sleep(2000);
+        AbstractCompactionTask t = twcs.getNextBackgroundTask((int) 
(System.currentTimeMillis()/1000));
+        assertNotNull(t);
+        assertEquals(1, Iterables.size(t.transaction.originals()));
+        SSTableReader sstable = t.transaction.originals().iterator().next();
+        assertEquals(sstable, expiredSSTable);
+        t.transaction.abort();
+    }
+
+}

Reply via email to