Track tombstones for LCS; patch by yukim; reviewed by jbellis for CASSANDRA-4234
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0091af93 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0091af93 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0091af93 Branch: refs/heads/cassandra-1.1 Commit: 0091af932c6ef65a0a5917f123fe24398b79c079 Parents: 13f8eee Author: Yuki Morishita <yu...@apache.org> Authored: Fri Jul 13 13:17:51 2012 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Fri Jul 13 13:17:51 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../db/compaction/AbstractCompactionStrategy.java | 44 ++++++++++++ .../db/compaction/LeveledCompactionStrategy.java | 45 ++++++++++++- .../cassandra/db/compaction/LeveledManifest.java | 6 ++ .../cassandra/db/compaction/OperationType.java | 2 + .../compaction/SizeTieredCompactionStrategy.java | 35 +--------- .../cassandra/db/compaction/CompactionsTest.java | 53 ++++++++++++++- 7 files changed, 148 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 03e3fba..fb66fd6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -12,7 +12,7 @@ * add inter-node message compression (CASSANDRA-3127) * remove COPP (CASSANDRA-2479) * Track tombstone expiration and compact when tombstone content is - higher than a configurable threshold, default 20% (CASSANDRA-3442) + higher than a configurable threshold, default 20% (CASSANDRA-3442, 4234) * update MurmurHash to version 3 (CASSANDRA-2975) * (CLI) track elapsed time for `delete' operation (CASSANDRA-4060) * (CLI) jline version is bumped to 1.0 to properly support 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 41128b0..bf6c87f 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -38,15 +38,23 @@ import org.apache.cassandra.service.StorageService; */ public abstract class AbstractCompactionStrategy { + protected static final float DEFAULT_TOMBSTONE_THRESHOLD = 0.2f; + protected static final String TOMBSTONE_THRESHOLD_KEY = "tombstone_threshold"; + protected final ColumnFamilyStore cfs; protected final Map<String, String> options; + protected float tombstoneThreshold; + protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options) { assert cfs != null; this.cfs = cfs; this.options = options; + String optionValue = options.get(TOMBSTONE_THRESHOLD_KEY); + tombstoneThreshold = (null != optionValue) ? 
Float.parseFloat(optionValue) : DEFAULT_TOMBSTONE_THRESHOLD; + // start compactions in five minutes (if no flushes have occurred by then to do so) Runnable runnable = new Runnable() { @@ -146,4 +154,40 @@ public abstract class AbstractCompactionStrategy { return getScanners(toCompact, null); } + + /** + * @param sstable SSTable to check + * @param gcBefore time to drop tombstones + * @return true if given sstable's tombstones are expected to be removed + */ + protected boolean worthDroppingTombstones(SSTableReader sstable, int gcBefore) + { + double droppableRatio = sstable.getEstimatedDroppableTombstoneRatio(gcBefore); + if (droppableRatio <= tombstoneThreshold) + return false; + + Set<SSTableReader> overlaps = cfs.getOverlappingSSTables(Collections.singleton(sstable)); + if (overlaps.isEmpty()) + { + // there is no overlap, tombstones are safely droppable + return true; + } + else + { + // what percentage of columns do we expect to compact outside of overlap? + // first, calculate estimated keys that do not overlap + long keys = sstable.estimatedKeys(); + Set<Range<Token>> ranges = new HashSet<Range<Token>>(); + for (SSTableReader overlap : overlaps) + ranges.add(new Range<Token>(overlap.first.token, overlap.last.token, overlap.partitioner)); + long remainingKeys = keys - sstable.estimatedKeysForRanges(ranges); + // next, calculate what percentage of columns we have within those keys + double remainingKeysRatio = ((double) remainingKeys) / keys; + long columns = sstable.getEstimatedColumnCount().percentile(remainingKeysRatio) * remainingKeys; + double remainingColumnsRatio = ((double) columns) / (sstable.getEstimatedColumnCount().count() * sstable.getEstimatedColumnCount().mean()); + + // return if we still expect to have droppable tombstones in rest of columns + return remainingColumnsRatio * droppableRatio > tombstoneThreshold; + } + } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 3e124de..9ac4fce 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -23,11 +23,11 @@ import java.util.concurrent.atomic.AtomicReference; import com.google.common.base.Joiner; import com.google.common.collect.*; +import com.google.common.primitives.Doubles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -110,13 +110,22 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem } Collection<SSTableReader> sstables = manifest.getCompactionCandidates(); + OperationType op = OperationType.COMPACTION; if (sstables.isEmpty()) { - logger.debug("No compaction necessary for {}", this); - return null; + // if there is no sstable to compact in standard way, try compacting based on droppable tombstone ratio + SSTableReader sstable = findDroppableSSTable(gcBefore); + if (sstable == null) + { + logger.debug("No compaction necessary for {}", this); + return null; + } + sstables = Collections.singleton(sstable); + op = OperationType.TOMBSTONE_COMPACTION; } LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore, this.maxSSTableSizeInMB); + newTask.setCompactionType(op); return task.compareAndSet(currentTask, newTask) ? 
newTask : null; @@ -148,6 +157,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem case CLEANUP: case SCRUB: case UPGRADE_SSTABLES: + case TOMBSTONE_COMPACTION: // Also when performing tombstone removal. manifest.replace(listChangedNotification.removed, listChangedNotification.added); break; default: @@ -280,4 +290,33 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem { return String.format("LCS@%d(%s)", hashCode(), cfs.columnFamily); } + + private SSTableReader findDroppableSSTable(final int gcBefore) + { + level: + for (int i = manifest.getLevelCount(); i >= 0; i--) + { + // sort sstables by droppable ratio in descending order + SortedSet<SSTableReader> sstables = manifest.getLevelSorted(i, new Comparator<SSTableReader>() + { + public int compare(SSTableReader o1, SSTableReader o2) + { + double r1 = o1.getEstimatedDroppableTombstoneRatio(gcBefore); + double r2 = o2.getEstimatedDroppableTombstoneRatio(gcBefore); + return -1 * Doubles.compare(r1, r2); + } + }); + if (sstables.isEmpty()) + continue; + + for (SSTableReader sstable : sstables) + { + if (sstable.getEstimatedDroppableTombstoneRatio(gcBefore) <= tombstoneThreshold) + continue level; + else if (!sstable.isMarkedSuspect() && worthDroppingTombstones(sstable, gcBefore)) + return sstable; + } + } + return null; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 567c919..53efb80 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -22,6 +22,7 @@ import java.io.IOError; import java.io.IOException; import java.util.*; +import 
com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; @@ -568,6 +569,11 @@ public class LeveledManifest return 0; } + public synchronized SortedSet<SSTableReader> getLevelSorted(int level, Comparator<SSTableReader> comparator) + { + return ImmutableSortedSet.copyOf(comparator, generations[level]); + } + public List<SSTableReader> getLevel(int i) { return generations[i]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/src/java/org/apache/cassandra/db/compaction/OperationType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java index 79f6c5e..e46e92d 100644 --- a/src/java/org/apache/cassandra/db/compaction/OperationType.java +++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java @@ -27,6 +27,8 @@ public enum OperationType SCRUB("Scrub"), UPGRADE_SSTABLES("Upgrade sstables"), INDEX_BUILD("Secondary index build"), + /** Compaction for tombstone removal */ + TOMBSTONE_COMPACTION("Tombstone Compaction"), UNKNOWN("Unkown compaction type"); private final String type; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 67d2e77..d1ac516 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -24,8 +24,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; -import 
org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.Pair; @@ -34,12 +32,9 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy { private static final Logger logger = LoggerFactory.getLogger(SizeTieredCompactionStrategy.class); protected static final long DEFAULT_MIN_SSTABLE_SIZE = 50L * 1024L * 1024L; - protected static final float DEFAULT_TOMBSTONE_THRESHOLD = 0.2f; protected static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size"; - protected static final String TOMBSTONE_THRESHOLD_KEY = "tombstone_threshold"; protected long minSSTableSize; protected volatile int estimatedRemainingTasks; - protected float tombstoneThreshold; public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options) { @@ -49,8 +44,6 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy minSSTableSize = (null != optionValue) ? Long.parseLong(optionValue) : DEFAULT_MIN_SSTABLE_SIZE; cfs.setMaximumCompactionThreshold(cfs.metadata.getMaxCompactionThreshold()); cfs.setMinimumCompactionThreshold(cfs.metadata.getMinCompactionThreshold()); - optionValue = options.get(TOMBSTONE_THRESHOLD_KEY); - tombstoneThreshold = (null != optionValue) ? 
Float.parseFloat(optionValue) : DEFAULT_TOMBSTONE_THRESHOLD; } public AbstractCompactionTask getNextBackgroundTask(final int gcBefore) @@ -90,34 +83,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy { for (SSTableReader table : bucket) { - double droppableRatio = table.getEstimatedDroppableTombstoneRatio(gcBefore); - if (droppableRatio <= tombstoneThreshold) - continue; - - Set<SSTableReader> overlaps = cfs.getOverlappingSSTables(Collections.singleton(table)); - if (overlaps.isEmpty()) - { - // there is no overlap, tombstones are safely droppable + if (worthDroppingTombstones(table, gcBefore)) prunedBuckets.add(Collections.singletonList(table)); - } - else - { - // what percentage of columns do we expect to compact outside of overlap? - // first, calculate estimated keys that do not overlap - long keys = table.estimatedKeys(); - Set<Range<Token>> ranges = new HashSet<Range<Token>>(); - for (SSTableReader overlap : overlaps) - ranges.add(new Range<Token>(overlap.first.token, overlap.last.token, overlap.partitioner)); - long remainingKeys = keys - table.estimatedKeysForRanges(ranges); - // next, calculate what percentage of columns we have within those keys - double remainingKeysRatio = ((double) remainingKeys) / keys; - long columns = table.getEstimatedColumnCount().percentile(remainingKeysRatio) * remainingKeys; - double remainingColumnsRatio = ((double) columns) / (table.getEstimatedColumnCount().count() * table.getEstimatedColumnCount().mean()); - - // if we still expect to have droppable tombstones in rest of columns, then try compacting it - if (remainingColumnsRatio * droppableRatio > tombstoneThreshold) - prunedBuckets.add(Collections.singletonList(table)); - } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index b1ae773..2057822 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -107,7 +107,7 @@ public class CompactionsTest extends SchemaLoader * Test to see if sstable has enough expired columns, it is compacted itself. */ @Test - public void testSingleSSTableCompaction() throws Exception + public void testSingleSSTableCompactionWithSizeTieredCompaction() throws Exception { Table table = Table.open(TABLE1); ColumnFamilyStore store = table.getColumnFamilyStore("Standard1"); @@ -154,6 +154,57 @@ public class CompactionsTest extends SchemaLoader } @Test + public void testSingleSSTableCompactionWithLeveledCompaction() throws Exception + { + Table table = Table.open(TABLE1); + ColumnFamilyStore store = table.getColumnFamilyStore("Standard1"); + store.clearUnsafe(); + store.metadata.gcGraceSeconds(1); + store.setCompactionStrategyClass(LeveledCompactionStrategy.class.getCanonicalName()); + + LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) store.getCompactionStrategy(); + + // disable compaction while flushing + store.disableAutoCompaction(); + + long timestamp = System.currentTimeMillis(); + for (int i = 0; i < 10; i++) + { + DecoratedKey key = Util.dk(Integer.toString(i)); + RowMutation rm = new RowMutation(TABLE1, key.key); + for (int j = 0; j < 10; j++) + rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes(Integer.toString(j))), + ByteBufferUtil.EMPTY_BYTE_BUFFER, + timestamp, + j > 0 ? 
3 : 0); // let first column never expire, since deleting all columns does not produce sstable + rm.apply(); + } + store.forceBlockingFlush(); + assertEquals(1, store.getSSTables().size()); + long originalSize = store.getSSTables().iterator().next().uncompressedLength(); + + // wait enough to force single compaction + TimeUnit.SECONDS.sleep(5); + + store.setMinimumCompactionThreshold(2); + store.setMaximumCompactionThreshold(4); + FBUtilities.waitOnFuture(CompactionManager.instance.submitBackground(store)); + while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0) + TimeUnit.SECONDS.sleep(1); + + // and sstable with ttl should be compacted + assertEquals(1, store.getSSTables().size()); + long size = store.getSSTables().iterator().next().uncompressedLength(); + assertTrue("should be less than " + originalSize + ", but was " + size, size < originalSize); + + // make sure max timestamp of compacted sstables is recorded properly after compaction. + assertMaxTimestamp(store, timestamp); + + // tombstone removal compaction should not promote level + assert strategy.getLevelSize(0) == 1; + } + + @Test public void testSuperColumnCompactions() throws IOException, ExecutionException, InterruptedException { Table table = Table.open(TABLE1);