Updated Branches: refs/heads/trunk 29804fa9a -> 6b2ea2647
Track tombstone expiration patch by yukim; reviewed by jbellis for CASSANDRA-3442 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b2ea264 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b2ea264 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b2ea264 Branch: refs/heads/trunk Commit: 6b2ea264702f80518a1147ab0aac44e0cf40dfc3 Parents: 29804fa Author: Jonathan Ellis <jbel...@apache.org> Authored: Wed Mar 7 15:21:11 2012 -0600 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Wed Mar 7 15:34:28 2012 -0600 ---------------------------------------------------------------------- CHANGES.txt | 5 + src/java/org/apache/cassandra/db/ColumnFamily.java | 22 ++- .../cassandra/db/ColumnFamilySerializer.java | 13 +- src/java/org/apache/cassandra/db/EchoedRow.java | 12 +- .../db/compaction/AbstractCompactedRow.java | 13 +- .../db/compaction/AbstractCompactionTask.java | 12 +- .../cassandra/db/compaction/CompactionTask.java | 8 - .../db/compaction/LazilyCompactedRow.java | 37 ++-- .../cassandra/db/compaction/PrecompactedRow.java | 11 +- .../compaction/SizeTieredCompactionStrategy.java | 25 ++- .../apache/cassandra/io/sstable/ColumnStats.java | 42 +++ .../apache/cassandra/io/sstable/Descriptor.java | 5 +- .../org/apache/cassandra/io/sstable/SSTable.java | 2 + .../cassandra/io/sstable/SSTableMetadata.java | 62 ++++- .../apache/cassandra/io/sstable/SSTableReader.java | 5 + .../apache/cassandra/io/sstable/SSTableWriter.java | 32 +-- .../cassandra/streaming/IncomingStreamReader.java | 2 - .../apache/cassandra/utils/StreamingHistogram.java | 199 +++++++++++++++ .../cassandra/db/compaction/CompactionsTest.java | 62 ++++- .../cassandra/utils/StreamingHistogramTest.java | 123 +++++++++ 20 files changed, 595 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 898e140..16f8f0c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,8 @@ +1.2-dev + * Track tombstone expiration and compact when tombstone content is + higher than a configurable threshold, default 20% (CASSANDRA-3442) + + 1.1.1-dev * optimize commitlog checksumming (CASSANDRA-3610) * identify and blacklist corrupted SSTables from future compactions (CASSANDRA-2261) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/ColumnFamily.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java index dcf52f4..f7398ee 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamily.java +++ b/src/java/org/apache/cassandra/db/ColumnFamily.java @@ -22,6 +22,8 @@ import static org.apache.cassandra.db.DBConstants.*; import java.nio.ByteBuffer; import java.security.MessageDigest; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.utils.*; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.cassandra.cache.IRowCacheEntry; @@ -31,10 +33,7 @@ import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.MarshalException; import org.apache.cassandra.io.IColumnSerializer; -import org.apache.cassandra.utils.Allocator; -import 
org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.HeapAllocator; +import org.apache.cassandra.io.sstable.ColumnStats; public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEntry { @@ -364,4 +363,19 @@ public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEn column.validateFields(metadata); } } + + public ColumnStats getColumnStats() + { + long maxTimestampSeen = Long.MIN_VALUE; + StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE); + + for (IColumn column : columns) + { + maxTimestampSeen = Math.max(maxTimestampSeen, column.maxTimestamp()); + int deletionTime = column.getLocalDeletionTime(); + if (deletionTime < Integer.MAX_VALUE) + tombstones.update(deletionTime); + } + return new ColumnStats(getColumnCount(), maxTimestampSeen, tombstones); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java index 0f6165a..31c8d4b 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.io.IColumnSerializer; import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.sstable.ColumnStats; public class ColumnFamilySerializer implements ISerializer<ColumnFamily> { @@ -70,7 +71,7 @@ public class ColumnFamilySerializer implements ISerializer<ColumnFamily> serializeForSSTable(columnFamily, dos); } - public int serializeForSSTable(ColumnFamily columnFamily, DataOutput dos) + public void serializeForSSTable(ColumnFamily columnFamily, DataOutput dos) { try { @@ -79,14 +80,8 @@ public class ColumnFamilySerializer implements ISerializer<ColumnFamily> Collection<IColumn> columns = columnFamily.getSortedColumns(); int count = columns.size(); dos.writeInt(count); - int i = 0; for (IColumn column : columns) - { columnFamily.getColumnSerializer().serialize(column, dos); - i++; - } - assert count == i: "CF size changed during serialization: was " + count + " initially but " + i + " written"; - return count; } catch (IOException e) { @@ -100,10 +95,10 @@ public class ColumnFamilySerializer implements ISerializer<ColumnFamily> dos.writeLong(columnFamily.getMarkedForDeleteAt()); } - public int serializeWithIndexes(ColumnFamily columnFamily, ColumnIndexer.RowHeader index, DataOutput dos) + public void serializeWithIndexes(ColumnFamily columnFamily, ColumnIndexer.RowHeader index, DataOutput dos) { ColumnIndexer.serialize(index, dos); - return serializeForSSTable(columnFamily, dos); + serializeForSSTable(columnFamily, dos); } public ColumnFamily deserialize(DataInput dis) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/EchoedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/EchoedRow.java b/src/java/org/apache/cassandra/db/EchoedRow.java index 9067a10..69ec3d7 100644 --- a/src/java/org/apache/cassandra/db/EchoedRow.java +++ b/src/java/org/apache/cassandra/db/EchoedRow.java @@ -22,7 +22,10 @@ import java.io.IOException; import java.security.MessageDigest; import 
org.apache.cassandra.db.compaction.AbstractCompactedRow; +import org.apache.cassandra.io.sstable.ColumnStats; +import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; +import org.apache.cassandra.utils.StreamingHistogram; /** * A CompactedRow implementation that just echoes the original row bytes without deserializing. @@ -60,13 +63,8 @@ public class EchoedRow extends AbstractCompactedRow return false; } - public int columnCount() + public ColumnStats columnStats() { - return row.getColumnCount(); - } - - public long maxTimestamp() - { - return Long.MIN_VALUE; + return new ColumnStats(row.getColumnCount(), Long.MIN_VALUE, new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE)); } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java index f04e7c1..5a892c1 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.security.MessageDigest; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.io.sstable.ColumnStats; /** * a CompactedRow is an object that takes a bunch of rows (keys + columnfamilies) @@ -60,14 +61,8 @@ public abstract class AbstractCompactedRow public abstract boolean isEmpty(); /** - * @return the number of columns in the row + * @return aggregate information about the columns in this row. Some fields may + * contain default values if computing them would require extra effort we're not willing to make. */ - public abstract int columnCount(); - - /** - * @return the max column timestamp in the row or Long.MIN_VALUE if - * computing this value would require extra effort we're not willing to - * make. - */ - public abstract long maxTimestamp(); + public abstract ColumnStats columnStats(); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java index e39a533..e031e07 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java @@ -29,11 +29,13 @@ public abstract class AbstractCompactionTask { protected final ColumnFamilyStore cfs; protected Collection<SSTableReader> sstables; + protected boolean isUserDefined; public AbstractCompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables) { this.cfs = cfs; this.sstables = sstables; + this.isUserDefined = false; } public abstract int execute(CompactionExecutorStatsCollector collector) throws IOException; @@ -58,7 +60,9 @@ public abstract class AbstractCompactionTask */ public boolean markSSTablesForCompaction() { - return markSSTablesForCompaction(cfs.getMinimumCompactionThreshold(), cfs.getMaximumCompactionThreshold()); + int min = isUserDefined ? 1 : cfs.getMinimumCompactionThreshold(); + int max = isUserDefined ? Integer.MAX_VALUE : cfs.getMaximumCompactionThreshold(); + return markSSTablesForCompaction(min, max); } public boolean markSSTablesForCompaction(int min, int max) @@ -83,4 +87,10 @@ public abstract class AbstractCompactionTask // Can be overridden for actions that need to be performed if the task won't // execute (if sstable can't be marked successfully) protected void cancel() {} + + public AbstractCompactionTask isUserDefined(boolean isUserDefined) + { + this.isUserDefined = isUserDefined; + return this; + } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index f8d123e..4d77af0 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -42,14 +42,12 @@ public class CompactionTask extends AbstractCompactionTask { protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class); protected final int gcBefore; - protected boolean isUserDefined; protected static long totalBytesCompacted = 0; public CompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, final int gcBefore) { super(cfs, sstables); this.gcBefore = gcBefore; - this.isUserDefined = false; } public static synchronized long addToTotalBytesCompacted(long bytesCompacted) @@ -262,10 +260,4 @@ public class CompactionTask extends AbstractCompactionTask } return max; } - - public CompactionTask isUserDefined(boolean isUserDefined) - { - this.isUserDefined = isUserDefined; - return this; - } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java index 61546f3..eaf401a 100644 --- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java @@ -26,6 +26,9 @@ import java.util.List; import com.google.common.base.Predicates; import com.google.common.collect.Iterators; +import org.apache.cassandra.io.sstable.ColumnStats; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.utils.StreamingHistogram; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,8 +61,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl private final DataOutputBuffer headerBuffer; private ColumnFamily emptyColumnFamily; private Reducer reducer; - private int columnCount; - private long maxTimestamp; + private final ColumnStats columnStats; private long columnSerializedSize; private boolean closed; @@ -85,9 +87,10 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl ColumnIndexer.serialize(this, headerBuffer); // reach into the reducer used during iteration to get column count, size, max column timestamp // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null) columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns, reducer == null ?
Long.MIN_VALUE : reducer.maxTimestampSeen, + reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones + ); columnSerializedSize = reducer == null ? 0 : reducer.serializedSize; - maxTimestamp = reducer == null ? Long.MIN_VALUE : reducer.maxTimestampSeen; reducer = null; } @@ -106,7 +109,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl out.writeLong(dataSize); out.write(headerBuffer.getData(), 0, headerBuffer.getLength()); out.write(clockOut.getData(), 0, clockOut.getLength()); - out.writeInt(columnCount); + out.writeInt(columnStats.columnCount); Iterator<IColumn> iter = iterator(); while (iter.hasNext()) @@ -133,7 +136,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl try { ColumnFamily.serializer().serializeCFInfo(emptyColumnFamily, out); - out.writeInt(columnCount); + out.writeInt(columnStats.columnCount); digest.update(out.getData(), 0, out.getLength()); } catch (IOException e) @@ -154,7 +157,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl boolean cfIrrelevant = shouldPurge ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant - return cfIrrelevant && columnCount == 0; + return cfIrrelevant && columnStats.columnCount == 0; } public int getEstimatedColumnCount() @@ -179,14 +182,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl return Iterators.filter(iter, Predicates.notNull()); } - public int columnCount() + public ColumnStats columnStats() { - return columnCount; - } - - public long maxTimestamp() - { - return maxTimestamp; + return columnStats; } private void close() @@ -209,8 +207,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl { ColumnFamily container = emptyColumnFamily.cloneMeShallow(); long serializedSize = 4; // int for column count - int size = 0; + int columns = 0; long maxTimestampSeen = Long.MIN_VALUE; + StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE); public void reduce(IColumn current) { @@ -227,9 +226,15 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl } IColumn reduced = purged.iterator().next(); container.clear(); + serializedSize += reduced.serializedSize(); - size++; + columns++; maxTimestampSeen = Math.max(maxTimestampSeen, reduced.maxTimestamp()); + int deletionTime = reduced.getLocalDeletionTime(); + if (deletionTime < Integer.MAX_VALUE) + { + tombstones.update(deletionTime); + } return reduced; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java index 8522ea6..35cd33d 100644 --- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.security.MessageDigest; import java.util.List; +import org.apache.cassandra.io.sstable.ColumnStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +66,7 @@ public class PrecompactedRow extends AbstractCompactedRow if (cf.hasExpiredTombstones(controller.gcBefore)) shouldPurge = 
controller.shouldPurge(key); + // We should only gc tombstones if shouldPurge == true. But otherwise, // it is still ok to collect columns that are shadowed by their (deleted) // container, which removeDeleted(cf, Integer.MAX_VALUE) will do @@ -161,14 +163,9 @@ public class PrecompactedRow extends AbstractCompactedRow return compactedCf == null; } - public int columnCount() - { - return compactedCf == null ? 0 : compactedCf.getColumnCount(); - } - - public long maxTimestamp() + public ColumnStats columnStats() { - return compactedCf.maxTimestamp(); + return compactedCf.getColumnStats(); } /**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 97396e4..d6f4025 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -32,9 +32,12 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy { private static final Logger logger = LoggerFactory.getLogger(SizeTieredCompactionStrategy.class); protected static final long DEFAULT_MIN_SSTABLE_SIZE = 50L * 1024L * 1024L; + protected static final float DEFAULT_TOMBSTONE_THRESHOLD = 0.2f; protected static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size"; + protected static final String TOMBSTONE_THRESHOLD_KEY = "tombstone_threshold"; protected long minSSTableSize; protected volatile int estimatedRemainingTasks; + protected float tombstoneThreshold; public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options) { @@ -44,6 +47,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy minSSTableSize = (null != optionValue) ? Long.parseLong(optionValue) : DEFAULT_MIN_SSTABLE_SIZE; cfs.setMaximumCompactionThreshold(cfs.metadata.getMaxCompactionThreshold()); cfs.setMinimumCompactionThreshold(cfs.metadata.getMinCompactionThreshold()); + optionValue = options.get(TOMBSTONE_THRESHOLD_KEY); + tombstoneThreshold = (null != optionValue) ? Float.parseFloat(optionValue) : DEFAULT_TOMBSTONE_THRESHOLD; } public AbstractCompactionTask getNextBackgroundTask(final int gcBefore) @@ -75,7 +80,21 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy } if (prunedBuckets.isEmpty()) - return null; + { + // if there is no sstable to compact in the standard way, try compacting a single sstable whose droppable + // tombstone ratio is greater than the threshold. + for (List<SSTableReader> bucket : buckets) + { + for (SSTableReader table : bucket) + { + if (table.getEstimatedDroppableTombstoneRatio(gcBefore) > tombstoneThreshold) + prunedBuckets.add(Collections.singletonList(table)); + } + } + + if (prunedBuckets.isEmpty()) + return null; + } List<SSTableReader> smallestBucket = Collections.min(prunedBuckets, new Comparator<List<SSTableReader>>() { @@ -97,7 +116,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy return n / sstables.size(); } }); - return new CompactionTask(cfs, smallestBucket, gcBefore); + // when the bucket contains just one sstable, set userDefined to true to force single-sstable compaction + return new CompactionTask(cfs, smallestBucket, gcBefore).isUserDefined(smallestBucket.size() == 1); } public AbstractCompactionTask getMaximalTask(final int gcBefore) @@ -174,7 +194,6 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy buckets.put(bucket, size); } } - return new LinkedList<List<T>>(buckets.keySet()); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/io/sstable/ColumnStats.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java new file mode 100644 index 0000000..a7dcfec --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable; + +import org.apache.cassandra.utils.StreamingHistogram; + +/** + * ColumnStats holds information about the columns for one row inside an sstable + */ +public class ColumnStats +{ + /** how many columns are there in the row */ + public final int columnCount; + + /** the largest (client-supplied) timestamp in the row */ + public final long maxTimestamp; + + /** histogram of tombstone drop time */ + public final StreamingHistogram tombstoneHistogram; + + public ColumnStats(int columnCount, long maxTimestamp, StreamingHistogram tombstoneHistogram) + { + this.maxTimestamp = maxTimestamp; + this.columnCount = columnCount; + this.tombstoneHistogram = tombstoneHistogram; + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/io/sstable/Descriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java index a4ff975..734d742 100644 --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java @@ -53,7 +53,8 @@ public class Descriptor // h (1.0): tracks max client timestamp in metadata component // hb (1.0.3): records compression ratio in metadata component // hc (1.0.4): records partitioner in metadata component - public static final String CURRENT_VERSION = "hc"; + // hd (1.2): records estimated histogram of deletion times in tombstones + public static final String CURRENT_VERSION = "hd"; public final File directory; /** version has the following format: <code>[a-z]+</code> */ @@ -73,6 +74,7 @@ public class Descriptor public final boolean tracksMaxTimestamp; public final boolean hasCompressionRatio; public final boolean hasPartitioner; + public final boolean tracksTombstones; /** * A descriptor that assumes CURRENT_VERSION.
@@ -101,6 +103,7 @@ public class Descriptor tracksMaxTimestamp = version.compareTo("h") >= 0; hasCompressionRatio = version.compareTo("hb") >= 0; hasPartitioner = version.compareTo("hc") >= 0; + tracksTombstones = version.compareTo("hd") >= 0; isLatestVersion = version.compareTo(CURRENT_VERSION) == 0; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/io/sstable/SSTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java index 5ac3298..074c7fd 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTable.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java @@ -61,6 +61,8 @@ public abstract class SSTable public static final String TEMPFILE_MARKER = "tmp"; + public static final int TOMBSTONE_HISTOGRAM_BIN_SIZE = 100; + public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>() { public int compare(SSTableReader o1, SSTableReader o2) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java index d1d514f..42a6c73 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import org.apache.cassandra.utils.StreamingHistogram; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +41,7 @@ import org.apache.cassandra.utils.EstimatedHistogram; * - max column timestamp * - compression ratio * - partitioner + * - tombstone drop time histogram * * An SSTableMetadata should be instantiated via the Collector, openFromDescriptor() * or createDefaultInstance() @@ -56,6 +58,7 @@ public class SSTableMetadata public final long maxTimestamp; public final double compressionRatio; public final String partitioner; + public final StreamingHistogram estimatedTombstoneDropTime; private SSTableMetadata() { @@ -64,10 +67,12 @@ public class SSTableMetadata ReplayPosition.NONE, Long.MIN_VALUE, Double.MIN_VALUE, - null); + null, + defaultTombstoneDropTimeHistogram()); } - private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long maxTimestamp, double cr, String partitioner) + private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long maxTimestamp, + double cr, String partitioner, StreamingHistogram estimatedTombstoneDropTime) { this.estimatedRowSize = rowSizes; this.estimatedColumnCount = columnCounts; @@ -75,6 +80,7 @@ public class SSTableMetadata this.maxTimestamp = maxTimestamp; this.compressionRatio = cr; this.partitioner = partitioner; + this.estimatedTombstoneDropTime = estimatedTombstoneDropTime; } public static SSTableMetadata createDefaultInstance() @@ -99,6 +105,26 @@ public class SSTableMetadata return new EstimatedHistogram(150); } + static StreamingHistogram defaultTombstoneDropTimeHistogram() + { + return new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE); + } + + /** + * @param gcBefore + * @return estimated droppable tombstone ratio at given gcBefore time. 
+ */ + public double getEstimatedDroppableTombstoneRatio(int gcBefore) + { + long estimatedColumnCount = this.estimatedColumnCount.mean() * this.estimatedColumnCount.count(); + if (estimatedColumnCount > 0) + { + double droppable = estimatedTombstoneDropTime.sum(gcBefore); + return droppable / estimatedColumnCount; + } + return 0.0f; + } + public static class Collector { protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram(); @@ -106,6 +132,7 @@ public class SSTableMetadata protected ReplayPosition replayPosition = ReplayPosition.NONE; protected long maxTimestamp = Long.MIN_VALUE; protected double compressionRatio = Double.MIN_VALUE; + protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram(); public void addRowSize(long rowSize) { @@ -117,6 +144,11 @@ public class SSTableMetadata estimatedColumnCount.add(columnCount); } + public void mergeTombstoneHistogram(StreamingHistogram histogram) + { + estimatedTombstoneDropTime.merge(histogram); + } + /** * Ratio is compressed/uncompressed and it is * if you have 1.x then compression isn't helping @@ -138,7 +170,8 @@ public class SSTableMetadata replayPosition, maxTimestamp, compressionRatio, - partitioner); + partitioner, + estimatedTombstoneDropTime); } public Collector estimatedRowSize(EstimatedHistogram estimatedRowSize) @@ -158,6 +191,21 @@ public class SSTableMetadata this.replayPosition = replayPosition; return this; } + + void update(long size, ColumnStats stats) + { + /* + * The max timestamp is not always collected here (more precisely, row.maxTimestamp() may return Long.MIN_VALUE), + * to avoid deserializing an EchoedRow. + * This is the reason why it is collected first when calling ColumnFamilyStore.createCompactionWriter + * However, for old sstables without timestamp, we still want to update the timestamp (and we know + * that in this case we will not use EchoedRow, since CompactionController.needsDeserialize() will be true). + */ + updateMaxTimestamp(stats.maxTimestamp); + addRowSize(size); + addColumnCount(stats.columnCount); + mergeTombstoneHistogram(stats.tombstoneHistogram); + } } public static class SSTableMetadataSerializer @@ -174,6 +222,7 @@ public class SSTableMetadata dos.writeLong(sstableStats.maxTimestamp); dos.writeDouble(sstableStats.compressionRatio); dos.writeUTF(sstableStats.partitioner); + StreamingHistogram.serializer.serialize(sstableStats.estimatedTombstoneDropTime, dos); } public SSTableMetadata deserialize(Descriptor descriptor) throws IOException @@ -207,9 +256,12 @@ public class SSTableMetadata long maxTimestamp = desc.tracksMaxTimestamp ? dis.readLong() : Long.MIN_VALUE; double compressionRatio = desc.hasCompressionRatio ? dis.readDouble() - : Double.MIN_VALUE; + : Double.MIN_VALUE; String partitioner = desc.hasPartitioner ? dis.readUTF() : null; + StreamingHistogram tombstoneHistogram = desc.tracksTombstones + ?
StreamingHistogram.serializer.deserialize(dis) + : defaultTombstoneDropTimeHistogram(); + return new SSTableMetadata(rowSizes, columnCounts, replayPosition, maxTimestamp, compressionRatio, partitioner, tombstoneHistogram); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index a33c89a..072b6f7 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -955,6 +955,11 @@ public class SSTableReader extends SSTable return sstableMetadata.estimatedColumnCount; } + public double getEstimatedDroppableTombstoneRatio(int gcBefore) + { + return sstableMetadata.getEstimatedDroppableTombstoneRatio(gcBefore); + } + public double getCompressionRatio() { return sstableMetadata.compressionRatio; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index b146d81..e07e151 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -158,16 +158,8 @@ public class SSTableWriter extends SSTable long dataSize = row.write(dataFile.stream); assert dataSize == dataFile.getFilePointer() - (dataStart + 8) : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8)); - /* - * The max timestamp is not always collected here (more precisely, row.maxTimestamp() may return Long.MIN_VALUE), - * to avoid deserializing an EchoedRow. - * This is the reason why it is collected first when calling ColumnFamilyStore.createCompactionWriter - * However, for old sstables without timestamp, we still want to update the timestamp (and we know - * that in this case we will not use EchoedRow, since CompactionControler.needsDeserialize() will be true). 
- */ - sstableMetadataCollector.updateMaxTimestamp(row.maxTimestamp()); - sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition); - sstableMetadataCollector.addColumnCount(row.columnCount()); + + sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats()); afterAppend(row.key, currentPosition); return currentPosition; } @@ -184,13 +176,10 @@ public class SSTableWriter extends SSTable dataFile.stream.writeLong(header.serializedSize() + cf.serializedSizeForSSTable()); // write out row header and data - int columnCount = ColumnFamily.serializer().serializeWithIndexes(cf, header, dataFile.stream); + ColumnFamily.serializer().serializeWithIndexes(cf, header, dataFile.stream); afterAppend(decoratedKey, startPosition); - // track max column timestamp - sstableMetadataCollector.updateMaxTimestamp(cf.maxTimestamp()); - sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - startPosition); - sstableMetadataCollector.addColumnCount(columnCount); + sstableMetadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats()); } public void append(DecoratedKey decoratedKey, ByteBuffer value) throws IOException @@ -234,6 +223,7 @@ public class SSTableWriter extends SSTable // deserialize each column to obtain maxTimestamp and immediately serialize it. long maxTimestamp = Long.MIN_VALUE; + StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE); ColumnFamily cf = ColumnFamily.create(metadata, ArrayBackedSortedColumns.factory()); for (int i = 0; i < columnCount; i++) { @@ -256,6 +246,12 @@ public class SSTableWriter extends SSTable } } } + + int deletionTime = column.getLocalDeletionTime(); + if (deletionTime < Integer.MAX_VALUE) + { + tombstones.update(deletionTime); + } maxTimestamp = Math.max(maxTimestamp, column.maxTimestamp()); cf.getColumnSerializer().serialize(column, dataFile.stream); } @@ -265,15 +261,11 @@ public class SSTableWriter extends SSTable sstableMetadataCollector.updateMaxTimestamp(maxTimestamp); sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition); sstableMetadataCollector.addColumnCount(columnCount); + sstableMetadataCollector.mergeTombstoneHistogram(tombstones); afterAppend(key, currentPosition); return currentPosition; } - public void updateMaxTimestamp(long timestamp) - { - sstableMetadataCollector.updateMaxTimestamp(timestamp); - } - /** * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java index a42ff85..64ef2f8 100644 --- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java @@ -132,8 +132,6 @@ public class IncomingStreamReader // We don't expire anything so the row shouldn't be empty assert !row.isEmpty(); writer.append(row); - // row append does not update the max timestamp on its own - writer.updateMaxTimestamp(row.maxTimestamp()); // update cache ColumnFamily cf = row.getFullColumnFamily(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/src/java/org/apache/cassandra/utils/StreamingHistogram.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java b/src/java/org/apache/cassandra/utils/StreamingHistogram.java new file mode 100644 index 0000000..a95c259 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import org.apache.cassandra.db.DBConstants; +import org.apache.cassandra.io.ISerializer; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.*; + +/** + * Histogram that can be constructed from a stream of data. + * + * The algorithm is taken from the following paper: + * Yael Ben-Haim and Elad Tom-Tov, "A Streaming Parallel Decision Tree Algorithm" (2010) + * http://jmlr.csail.mit.edu/papers/volume11/ben-haim10a/ben-haim10a.pdf + */ +public class StreamingHistogram +{ + public static final StreamingHistogramSerializer serializer = new StreamingHistogramSerializer(); + + // TreeMap to hold bins of histogram. + private final TreeMap<Double, Long> bin; + + // maximum number of bins for this histogram + private final int maxBinSize; + + /** + * Creates a new histogram with max bin size of maxBinSize + * @param maxBinSize maximum number of bins this histogram can have + */ + public StreamingHistogram(int maxBinSize) + { + this.maxBinSize = maxBinSize; + bin = new TreeMap<Double, Long>(); + } + + private StreamingHistogram(int maxBinSize, Map<Double, Long> bin) + { + this.maxBinSize = maxBinSize; + this.bin = new TreeMap<Double, Long>(bin); + } + + /** + * Adds a new point p to this histogram. + * @param p + */ + public void update(double p) + { + update(p, 1); + } + + /** + * Adds a new point p with value m to this histogram. + * @param p + * @param m + */ + public void update(double p, long m) + { + Long mi = bin.get(p); + if (mi != null) + { + // we found the same p so increment that counter + bin.put(p, mi + m); + } + else + { + bin.put(p, m); + // if bin size exceeds maximum bin size then trim down to max size + while (bin.size() > maxBinSize) + { + // find points p1, p2 which have smallest difference + Iterator<Double> keys = bin.keySet().iterator(); + double p1 = keys.next(); + double p2 = keys.next(); + double smallestDiff = p2 - p1; + double q1 = p1, q2 = p2; + while (keys.hasNext()) + { + p1 = p2; + p2 = keys.next(); + double diff = p2 - p1; + if (diff < smallestDiff) + { + smallestDiff = diff; + q1 = p1; + q2 = p2; + } + } + // merge those two + long k1 = bin.remove(q1); + long k2 = bin.remove(q2); + bin.put((q1 * k1 + q2 * k2) / (k1 + k2), k1 + k2); + } + } + } + + /** + * Merges the given histogram with this histogram. + * + * @param other histogram to merge + */ + public void merge(StreamingHistogram other) + { + if (other == null) + return; + + for (Map.Entry<Double, Long> entry : other.getAsMap().entrySet()) + update(entry.getKey(), entry.getValue()); + } + + /** + * Calculates estimated number of points in the interval [-inf,b]. + * + * @param b upper bound of the interval to calculate the sum + * @return estimated number of points in the interval [-inf,b]. + */ + public double sum(double b) + { + double sum = 0; + // find the points pi, pnext which satisfy pi <= b < pnext + Map.Entry<Double, Long> pnext = bin.higherEntry(b); + if (pnext == null) + { + // if b is greater than any key in this histogram, + // just count all appearances and return + for (Long value : bin.values()) + sum += value; + } + else + { + Map.Entry<Double, Long> pi = bin.floorEntry(b); + // calculate estimated count mb for point b + double weight = (b - pi.getKey()) / (pnext.getKey() - pi.getKey()); + double mb = pi.getValue() + (pnext.getValue() - pi.getValue()) * weight; + sum += (pi.getValue() + mb) * weight / 2; + + sum += pi.getValue() / 2; + for (Long value : bin.headMap(pi.getKey(), false).values()) + sum += value; + } + return sum; + } + + public Map<Double, Long> getAsMap() + { + return Collections.unmodifiableMap(bin); + } + + public static class StreamingHistogramSerializer implements ISerializer<StreamingHistogram> + { + public void serialize(StreamingHistogram histogram, DataOutput dos) throws IOException + { + dos.writeInt(histogram.maxBinSize); + Map<Double, Long> entries = histogram.getAsMap(); + dos.writeInt(entries.size()); + for (Map.Entry<Double, Long> entry : entries.entrySet()) + { + dos.writeDouble(entry.getKey()); + dos.writeLong(entry.getValue()); + } + } + + public StreamingHistogram deserialize(DataInput dis) throws IOException + { + int maxBinSize = dis.readInt(); + int size = dis.readInt(); + Map<Double, Long> tmp = new HashMap<Double, Long>(size); + for (int i = 0; i < size; i++) + { + tmp.put(dis.readDouble(), dis.readLong()); + } + + return new StreamingHistogram(maxBinSize, tmp); + } + + public long serializedSize(StreamingHistogram histogram) + { + throw new UnsupportedOperationException(); + } + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index 23a2657..a9c66bf 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -20,17 +20,15 @@ package org.apache.cassandra.db.compaction; import java.io.*; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Collection; -import java.util.Collections; -import java.util.Set; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.junit.Test; import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; import org.apache.cassandra.CleanupHelper; import org.apache.cassandra.Util; @@ -100,6 +98,60 @@ public class CompactionsTest extends CleanupHelper store.truncate(); } + /** + * Test that an sstable with enough expired columns is compacted by itself. + */ + @Test + public void testSingleSSTableCompaction() throws Exception + { + Table table = Table.open(TABLE1); + ColumnFamilyStore store = table.getColumnFamilyStore("Standard1"); + store.clearUnsafe(); + store.metadata.gcGraceSeconds(5); + + // update SizeTieredCompactionStrategy's min_sstable_size to something small + // to split the bucket for the ttl'd sstable from the others + Map<String, String> opts = new HashMap<String, String>(); + opts.put(SizeTieredCompactionStrategy.MIN_SSTABLE_SIZE_KEY, "512"); + store.metadata.compactionStrategyOptions(opts); + store.setCompactionStrategyClass(SizeTieredCompactionStrategy.class.getCanonicalName()); + // disable compaction while flushing + store.disableAutoCompaction(); + + long timestamp = System.currentTimeMillis(); + for (int i = 0; i < 10; i++) + { + DecoratedKey key = Util.dk(Integer.toString(i)); + RowMutation rm = new RowMutation(TABLE1, key.key); + for (int j = 0; j < 10; j++) + rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes(Integer.toString(j))), + ByteBufferUtil.EMPTY_BYTE_BUFFER, + timestamp, + j > 0 ? 3 : 0); // let the first column never expire, since deleting all columns does not produce an sstable + rm.apply(); + } + store.forceBlockingFlush(); + assertEquals(1, store.getSSTables().size()); + long originalSize = store.getSSTables().iterator().next().uncompressedLength(); + + // wait long enough to force single-sstable compaction + TimeUnit.SECONDS.sleep(5); + + // enable compaction, submit background and wait for it to complete + store.setMinimumCompactionThreshold(2); + store.setMaximumCompactionThreshold(4); + FBUtilities.waitOnFuture(CompactionManager.instance.submitBackground(store)); + while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0) + TimeUnit.SECONDS.sleep(1); + + // the sstable with expired columns should have been compacted + assertEquals(1, store.getSSTables().size()); + long size = store.getSSTables().iterator().next().uncompressedLength(); + assertTrue("should be less than " + originalSize + ", but was " + size, size < originalSize); + + // make sure max timestamp of compacted sstables is recorded properly after compaction. + assertMaxTimestamp(store, timestamp); + } @Test public void testSuperColumnCompactions() throws IOException, ExecutionException, InterruptedException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b2ea264/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java new file mode 100644 index 0000000..0da6849 --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import com.google.common.io.ByteArrayDataOutput; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.util.*; + +import static junit.framework.Assert.assertEquals; + +public class StreamingHistogramTest +{ + @Test + public void testFunction() throws Exception + { + StreamingHistogram hist = new StreamingHistogram(5); + long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9, 32, 30, 45}; + + // add 7 points to histogram of 5 bins + for (int i = 0; i < 7; i++) + { + hist.update(samples[i]); + } + + // should end up (2,1),(9.5,2),(17.5,2),(23,1),(36,1) + Map<Double, Long> expected1 = new LinkedHashMap<Double, Long>(5); + expected1.put(2.0, 1L); + expected1.put(9.5, 2L); + expected1.put(17.5, 2L); + expected1.put(23.0, 1L); + expected1.put(36.0, 1L); + + Iterator<Map.Entry<Double, Long>> expectedItr = expected1.entrySet().iterator(); + for (Map.Entry<Double, Long> actual : hist.getAsMap().entrySet()) + { + Map.Entry<Double, Long> entry = expectedItr.next(); + assertEquals(entry.getKey(), actual.getKey(), 0.01); + assertEquals(entry.getValue(), actual.getValue()); + } + + // merge test + StreamingHistogram hist2 = new StreamingHistogram(3); + for (int i = 7; i < samples.length; i++) + { + hist2.update(samples[i]); + } + hist.merge(hist2); + // should end up (2,1),(9.5,2),(19.33,3),(32.67,3),(45,1) + Map<Double, Long> expected2 = new LinkedHashMap<Double, Long>(5); + expected2.put(2.0, 1L); + expected2.put(9.5, 2L); + expected2.put(19.33, 3L); + expected2.put(32.67, 3L); + expected2.put(45.0, 1L); + expectedItr = expected2.entrySet().iterator(); + for (Map.Entry<Double, Long> actual : hist.getAsMap().entrySet()) + { + Map.Entry<Double, Long> entry = expectedItr.next(); + assertEquals(entry.getKey(), actual.getKey(), 0.01); + assertEquals(entry.getValue(), actual.getValue()); + } + + // sum test + assertEquals(3.28, hist.sum(15), 0.01); + // sum test (b > max(hist)) + assertEquals(10.0, hist.sum(50), 0.01); + } + + @Test + public void testSerDe() throws Exception + { + StreamingHistogram hist = new StreamingHistogram(5); + long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9}; + + // add 7 points to histogram of 5 bins + for (int i = 0; i < samples.length; i++) + { + hist.update(samples[i]); + } + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + StreamingHistogram.serializer.serialize(hist, new DataOutputStream(out)); + byte[] bytes = out.toByteArray(); + + StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes))); + + // deserialized histogram should have following values + Map<Double, Long> expected1 = new LinkedHashMap<Double, Long>(5); + expected1.put(2.0, 1L); + expected1.put(9.5, 2L); + expected1.put(17.5, 2L); + expected1.put(23.0, 1L); + expected1.put(36.0, 1L); + + Iterator<Map.Entry<Double, Long>> expectedItr = expected1.entrySet().iterator(); + for (Map.Entry<Double, Long> actual : deserialized.getAsMap().entrySet()) + { + Map.Entry<Double, Long> entry = expectedItr.next(); + assertEquals(entry.getKey(), actual.getKey(), 0.01); + assertEquals(entry.getValue(), actual.getValue()); + } + } +}
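----------------------------------------------------------------------

A minimal usage sketch of the new StreamingHistogram (not part of the commit; it assumes only the public API added above, and the bin values named in the comments are the ones asserted in StreamingHistogramTest):

    import java.util.Map;
    import org.apache.cassandra.utils.StreamingHistogram;

    public class StreamingHistogramDemo
    {
        public static void main(String[] args)
        {
            // a histogram capped at 5 bins; adding further distinct points forces
            // the two closest bins to merge into their weighted centroid
            StreamingHistogram hist = new StreamingHistogram(5);
            for (long p : new long[]{ 23, 19, 10, 16, 36, 2, 9 })
                hist.update(p);

            // per StreamingHistogramTest, the bins are now (2,1),(9.5,2),(17.5,2),(23,1),(36,1)
            for (Map.Entry<Double, Long> bin : hist.getAsMap().entrySet())
                System.out.println(bin.getKey() + " -> " + bin.getValue());

            // sum(b) interpolates how many of the added points fall in [-inf,b];
            // here it prints roughly 3.4 (the true count of points <= 15 is 3)
            System.out.println(hist.sum(15));
        }
    }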
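End to end, the patch threads per-row tombstone histograms into the sstable-level metadata. The following is a condensed restatement of that flow, not the committed code; the deletion times are invented epoch seconds used purely for illustration:

    import org.apache.cassandra.utils.StreamingHistogram;

    public class TombstoneTrackingSketch
    {
        public static void main(String[] args)
        {
            // sstable-level histogram, as kept by SSTableMetadata.Collector
            StreamingHistogram perSSTable = new StreamingHistogram(100); // SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE

            // per row, ColumnFamily.getColumnStats() records the local deletion time
            // of every column that has one (getLocalDeletionTime() < Integer.MAX_VALUE)
            StreamingHistogram perRow = new StreamingHistogram(100);
            for (int deletionTime : new int[]{ 1331150000, 1331150005, 1331151000 })
                perRow.update(deletionTime);

            // SSTableWriter.append() hands the row's ColumnStats to Collector.update(),
            // which merges the row histogram into the sstable histogram
            perSSTable.merge(perRow);

            // at compaction time, sum(gcBefore) estimates how many tombstones
            // would already be droppable if the sstable were compacted now
            int gcBefore = 1331150500;
            System.out.println(perSSTable.sum(gcBefore));
        }
    }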
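The compaction trigger itself reduces to simple arithmetic in SSTableMetadata.getEstimatedDroppableTombstoneRatio(): the histogram's estimate of tombstones droppable at gcBefore, divided by the approximate total column count (mean columns per row times row count). A worked sketch; the figures below are made up to illustrate the calculation, not taken from the patch:

    public class DroppableRatioSketch
    {
        public static void main(String[] args)
        {
            // hypothetical sstable-level statistics
            long meanColumnsPerRow = 100;   // estimatedColumnCount.mean()
            long rowCount = 10000;          // estimatedColumnCount.count()
            long totalColumns = meanColumnsPerRow * rowCount;  // ~1,000,000 columns

            double droppable = 250000.0;    // estimatedTombstoneDropTime.sum(gcBefore)
            double ratio = droppable / totalColumns;  // 0.25

            // 0.25 > DEFAULT_TOMBSTONE_THRESHOLD (0.2f), so when no regular bucket
            // qualifies, SizeTieredCompactionStrategy submits this sstable on its own
            System.out.println(ratio > 0.2f);
        }
    }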