Updated Branches:
  refs/heads/trunk 29804fa9a -> 6b2ea2647

Track tombstone expiration
patch by yukim; reviewed by jbellis for CASSANDRA-3442


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b2ea264
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b2ea264
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b2ea264

Branch: refs/heads/trunk
Commit: 6b2ea264702f80518a1147ab0aac44e0cf40dfc3
Parents: 29804fa
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Wed Mar 7 15:21:11 2012 -0600
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Wed Mar 7 15:34:28 2012 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    5 +
 src/java/org/apache/cassandra/db/ColumnFamily.java |   22 ++-
 .../cassandra/db/ColumnFamilySerializer.java       |   13 +-
 src/java/org/apache/cassandra/db/EchoedRow.java    |   12 +-
 .../db/compaction/AbstractCompactedRow.java        |   13 +-
 .../db/compaction/AbstractCompactionTask.java      |   12 +-
 .../cassandra/db/compaction/CompactionTask.java    |    8 -
 .../db/compaction/LazilyCompactedRow.java          |   37 ++--
 .../cassandra/db/compaction/PrecompactedRow.java   |   11 +-
 .../compaction/SizeTieredCompactionStrategy.java   |   25 ++-
 .../apache/cassandra/io/sstable/ColumnStats.java   |   42 +++
 .../apache/cassandra/io/sstable/Descriptor.java    |    5 +-
 .../org/apache/cassandra/io/sstable/SSTable.java   |    2 +
 .../cassandra/io/sstable/SSTableMetadata.java      |   62 ++++-
 .../apache/cassandra/io/sstable/SSTableReader.java |    5 +
 .../apache/cassandra/io/sstable/SSTableWriter.java |   32 +--
 .../cassandra/streaming/IncomingStreamReader.java  |    2 -
 .../apache/cassandra/utils/StreamingHistogram.java |  199 +++++++++++++++
 .../cassandra/db/compaction/CompactionsTest.java   |   62 ++++-
 .../cassandra/utils/StreamingHistogramTest.java    |  123 +++++++++
 20 files changed, 595 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 898e140..16f8f0c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,8 @@
+1.2-dev
+ * Track tombstone expiration and compact when tombstone content is
+   higher than a configurable threshold, default 20% (CASSANDRA-3442)
+
+
 1.1.1-dev
  * optimize commitlog checksumming (CASSANDRA-3610)
  * identify and blacklist corrupted SSTables from future compactions 
(CASSANDRA-2261)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java 
b/src/java/org/apache/cassandra/db/ColumnFamily.java
index dcf52f4..f7398ee 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -22,6 +22,8 @@ import static org.apache.cassandra.db.DBConstants.*;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.utils.*;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 
 import org.apache.cassandra.cache.IRowCacheEntry;
@@ -31,10 +33,7 @@ import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.io.IColumnSerializer;
-import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.HeapAllocator;
+import org.apache.cassandra.io.sstable.ColumnStats;
 
 public class ColumnFamily extends AbstractColumnContainer implements 
IRowCacheEntry
 {
@@ -364,4 +363,19 @@ public class ColumnFamily extends AbstractColumnContainer 
implements IRowCacheEn
             column.validateFields(metadata);
         }
     }
+
+    public ColumnStats getColumnStats()
+    {
+        long maxTimestampSeen = Long.MIN_VALUE;
+        StreamingHistogram tombstones = new 
StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
+
+        for (IColumn column : columns)
+        {
+            maxTimestampSeen = Math.max(maxTimestampSeen, 
column.maxTimestamp());
+            int deletionTime = column.getLocalDeletionTime();
+            if (deletionTime < Integer.MAX_VALUE)
+                tombstones.update(deletionTime);
+        }
+        return new ColumnStats(getColumnCount(), maxTimestampSeen, tombstones);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java 
b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
index 0f6165a..31c8d4b 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.sstable.ColumnStats;
 
 public class ColumnFamilySerializer implements ISerializer<ColumnFamily>
 {
@@ -70,7 +71,7 @@ public class ColumnFamilySerializer implements 
ISerializer<ColumnFamily>
         serializeForSSTable(columnFamily, dos);
     }
 
-    public int serializeForSSTable(ColumnFamily columnFamily, DataOutput dos)
+    public void serializeForSSTable(ColumnFamily columnFamily, DataOutput dos)
     {
         try
         {
@@ -79,14 +80,8 @@ public class ColumnFamilySerializer implements 
ISerializer<ColumnFamily>
             Collection<IColumn> columns = columnFamily.getSortedColumns();
             int count = columns.size();
             dos.writeInt(count);
-            int i = 0;
             for (IColumn column : columns)
-            {
                 columnFamily.getColumnSerializer().serialize(column, dos);
-                i++;
-            }
-            assert count == i: "CF size changed during serialization: was " + 
count + " initially but " + i + " written";
-            return count;
         }
         catch (IOException e)
         {
@@ -100,10 +95,10 @@ public class ColumnFamilySerializer implements 
ISerializer<ColumnFamily>
         dos.writeLong(columnFamily.getMarkedForDeleteAt());
     }
 
-    public int serializeWithIndexes(ColumnFamily columnFamily, 
ColumnIndexer.RowHeader index, DataOutput dos)
+    public void serializeWithIndexes(ColumnFamily columnFamily, 
ColumnIndexer.RowHeader index, DataOutput dos)
     {
         ColumnIndexer.serialize(index, dos);
-        return serializeForSSTable(columnFamily, dos);
+        serializeForSSTable(columnFamily, dos);
     }
 
     public ColumnFamily deserialize(DataInput dis) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/EchoedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/EchoedRow.java 
b/src/java/org/apache/cassandra/db/EchoedRow.java
index 9067a10..69ec3d7 100644
--- a/src/java/org/apache/cassandra/db/EchoedRow.java
+++ b/src/java/org/apache/cassandra/db/EchoedRow.java
@@ -22,7 +22,10 @@ import java.io.IOException;
 import java.security.MessageDigest;
 
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.io.sstable.ColumnStats;
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.utils.StreamingHistogram;
 
 /**
  * A CompactedRow implementation that just echos the original row bytes 
without deserializing.
@@ -60,13 +63,8 @@ public class EchoedRow extends AbstractCompactedRow
         return false;
     }
 
-    public int columnCount()
+    public ColumnStats columnStats()
     {
-        return row.getColumnCount();
-    }
-
-    public long maxTimestamp()
-    {
-        return Long.MIN_VALUE;
+        return new ColumnStats(row.getColumnCount(), Long.MIN_VALUE, new 
StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index f04e7c1..5a892c1 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.security.MessageDigest;
 
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.io.sstable.ColumnStats;
 
 /**
  * a CompactedRow is an object that takes a bunch of rows (keys + 
columnfamilies)
@@ -60,14 +61,8 @@ public abstract class AbstractCompactedRow
     public abstract boolean isEmpty();
 
     /**
-     * @return the number of columns in the row
+     * @return aggregate information about the columns in this row.  Some fields may
+     * contain default values if computing them would require extra effort we're not willing to make.
      */
-    public abstract int columnCount();
-
-    /**
-     * @return the max column timestamp in the row or Long.MIN_VALUE if
-     * computing this value would require extra effort we're not willing to
-     * make.
-     */
-    public abstract long maxTimestamp();
+    public abstract ColumnStats columnStats();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index e39a533..e031e07 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -29,11 +29,13 @@ public abstract class AbstractCompactionTask
 {
     protected final ColumnFamilyStore cfs;
     protected Collection<SSTableReader> sstables;
+    protected boolean isUserDefined;
 
     public AbstractCompactionTask(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables)
     {
         this.cfs = cfs;
         this.sstables = sstables;
+        this.isUserDefined = false;
     }
 
     public abstract int execute(CompactionExecutorStatsCollector collector) 
throws IOException;
@@ -58,7 +60,9 @@ public abstract class AbstractCompactionTask
      */
     public boolean markSSTablesForCompaction()
     {
-        return markSSTablesForCompaction(cfs.getMinimumCompactionThreshold(), 
cfs.getMaximumCompactionThreshold());
+        int min = isUserDefined ? 1 : cfs.getMinimumCompactionThreshold();
+        int max = isUserDefined ? Integer.MAX_VALUE : 
cfs.getMaximumCompactionThreshold();
+        return markSSTablesForCompaction(min, max);
     }
 
     public boolean markSSTablesForCompaction(int min, int max)
@@ -83,4 +87,10 @@ public abstract class AbstractCompactionTask
     // Can be overriden for action that need to be performed if the task won't
     // execute (if sstable can't be marked successfully)
     protected void cancel() {}
+
+    public AbstractCompactionTask isUserDefined(boolean isUserDefined)
+    {
+        this.isUserDefined = isUserDefined;
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index f8d123e..4d77af0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -42,14 +42,12 @@ public class CompactionTask extends AbstractCompactionTask
 {
     protected static final Logger logger = 
LoggerFactory.getLogger(CompactionTask.class);
     protected final int gcBefore;
-    protected boolean isUserDefined;
     protected static long totalBytesCompacted = 0;
 
     public CompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> 
sstables, final int gcBefore)
     {
         super(cfs, sstables);
         this.gcBefore = gcBefore;
-        this.isUserDefined = false;
     }
 
     public static synchronized long addToTotalBytesCompacted(long 
bytesCompacted)
@@ -262,10 +260,4 @@ public class CompactionTask extends AbstractCompactionTask
         }
         return max;
     }
-
-    public CompactionTask isUserDefined(boolean isUserDefined)
-    {
-        this.isUserDefined = isUserDefined;
-        return this;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java 
b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 61546f3..eaf401a 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -26,6 +26,9 @@ import java.util.List;
 
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterators;
+import org.apache.cassandra.io.sstable.ColumnStats;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.utils.StreamingHistogram;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,8 +61,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow 
implements IIterabl
     private final DataOutputBuffer headerBuffer;
     private ColumnFamily emptyColumnFamily;
     private Reducer reducer;
-    private int columnCount;
-    private long maxTimestamp;
+    private final ColumnStats columnStats;
     private long columnSerializedSize;
     private boolean closed;
 
@@ -85,9 +87,10 @@ public class LazilyCompactedRow extends AbstractCompactedRow 
implements IIterabl
         ColumnIndexer.serialize(this, headerBuffer);
         // reach into the reducer used during iteration to get column count, 
size, max column timestamp
         // (however, if there are zero columns, iterator() will not be called 
by ColumnIndexer and reducer will be null)
-        columnCount = reducer == null ? 0 : reducer.size;
+        columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns, 
reducer == null ? Long.MIN_VALUE : reducer.maxTimestampSeen,
+                                      reducer == null ? new 
StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones
+        );
         columnSerializedSize = reducer == null ? 0 : reducer.serializedSize;
-        maxTimestamp = reducer == null ? Long.MIN_VALUE : 
reducer.maxTimestampSeen;
         reducer = null;
     }
 
@@ -106,7 +109,7 @@ public class LazilyCompactedRow extends 
AbstractCompactedRow implements IIterabl
         out.writeLong(dataSize);
         out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
         out.write(clockOut.getData(), 0, clockOut.getLength());
-        out.writeInt(columnCount);
+        out.writeInt(columnStats.columnCount);
 
         Iterator<IColumn> iter = iterator();
         while (iter.hasNext())
@@ -133,7 +136,7 @@ public class LazilyCompactedRow extends 
AbstractCompactedRow implements IIterabl
         try
         {
             ColumnFamily.serializer().serializeCFInfo(emptyColumnFamily, out);
-            out.writeInt(columnCount);
+            out.writeInt(columnStats.columnCount);
             digest.update(out.getData(), 0, out.getLength());
         }
         catch (IOException e)
@@ -154,7 +157,7 @@ public class LazilyCompactedRow extends 
AbstractCompactedRow implements IIterabl
         boolean cfIrrelevant = shouldPurge
                              ? 
ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == 
null
                              : !emptyColumnFamily.isMarkedForDelete(); // 
tombstones are relevant
-        return cfIrrelevant && columnCount == 0;
+        return cfIrrelevant && columnStats.columnCount == 0;
     }
 
     public int getEstimatedColumnCount()
@@ -179,14 +182,9 @@ public class LazilyCompactedRow extends 
AbstractCompactedRow implements IIterabl
         return Iterators.filter(iter, Predicates.notNull());
     }
 
-    public int columnCount()
+    public ColumnStats columnStats()
     {
-        return columnCount;
-    }
-
-    public long maxTimestamp()
-    {
-        return maxTimestamp;
+        return columnStats;
     }
 
     private void close()
@@ -209,8 +207,9 @@ public class LazilyCompactedRow extends 
AbstractCompactedRow implements IIterabl
     {
         ColumnFamily container = emptyColumnFamily.cloneMeShallow();
         long serializedSize = 4; // int for column count
-        int size = 0;
+        int columns = 0;
         long maxTimestampSeen = Long.MIN_VALUE;
+        StreamingHistogram tombstones = new 
StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
 
         public void reduce(IColumn current)
         {
@@ -227,9 +226,15 @@ public class LazilyCompactedRow extends 
AbstractCompactedRow implements IIterabl
             }
             IColumn reduced = purged.iterator().next();
             container.clear();
+
             serializedSize += reduced.serializedSize();
-            size++;
+            columns++;
             maxTimestampSeen = Math.max(maxTimestampSeen, 
reduced.maxTimestamp());
+            int deletionTime = reduced.getLocalDeletionTime();
+            if (deletionTime < Integer.MAX_VALUE)
+            {
+                tombstones.update(deletionTime);
+            }
             return reduced;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java 
b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index 8522ea6..35cd33d 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.security.MessageDigest;
 import java.util.List;
 
+import org.apache.cassandra.io.sstable.ColumnStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +66,7 @@ public class PrecompactedRow extends AbstractCompactedRow
 
         if (cf.hasExpiredTombstones(controller.gcBefore))
             shouldPurge = controller.shouldPurge(key);
+
         // We should only gc tombstone if shouldPurge == true. But otherwise,
         // it is still ok to collect column that shadowed by their (deleted)
         // container, which removeDeleted(cf, Integer.MAX_VALUE) will do
@@ -161,14 +163,9 @@ public class PrecompactedRow extends AbstractCompactedRow
         return compactedCf == null;
     }
 
-    public int columnCount()
-    {
-        return compactedCf == null ? 0 : compactedCf.getColumnCount();
-    }
-
-    public long maxTimestamp()
+    public ColumnStats columnStats()
     {
-        return compactedCf.maxTimestamp();
+        return compactedCf.getColumnStats();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 97396e4..d6f4025 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -32,9 +32,12 @@ public class SizeTieredCompactionStrategy extends 
AbstractCompactionStrategy
 {
     private static final Logger logger = 
LoggerFactory.getLogger(SizeTieredCompactionStrategy.class);
     protected static final long DEFAULT_MIN_SSTABLE_SIZE = 50L * 1024L * 1024L;
+    protected static final float DEFAULT_TOMBSTONE_THRESHOLD = 0.2f;
     protected static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size";
+    protected static final String TOMBSTONE_THRESHOLD_KEY = 
"tombstone_threshold";
     protected long minSSTableSize;
     protected volatile int estimatedRemainingTasks;
+    protected float tombstoneThreshold;
 
     public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, 
String> options)
     {
@@ -44,6 +47,8 @@ public class SizeTieredCompactionStrategy extends 
AbstractCompactionStrategy
         minSSTableSize = (null != optionValue) ? Long.parseLong(optionValue) : 
DEFAULT_MIN_SSTABLE_SIZE;
         
cfs.setMaximumCompactionThreshold(cfs.metadata.getMaxCompactionThreshold());
         
cfs.setMinimumCompactionThreshold(cfs.metadata.getMinCompactionThreshold());
+        optionValue = options.get(TOMBSTONE_THRESHOLD_KEY);
+        tombstoneThreshold = (null != optionValue) ? 
Float.parseFloat(optionValue) : DEFAULT_TOMBSTONE_THRESHOLD;
     }
 
     public AbstractCompactionTask getNextBackgroundTask(final int gcBefore)
@@ -75,7 +80,21 @@ public class SizeTieredCompactionStrategy extends 
AbstractCompactionStrategy
         }
 
         if (prunedBuckets.isEmpty())
-            return null;
+        {
+            // if there is no sstable to compact in the standard way, try compacting a single sstable whose droppable tombstone
+            // ratio is greater than the threshold.
+            for (List<SSTableReader> bucket : buckets)
+            {
+                for (SSTableReader table : bucket)
+                {
+                    if (table.getEstimatedDroppableTombstoneRatio(gcBefore) > 
tombstoneThreshold)
+                        prunedBuckets.add(Collections.singletonList(table));
+                }
+            }
+
+            if (prunedBuckets.isEmpty())
+                return null;
+        }
 
         List<SSTableReader> smallestBucket = Collections.min(prunedBuckets, 
new Comparator<List<SSTableReader>>()
         {
@@ -97,7 +116,8 @@ public class SizeTieredCompactionStrategy extends 
AbstractCompactionStrategy
                 return n / sstables.size();
             }
         });
-        return new CompactionTask(cfs, smallestBucket, gcBefore);
+        // when the bucket contains just one sstable, set userDefined to true to force single sstable compaction
+        return new CompactionTask(cfs, smallestBucket, 
gcBefore).isUserDefined(smallestBucket.size() == 1);
     }
 
     public AbstractCompactionTask getMaximalTask(final int gcBefore)
@@ -174,7 +194,6 @@ public class SizeTieredCompactionStrategy extends 
AbstractCompactionStrategy
                 buckets.put(bucket, size);
             }
         }
-
         return new LinkedList<List<T>>(buckets.keySet());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java 
b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
new file mode 100644
index 0000000..a7dcfec
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import org.apache.cassandra.utils.StreamingHistogram;
+
+/**
+ * ColumnStats holds information about the columns for one row inside an sstable
+ */
+public class ColumnStats
+{
+    /** how many columns are there in the row */
+    public final int columnCount;
+
+    /** the largest (client-supplied) timestamp in the row */
+    public final long maxTimestamp;
+
+    /** histogram of tombstone drop time */
+    public final StreamingHistogram tombstoneHistogram;
+
+    public ColumnStats(int columnCount, long maxTimestamp, StreamingHistogram 
tombstoneHistogram)
+    {
+        this.maxTimestamp = maxTimestamp;
+        this.columnCount = columnCount;
+        this.tombstoneHistogram = tombstoneHistogram;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java 
b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index a4ff975..734d742 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -53,7 +53,8 @@ public class Descriptor
     // h (1.0): tracks max client timestamp in metadata component
     // hb (1.0.3): records compression ration in metadata component
     // hc (1.0.4): records partitioner in metadata component
-    public static final String CURRENT_VERSION = "hc";
+    // hd (1.2): records estimated histogram of deletion times in tombstones
+    public static final String CURRENT_VERSION = "hd";
 
     public final File directory;
     /** version has the following format: <code>[a-z]+</code> */
@@ -73,6 +74,7 @@ public class Descriptor
     public final boolean tracksMaxTimestamp;
     public final boolean hasCompressionRatio;
     public final boolean hasPartitioner;
+    public final boolean tracksTombstones;
 
     /**
      * A descriptor that assumes CURRENT_VERSION.
@@ -101,6 +103,7 @@ public class Descriptor
         tracksMaxTimestamp = version.compareTo("h") >= 0;
         hasCompressionRatio = version.compareTo("hb") >= 0;
         hasPartitioner = version.compareTo("hc") >= 0;
+        tracksTombstones = version.compareTo("hd") >= 0;
         isLatestVersion = version.compareTo(CURRENT_VERSION) == 0;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java 
b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 5ac3298..074c7fd 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -61,6 +61,8 @@ public abstract class SSTable
 
     public static final String TEMPFILE_MARKER = "tmp";
 
+    public static final int TOMBSTONE_HISTOGRAM_BIN_SIZE = 100;
+
     public static final Comparator<SSTableReader> maxTimestampComparator = new 
Comparator<SSTableReader>()
     {
         public int compare(SSTableReader o1, SSTableReader o2)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
index d1d514f..42a6c73 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 
+import org.apache.cassandra.utils.StreamingHistogram;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +41,7 @@ import org.apache.cassandra.utils.EstimatedHistogram;
  *  - max column timestamp
  *  - compression ratio
  *  - partitioner
+ *  - tombstone drop time histogram
  *
  * An SSTableMetadata should be instantiated via the Collector, 
openFromDescriptor()
  * or createDefaultInstance()
@@ -56,6 +58,7 @@ public class SSTableMetadata
     public final long maxTimestamp;
     public final double compressionRatio;
     public final String partitioner;
+    public final StreamingHistogram estimatedTombstoneDropTime;
 
     private SSTableMetadata()
     {
@@ -64,10 +67,12 @@ public class SSTableMetadata
              ReplayPosition.NONE,
              Long.MIN_VALUE,
              Double.MIN_VALUE,
-             null);
+             null,
+             defaultTombstoneDropTimeHistogram());
     }
 
-    private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram 
columnCounts, ReplayPosition replayPosition, long maxTimestamp, double cr, 
String partitioner)
+    private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram 
columnCounts, ReplayPosition replayPosition, long maxTimestamp,
+                            double cr, String partitioner, StreamingHistogram 
estimatedTombstoneDropTime)
     {
         this.estimatedRowSize = rowSizes;
         this.estimatedColumnCount = columnCounts;
@@ -75,6 +80,7 @@ public class SSTableMetadata
         this.maxTimestamp = maxTimestamp;
         this.compressionRatio = cr;
         this.partitioner = partitioner;
+        this.estimatedTombstoneDropTime = estimatedTombstoneDropTime;
     }
 
     public static SSTableMetadata createDefaultInstance()
@@ -99,6 +105,26 @@ public class SSTableMetadata
         return new EstimatedHistogram(150);
     }
 
+    static StreamingHistogram defaultTombstoneDropTimeHistogram()
+    {
+        return new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
+    }
+
+    /**
+     * @param gcBefore cutoff time passed to the tombstone drop time histogram's sum()
+     * @return estimated ratio of droppable tombstones to columns at the given gcBefore time.
+     */
+    public double getEstimatedDroppableTombstoneRatio(int gcBefore)
+    {
+        long estimatedColumnCount = this.estimatedColumnCount.mean() * 
this.estimatedColumnCount.count();
+        if (estimatedColumnCount > 0)
+        {
+            double droppable = estimatedTombstoneDropTime.sum(gcBefore);
+            return droppable / estimatedColumnCount;
+        }
+        return 0.0f;
+    }
+
     public static class Collector
     {
         protected EstimatedHistogram estimatedRowSize = 
defaultRowSizeHistogram();
@@ -106,6 +132,7 @@ public class SSTableMetadata
         protected ReplayPosition replayPosition = ReplayPosition.NONE;
         protected long maxTimestamp = Long.MIN_VALUE;
         protected double compressionRatio = Double.MIN_VALUE;
+        protected StreamingHistogram estimatedTombstoneDropTime = 
defaultTombstoneDropTimeHistogram();
 
         public void addRowSize(long rowSize)
         {
@@ -117,6 +144,11 @@ public class SSTableMetadata
             estimatedColumnCount.add(columnCount);
         }
 
+        /**
+         * Merges the given tombstone histogram into the one accumulated by this collector.
+         *
+         * @param histogram histogram to merge; passed straight to {@code StreamingHistogram.merge}
+         */
+        public void mergeTombstoneHistogram(StreamingHistogram histogram)
+        {
+            estimatedTombstoneDropTime.merge(histogram);
+        }
+
         /**
          * Ratio is compressed/uncompressed and it is
          * if you have 1.x then compression isn't helping
@@ -138,7 +170,8 @@ public class SSTableMetadata
                                        replayPosition,
                                        maxTimestamp,
                                        compressionRatio,
-                                       partitioner);
+                                       partitioner,
+                                       estimatedTombstoneDropTime);
         }
 
         public Collector estimatedRowSize(EstimatedHistogram estimatedRowSize)
@@ -158,6 +191,21 @@ public class SSTableMetadata
             this.replayPosition = replayPosition;
             return this;
         }
+
+        /**
+         * Folds the statistics of one written row into this collector: max timestamp,
+         * row size, column count and tombstone histogram.
+         *
+         * @param size  size of the row, recorded via {@code addRowSize}
+         * @param stats column statistics of the row (max timestamp, column count, tombstone histogram)
+         */
+        void update(long size, ColumnStats stats)
+        {
+            /*
+             * The max timestamp is not always collected here (more precisely, row.maxTimestamp() may return Long.MIN_VALUE),
+             * to avoid deserializing an EchoedRow.
+             * This is the reason why it is collected first when calling ColumnFamilyStore.createCompactionWriter.
+             * However, for old sstables without timestamp, we still want to update the timestamp (and we know
+             * that in this case we will not use EchoedRow, since CompactionController.needsDeserialize() will be true).
+            */
+            updateMaxTimestamp(stats.maxTimestamp);
+            addRowSize(size);
+            addColumnCount(stats.columnCount);
+            mergeTombstoneHistogram(stats.tombstoneHistogram);
+        }
     }
 
     public static class SSTableMetadataSerializer
@@ -174,6 +222,7 @@ public class SSTableMetadata
             dos.writeLong(sstableStats.maxTimestamp);
             dos.writeDouble(sstableStats.compressionRatio);
             dos.writeUTF(sstableStats.partitioner);
+            
StreamingHistogram.serializer.serialize(sstableStats.estimatedTombstoneDropTime,
 dos);
         }
 
         public SSTableMetadata deserialize(Descriptor descriptor) throws 
IOException
@@ -207,9 +256,12 @@ public class SSTableMetadata
             long maxTimestamp = desc.tracksMaxTimestamp ? dis.readLong() : 
Long.MIN_VALUE;
             double compressionRatio = desc.hasCompressionRatio
                                     ? dis.readDouble()
-                                    : Double.MIN_VALUE;
+                                              : Double.MIN_VALUE;
             String partitioner = desc.hasPartitioner ? dis.readUTF() : null;
-            return new SSTableMetadata(rowSizes, columnCounts, replayPosition, 
maxTimestamp, compressionRatio, partitioner);
+            StreamingHistogram tombstoneHistogram = desc.tracksTombstones
+                                                   ? 
StreamingHistogram.serializer.deserialize(dis)
+                                                   : 
defaultTombstoneDropTimeHistogram();
+            return new SSTableMetadata(rowSizes, columnCounts, replayPosition, 
maxTimestamp, compressionRatio, partitioner, tombstoneHistogram);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index a33c89a..072b6f7 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -955,6 +955,11 @@ public class SSTableReader extends SSTable
         return sstableMetadata.estimatedColumnCount;
     }
 
+    /**
+     * Delegates to this sstable's metadata to estimate the droppable tombstone ratio.
+     *
+     * @param gcBefore passed unchanged to {@code SSTableMetadata.getEstimatedDroppableTombstoneRatio}
+     * @return the ratio computed by the sstable metadata
+     */
+    public double getEstimatedDroppableTombstoneRatio(int gcBefore)
+    {
+        return sstableMetadata.getEstimatedDroppableTombstoneRatio(gcBefore);
+    }
+
     public double getCompressionRatio()
     {
         return sstableMetadata.compressionRatio;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index b146d81..e07e151 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -158,16 +158,8 @@ public class SSTableWriter extends SSTable
         long dataSize = row.write(dataFile.stream);
         assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
                 : "incorrect row data size " + dataSize + " written to " + 
dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart 
+ 8));
-        /*
-         * The max timestamp is not always collected here (more precisely, 
row.maxTimestamp() may return Long.MIN_VALUE),
-         * to avoid deserializing an EchoedRow.
-         * This is the reason why it is collected first when calling 
ColumnFamilyStore.createCompactionWriter
-         * However, for old sstables without timestamp, we still want to 
update the timestamp (and we know
-         * that in this case we will not use EchoedRow, since 
CompactionControler.needsDeserialize() will be true).
-        */
-        sstableMetadataCollector.updateMaxTimestamp(row.maxTimestamp());
-        sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - 
currentPosition);
-        sstableMetadataCollector.addColumnCount(row.columnCount());
+
+        sstableMetadataCollector.update(dataFile.getFilePointer() - 
currentPosition, row.columnStats());
         afterAppend(row.key, currentPosition);
         return currentPosition;
     }
@@ -184,13 +176,10 @@ public class SSTableWriter extends SSTable
         dataFile.stream.writeLong(header.serializedSize() + 
cf.serializedSizeForSSTable());
 
         // write out row header and data
-        int columnCount = ColumnFamily.serializer().serializeWithIndexes(cf, 
header, dataFile.stream);
+        ColumnFamily.serializer().serializeWithIndexes(cf, header, 
dataFile.stream);
         afterAppend(decoratedKey, startPosition);
 
-        // track max column timestamp
-        sstableMetadataCollector.updateMaxTimestamp(cf.maxTimestamp());
-        sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - 
startPosition);
-        sstableMetadataCollector.addColumnCount(columnCount);
+        sstableMetadataCollector.update(dataFile.getFilePointer() - 
startPosition, cf.getColumnStats());
     }
 
     public void append(DecoratedKey decoratedKey, ByteBuffer value) throws 
IOException
@@ -234,6 +223,7 @@ public class SSTableWriter extends SSTable
 
         // deserialize each column to obtain maxTimestamp and immediately 
serialize it.
         long maxTimestamp = Long.MIN_VALUE;
+        StreamingHistogram tombstones = new 
StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
         ColumnFamily cf = ColumnFamily.create(metadata, 
ArrayBackedSortedColumns.factory());
         for (int i = 0; i < columnCount; i++)
         {
@@ -256,6 +246,12 @@ public class SSTableWriter extends SSTable
                     }
                 }
             }
+
+            int deletionTime = column.getLocalDeletionTime();
+            if (deletionTime < Integer.MAX_VALUE)
+            {
+                tombstones.update(deletionTime);
+            }
             maxTimestamp = Math.max(maxTimestamp, column.maxTimestamp());
             cf.getColumnSerializer().serialize(column, dataFile.stream);
         }
@@ -265,15 +261,11 @@ public class SSTableWriter extends SSTable
         sstableMetadataCollector.updateMaxTimestamp(maxTimestamp);
         sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - 
currentPosition);
         sstableMetadataCollector.addColumnCount(columnCount);
+        sstableMetadataCollector.mergeTombstoneHistogram(tombstones);
         afterAppend(key, currentPosition);
         return currentPosition;
     }
 
-    public void updateMaxTimestamp(long timestamp)
-    {
-        sstableMetadataCollector.updateMaxTimestamp(timestamp);
-    }
-
     /**
      * After failure, attempt to close the index writer and data file before 
deleting all temp components for the sstable
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java 
b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index a42ff85..64ef2f8 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -132,8 +132,6 @@ public class IncomingStreamReader
                         // We don't expire anything so the row shouldn't be 
empty
                         assert !row.isEmpty();
                         writer.append(row);
-                        // row append does not update the max timestamp on its 
own
-                        writer.updateMaxTimestamp(row.maxTimestamp());
 
                         // update cache
                         ColumnFamily cf = row.getFullColumnFamily();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/utils/StreamingHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java 
b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
new file mode 100644
index 0000000..a95c259
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.io.ISerializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Histogram that can be constructed from streaming of data.
+ *
+ * Points are kept as (position, count) bins in a sorted map. Once the number of
+ * bins exceeds the configured maximum, the two closest bins are merged into
+ * one bin located at their count-weighted average position.
+ *
+ * The algorithm is taken from following paper:
+ * Yael Ben-Haim and Elad Tom-Tov, "A Streaming Parallel Decision Tree Algorithm" (2010)
+ * http://jmlr.csail.mit.edu/papers/volume11/ben-haim10a/ben-haim10a.pdf
+ */
+public class StreamingHistogram
+{
+    public static final StreamingHistogramSerializer serializer = new StreamingHistogramSerializer();
+
+    // TreeMap to hold bins of histogram (bin position -> number of points in the bin).
+    private final TreeMap<Double, Long> bin;
+
+    // maximum bin size for this histogram
+    private final int maxBinSize;
+
+    /**
+     * Creates a new histogram with max bin size of maxBinSize
+     * @param maxBinSize maximum number of bins this histogram can have
+     */
+    public StreamingHistogram(int maxBinSize)
+    {
+        this.maxBinSize = maxBinSize;
+        bin = new TreeMap<Double, Long>();
+    }
+
+    /**
+     * Creates a histogram from already computed bins (used by deserialization).
+     * The given map is copied into a sorted map; it is not trimmed to maxBinSize here.
+     */
+    private StreamingHistogram(int maxBinSize, Map<Double, Long> bin)
+    {
+        this.maxBinSize = maxBinSize;
+        this.bin = new TreeMap<Double, Long>(bin);
+    }
+
+    /**
+     * Adds new point p to this histogram.
+     * @param p position of the point to add (counted once)
+     */
+    public void update(double p)
+    {
+        update(p, 1);
+    }
+
+    /**
+     * Adds new point p with value m to this histogram.
+     * @param p position of the point to add
+     * @param m number of occurrences to record at p
+     */
+    public void update(double p, long m)
+    {
+        Long mi = bin.get(p);
+        if (mi != null)
+        {
+            // we found the same p so increment that counter
+            bin.put(p, mi + m);
+        }
+        else
+        {
+            bin.put(p, m);
+            // if bin size exceeds maximum bin size then trim down to max size
+            while (bin.size() > maxBinSize)
+            {
+                // find points p1, p2 which have smallest difference
+                Iterator<Double> keys = bin.keySet().iterator();
+                double p1 = keys.next();
+                double p2 = keys.next();
+                double smallestDiff = p2 - p1;
+                double q1 = p1, q2 = p2;
+                while (keys.hasNext())
+                {
+                    p1 = p2;
+                    p2 = keys.next();
+                    double diff = p2 - p1;
+                    if (diff < smallestDiff)
+                    {
+                        smallestDiff = diff;
+                        q1 = p1;
+                        q2 = p2;
+                    }
+                }
+                // merge those two into one bin at their count-weighted mean position
+                long k1 = bin.remove(q1);
+                long k2 = bin.remove(q2);
+                bin.put((q1 * k1 + q2 * k2) / (k1 + k2), k1 + k2);
+            }
+        }
+    }
+
+    /**
+     * Merges given histogram with this histogram.
+     * Every bin of the other histogram is added to this one as a weighted point,
+     * so the result still respects this histogram's maximum bin size.
+     *
+     * @param other histogram to merge; null is ignored
+     */
+    public void merge(StreamingHistogram other)
+    {
+        if (other == null)
+            return;
+
+        for (Map.Entry<Double, Long> entry : other.getAsMap().entrySet())
+            update(entry.getKey(), entry.getValue());
+    }
+
+    /**
+     * Calculates estimated number of points in interval [-∞,b].
+     *
+     * @param b upper bound of an interval to calculate sum
+     * @return estimated number of points in an interval [-∞,b];
+     *         0 when b is smaller than every bin position or the histogram is empty.
+     */
+    public double sum(double b)
+    {
+        double sum = 0;
+        // find the points pi, pnext which satisfy pi <= b < pnext
+        Map.Entry<Double, Long> pnext = bin.higherEntry(b);
+        if (pnext == null)
+        {
+            // if b is greater than any key in this histogram,
+            // just count all appearance and return
+            for (Long value : bin.values())
+                sum += value;
+        }
+        else
+        {
+            Map.Entry<Double, Long> pi = bin.floorEntry(b);
+            // b is below the smallest bin: there is nothing to interpolate from
+            // and no point is estimated to lie at or before b
+            if (pi == null)
+                return 0;
+            // calculate estimated count mb for point b
+            double weight = (b - pi.getKey()) / (pnext.getKey() - pi.getKey());
+            double mb = pi.getValue() + (pnext.getValue() - pi.getValue()) * weight;
+            sum += (pi.getValue() + mb) * weight / 2;
+
+            // half of bin pi is assumed to lie to its left; divide as double so
+            // odd counts are not truncated by integer division
+            sum += pi.getValue() / 2.0;
+            for (Long value : bin.headMap(pi.getKey(), false).values())
+                sum += value;
+        }
+        return sum;
+    }
+
+    /**
+     * @return read-only view of the bins (position -> count), ordered by position
+     */
+    public Map<Double, Long> getAsMap()
+    {
+        return Collections.unmodifiableMap(bin);
+    }
+
+    /**
+     * Serializer writing the max bin size, the number of bins, then each bin
+     * as a (double position, long count) pair.
+     */
+    public static class StreamingHistogramSerializer implements ISerializer<StreamingHistogram>
+    {
+        public void serialize(StreamingHistogram histogram, DataOutput dos) throws IOException
+        {
+            dos.writeInt(histogram.maxBinSize);
+            Map<Double, Long> entries = histogram.getAsMap();
+            dos.writeInt(entries.size());
+            for (Map.Entry<Double, Long> entry : entries.entrySet())
+            {
+                dos.writeDouble(entry.getKey());
+                dos.writeLong(entry.getValue());
+            }
+        }
+
+        public StreamingHistogram deserialize(DataInput dis) throws IOException
+        {
+            int maxBinSize = dis.readInt();
+            int size = dis.readInt();
+            Map<Double, Long> tmp = new HashMap<Double, Long>(size);
+            for (int i = 0; i < size; i++)
+            {
+                tmp.put(dis.readDouble(), dis.readLong());
+            }
+
+            return new StreamingHistogram(maxBinSize, tmp);
+        }
+
+        public long serializedSize(StreamingHistogram histogram)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java 
b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 23a2657..a9c66bf 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -20,17 +20,15 @@ package org.apache.cassandra.db.compaction;
 
 import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
 
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.Util;
@@ -100,6 +98,60 @@ public class CompactionsTest extends CleanupHelper
         store.truncate();
     }
 
+    /**
+     * Tests that an sstable containing enough expired columns is compacted by itself.
+     */
+    @Test
+    public void testSingleSSTableCompaction() throws Exception
+    {
+        Table table = Table.open(TABLE1);
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+        store.clearUnsafe();
+        store.metadata.gcGraceSeconds(5);
+
+        // update SizeTieredCompactionStrategy's min_sstable_size to something 
small
+        // to split bucket for ttl'd sstable from others
+        Map<String, String> opts = new HashMap<String, String>();
+        opts.put(SizeTieredCompactionStrategy.MIN_SSTABLE_SIZE_KEY, "512");
+        store.metadata.compactionStrategyOptions(opts);
+        
store.setCompactionStrategyClass(SizeTieredCompactionStrategy.class.getCanonicalName());
+        // disable compaction while flushing
+        store.disableAutoCompaction();
+
+        long timestamp = System.currentTimeMillis();
+        for (int i = 0; i < 10; i++)
+        {
+            DecoratedKey key = Util.dk(Integer.toString(i));
+            RowMutation rm = new RowMutation(TABLE1, key.key);
+            for (int j = 0; j < 10; j++)
+                rm.add(new QueryPath("Standard1", null, 
ByteBufferUtil.bytes(Integer.toString(j))),
+                       ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                       timestamp,
+                       j > 0 ? 3 : 0); // let first column never expire, since 
deleting all columns does not produce sstable
+            rm.apply();
+        }
+        store.forceBlockingFlush();
+        assertEquals(1, store.getSSTables().size());
+        long originalSize = 
store.getSSTables().iterator().next().uncompressedLength();
+
+        // wait enough to force single compaction
+        TimeUnit.SECONDS.sleep(5);
+
+        // enable compaction, submit background and wait for it to complete
+        store.setMinimumCompactionThreshold(2);
+        store.setMaximumCompactionThreshold(4);
+        
FBUtilities.waitOnFuture(CompactionManager.instance.submitBackground(store));
+        while (CompactionManager.instance.getPendingTasks() > 0 || 
CompactionManager.instance.getActiveCompactions() > 0)
+            TimeUnit.SECONDS.sleep(1);
+
+        // and sstable with ttl should be compacted
+        assertEquals(1, store.getSSTables().size());
+        long size = store.getSSTables().iterator().next().uncompressedLength();
+        assertTrue("should be less than " + originalSize + ", but was " + 
size, size < originalSize);
+
+        // make sure max timestamp of compacted sstables is recorded properly 
after compaction.
+        assertMaxTimestamp(store, timestamp);
+    }
 
     @Test
     public void testSuperColumnCompactions() throws IOException, 
ExecutionException, InterruptedException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java 
b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
new file mode 100644
index 0000000..0da6849
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import com.google.common.io.ByteArrayDataOutput;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.*;
+
+import static junit.framework.Assert.assertEquals;
+
+public class StreamingHistogramTest
+{
+    @Test
+    public void testFunction() throws Exception
+    {
+        StreamingHistogram hist = new StreamingHistogram(5);
+        long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9, 32, 30, 45};
+
+        // add 7 points to histogram of 5 bins
+        for (int i = 0; i < 7; i++)
+        {
+            hist.update(samples[i]);
+        }
+
+        // should end up (2,1),(9.5,2),(17.5,2),(23,1),(36,1)
+        Map<Double, Long> expected1 = new LinkedHashMap<Double, Long>(5);
+        expected1.put(2.0, 1L);
+        expected1.put(9.5, 2L);
+        expected1.put(17.5, 2L);
+        expected1.put(23.0, 1L);
+        expected1.put(36.0, 1L);
+
+        Iterator<Map.Entry<Double, Long>> expectedItr = 
expected1.entrySet().iterator();
+        for (Map.Entry<Double, Long> actual : hist.getAsMap().entrySet())
+        {
+            Map.Entry<Double, Long> entry = expectedItr.next();
+            assertEquals(entry.getKey(), actual.getKey(), 0.01);
+            assertEquals(entry.getValue(), actual.getValue());
+        }
+
+        // merge test
+        StreamingHistogram hist2 = new StreamingHistogram(3);
+        for (int i = 7; i < samples.length; i++)
+        {
+            hist2.update(samples[i]);
+        }
+        hist.merge(hist2);
+        // should end up (2,1),(9.5,2),(19.33,3),(32.67,3),(45,1)
+        Map<Double, Long> expected2 = new LinkedHashMap<Double, Long>(5);
+        expected2.put(2.0, 1L);
+        expected2.put(9.5, 2L);
+        expected2.put(19.33, 3L);
+        expected2.put(32.67, 3L);
+        expected2.put(45.0, 1L);
+        expectedItr = expected2.entrySet().iterator();
+        for (Map.Entry<Double, Long> actual : hist.getAsMap().entrySet())
+        {
+            Map.Entry<Double, Long> entry = expectedItr.next();
+            assertEquals(entry.getKey(), actual.getKey(), 0.01);
+            assertEquals(entry.getValue(), actual.getValue());
+        }
+
+        // sum test
+        assertEquals(3.28, hist.sum(15), 0.01);
+        // sum test (b > max(hist))
+        assertEquals(10.0, hist.sum(50), 0.01);
+    }
+
+    @Test
+    public void testSerDe() throws Exception
+    {
+        StreamingHistogram hist = new StreamingHistogram(5);
+        long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9};
+
+        // add 7 points to histogram of 5 bins
+        for (int i = 0; i < samples.length; i++)
+        {
+            hist.update(samples[i]);
+        }
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        StreamingHistogram.serializer.serialize(hist, new 
DataOutputStream(out));
+        byte[] bytes = out.toByteArray();
+
+        StreamingHistogram deserialized = 
StreamingHistogram.serializer.deserialize(new DataInputStream(new 
ByteArrayInputStream(bytes)));
+
+        // deserialized histogram should have following values
+        Map<Double, Long> expected1 = new LinkedHashMap<Double, Long>(5);
+        expected1.put(2.0, 1L);
+        expected1.put(9.5, 2L);
+        expected1.put(17.5, 2L);
+        expected1.put(23.0, 1L);
+        expected1.put(36.0, 1L);
+
+        Iterator<Map.Entry<Double, Long>> expectedItr = 
expected1.entrySet().iterator();
+        for (Map.Entry<Double, Long> actual : 
deserialized.getAsMap().entrySet())
+        {
+            Map.Entry<Double, Long> entry = expectedItr.next();
+            assertEquals(entry.getKey(), actual.getKey(), 0.01);
+            assertEquals(entry.getValue(), actual.getValue());
+        }
+    }
+}

Reply via email to