Don't promote sstables for cleanup, scrub and upgradesstables

patch by slebresne; reviewed by jbellis for CASSANDRA-3989
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/048c8a98 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/048c8a98 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/048c8a98 Branch: refs/heads/cassandra-1.1 Commit: 048c8a98d83b41d463b557ede8a0bf98d3def022 Parents: c7895e9 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Wed Mar 7 18:35:15 2012 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Wed Mar 7 18:35:15 2012 +0100 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 11 +++-- src/java/org/apache/cassandra/db/DataTracker.java | 15 +++--- .../cassandra/db/compaction/CompactionManager.java | 8 ++- .../cassandra/db/compaction/CompactionTask.java | 16 +++++-- .../db/compaction/LeveledCompactionStrategy.java | 13 +++++- .../cassandra/db/compaction/LeveledManifest.java | 35 +++++++++++--- .../cassandra/db/compaction/OperationType.java | 1 + .../SSTableListChangedNotification.java | 11 +++- .../cassandra/db/compaction/CompactionsTest.java | 3 +- 10 files changed, 83 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 91c45dc..c453943 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,8 @@ * Pig: Composite column support (CASSANDRA-384) * Avoid NPE during repair when a keyspace has no CFs (CASSANDRA-3988) * Fix division-by-zero error on get_slice (CASSANDRA-4000) + * don't change manifest level for cleanup, scrub, and upgradesstables + operations under LeveledCompactionStrategy (CASSANDRA-3989) 1.0.8 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 6f8392d..b50662a 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -46,6 +46,7 @@ import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.LeveledCompactionStrategy; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.IFilter; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; @@ -995,15 +996,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this); } - public void markCompacted(Collection<SSTableReader> sstables) + public void markCompacted(Collection<SSTableReader> sstables, OperationType compactionType) { assert !sstables.isEmpty(); - data.markCompacted(sstables); + data.markCompacted(sstables, compactionType); } - public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements) + public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements, OperationType compactionType) { - data.replaceCompactedSSTables(sstables, replacements); + data.replaceCompactedSSTables(sstables, replacements, compactionType); } void replaceFlushed(Memtable memtable, SSTableReader sstable) @@ -1957,6 +1958,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } if (!truncatedSSTables.isEmpty()) - markCompacted(truncatedSSTables); + 
markCompacted(truncatedSSTables, OperationType.UNKNOWN); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/src/java/org/apache/cassandra/db/DataTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java index aecc083..c4b8a7b 100644 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ b/src/java/org/apache/cassandra/db/DataTracker.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; @@ -222,16 +223,16 @@ public class DataTracker while (!view.compareAndSet(currentView, newView)); } - public void markCompacted(Collection<SSTableReader> sstables) + public void markCompacted(Collection<SSTableReader> sstables, OperationType compactionType) { replace(sstables, Collections.<SSTableReader>emptyList()); - notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList()); + notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), compactionType); } - public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements) + public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements, OperationType compactionType) { replace(sstables, replacements); - notifySSTablesChanged(sstables, replacements); + notifySSTablesChanged(sstables, replacements, compactionType); } public void addInitialSSTables(Collection<SSTableReader> sstables) @@ -260,7 +261,7 @@ public class DataTracker } replace(sstables, Collections.<SSTableReader>emptyList()); - notifySSTablesChanged(sstables, 
Collections.<SSTableReader>emptyList()); + notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), OperationType.UNKNOWN); } /** (Re)initializes the tracker, purging all references. */ @@ -486,11 +487,11 @@ public class DataTracker return (double) falseCount / (trueCount + falseCount); } - public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added) + public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added, OperationType compactionType) { for (INotificationConsumer subscriber : subscribers) { - INotification notification = new SSTableListChangedNotification(added, removed); + INotification notification = new SSTableListChangedNotification(added, removed, compactionType); subscriber.handleNotification(notification, this); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 97e5067..55fab3c 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -225,8 +225,10 @@ public class CompactionManager implements CompactionManagerMBean for (final SSTableReader sstable : sstables) { // SSTables are marked by the caller + // NOTE: it is important that the task create one and only one sstable, even for Leveled compaction (see LeveledManifest.replace()) CompactionTask task = new CompactionTask(cfs, Collections.singletonList(sstable), Integer.MAX_VALUE); task.isUserDefined(true); + task.setCompactionType(OperationType.UPGRADE_SSTABLES); task.execute(executor); } } @@ -646,7 +648,7 @@ public class CompactionManager implements CompactionManagerMBean if (newSstable == null) { - 
cfs.markCompacted(Arrays.asList(sstable)); + cfs.markCompacted(Arrays.asList(sstable), OperationType.SCRUB); if (badRows > 0) logger.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot"); else @@ -654,7 +656,7 @@ public class CompactionManager implements CompactionManagerMBean } else { - cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable)); + cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable), OperationType.SCRUB); logger.info("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped"); if (badRows > 0) logger.warn("Unable to recover " + badRows + " rows that were skipped. You can attempt manual recovery from the pre-scrub snapshot. You can also run nodetool repair to transfer the data from a healthy replica, if any"); @@ -797,7 +799,7 @@ public class CompactionManager implements CompactionManagerMBean // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd cfs.indexManager.flushIndexesBlocking(); - cfs.replaceCompactedSSTables(Arrays.asList(sstable), results); + cfs.replaceCompactedSSTables(Arrays.asList(sstable), results, OperationType.CLEANUP); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index ece5f19..5847bf7 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -44,6 +44,7 @@ public class CompactionTask extends AbstractCompactionTask protected String compactionFileLocation; protected final 
int gcBefore; protected boolean isUserDefined; + protected OperationType compactionType; protected static long totalBytesCompacted = 0; public CompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, final int gcBefore) @@ -52,6 +53,7 @@ public class CompactionTask extends AbstractCompactionTask compactionFileLocation = null; this.gcBefore = gcBefore; this.isUserDefined = false; + this.compactionType = OperationType.COMPACTION; } public static synchronized long addToTotalBytesCompacted(long bytesCompacted) @@ -122,8 +124,8 @@ public class CompactionTask extends AbstractCompactionTask logger.debug("Expected bloom filter size : " + keysPerSSTable); AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction() - ? new ParallelCompactionIterable(OperationType.COMPACTION, toCompact, controller) - : new CompactionIterable(OperationType.COMPACTION, toCompact, controller); + ? new ParallelCompactionIterable(compactionType, toCompact, controller) + : new CompactionIterable(compactionType, toCompact, controller); CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull()); Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>(); @@ -144,7 +146,7 @@ public class CompactionTask extends AbstractCompactionTask // don't mark compacted in the finally block, since if there _is_ nondeleted data, // we need to sync it (via closeAndOpen) first, so there is no period during which // a crash could cause data loss. 
- cfs.markCompacted(toCompact); + cfs.markCompacted(toCompact, compactionType); return 0; } @@ -197,7 +199,7 @@ public class CompactionTask extends AbstractCompactionTask collector.finishCompaction(ci); } - cfs.replaceCompactedSSTables(toCompact, sstables); + cfs.replaceCompactedSSTables(toCompact, sstables, compactionType); // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet()) { @@ -270,4 +272,10 @@ public class CompactionTask extends AbstractCompactionTask this.isUserDefined = isUserDefined; return this; } + + public CompactionTask setCompactionType(OperationType compactionType) + { + this.compactionType = compactionType; + return this; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index d4402ee..de0ff75 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -138,7 +138,18 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem else if (notification instanceof SSTableListChangedNotification) { SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification; - manifest.promote(listChangedNotification.removed, listChangedNotification.added); + switch (listChangedNotification.compactionType) + { + // Cleanup, scrub and updateSSTable shouldn't promote (see #3989) + case CLEANUP: + case SCRUB: + case UPGRADE_SSTABLES: + manifest.replace(listChangedNotification.removed, listChangedNotification.added); + break; + 
default: + manifest.promote(listChangedNotification.removed, listChangedNotification.added); + break; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 189de8e..6dc6ce9 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -61,7 +61,6 @@ public class LeveledManifest private final List<SSTableReader>[] generations; private final DecoratedKey[] lastCompactedKeys; private final int maxSSTableSizeInMB; - private int levelCount; private LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB) { @@ -165,10 +164,9 @@ public class LeveledManifest int maximumLevel = 0; for (SSTableReader sstable : removed) { - int thisLevel = levelOf(sstable); + int thisLevel = remove(sstable); maximumLevel = Math.max(maximumLevel, thisLevel); minimumLevel = Math.min(minimumLevel, thisLevel); - remove(sstable); } // it's valid to do a remove w/o an add (e.g. 
on truncate) @@ -188,6 +186,22 @@ public class LeveledManifest serialize(); } + public synchronized void replace(Iterable<SSTableReader> removed, Iterable<SSTableReader> added) + { + // replace is for compaction operation that don't really change the + // content of a sstable (cleanup, scrub) and much replace one sstable by another + assert Iterables.size(removed) == 1; + assert Iterables.size(added) == 1; + SSTableReader toRemove = removed.iterator().next(); + SSTableReader toAdd = added.iterator().next(); + logDistribution(); + if (logger.isDebugEnabled()) + logger.debug("Replacing " + removed + " by " + toAdd); + + add(toAdd, remove(toRemove)); + serialize(); + } + private String toString(Iterable<SSTableReader> sstables) { StringBuilder builder = new StringBuilder(); @@ -266,12 +280,15 @@ public class LeveledManifest private void logDistribution() { - for (int i = 0; i < generations.length; i++) + if (logger.isDebugEnabled()) { - if (!generations[i].isEmpty()) + for (int i = 0; i < generations.length; i++) { - logger.debug("L{} contains {} SSTables ({} bytes) in {}", - new Object[] {i, generations[i].size(), SSTableReader.getTotalBytes(generations[i]), this}); + if (!generations[i].isEmpty()) + { + logger.debug("L{} contains {} SSTables ({} bytes) in {}", + new Object[] {i, generations[i].size(), SSTableReader.getTotalBytes(generations[i]), this}); + } } } } @@ -286,15 +303,17 @@ public class LeveledManifest return -1; } - private void remove(SSTableReader reader) + private int remove(SSTableReader reader) { int level = levelOf(reader); assert level >= 0 : reader + " not present in manifest"; generations[level].remove(reader); + return level; } private void add(SSTableReader sstable, int level) { + assert level < generations.length : "Invalid level " + level + " out of " + (generations.length - 1); generations[level].add(sstable); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/src/java/org/apache/cassandra/db/compaction/OperationType.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java index ecdc71c..b4b5498 100644 --- a/src/java/org/apache/cassandra/db/compaction/OperationType.java +++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java @@ -29,6 +29,7 @@ public enum OperationType ROW_CACHE_SAVE("Row cache save"), CLEANUP("Cleanup"), SCRUB("Scrub"), + UPGRADE_SSTABLES("Upgrade sstables"), INDEX_BUILD("Secondary index build"), UNKNOWN("Unkown compaction type"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java index 480dcf2..9f31b4e 100644 --- a/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java +++ b/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java @@ -25,13 +25,18 @@ import org.apache.cassandra.io.sstable.SSTableReader; import java.util.List; +import org.apache.cassandra.db.compaction.OperationType; + public class SSTableListChangedNotification implements INotification { - public Iterable<SSTableReader> removed; - public Iterable<SSTableReader> added; - public SSTableListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed) + public final Iterable<SSTableReader> removed; + public final Iterable<SSTableReader> added; + public final OperationType compactionType; + + public SSTableListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed, OperationType compactionType) { this.removed = removed; this.added = added; + this.compactionType = compactionType; } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index 89fee28..1179919 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -35,6 +35,7 @@ import org.apache.cassandra.CleanupHelper; import org.apache.cassandra.Util; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.io.sstable.*; @@ -209,7 +210,7 @@ public class CompactionsTest extends CleanupHelper assertEquals(2, store.getSSTables().size()); // Now, we remove the sstable that was just created to force the use of EchoedRow (so that it doesn't hide the problem) - store.markCompacted(Collections.singleton(tmpSSTable)); + store.markCompacted(Collections.singleton(tmpSSTable), OperationType.UNKNOWN); assertEquals(1, store.getSSTables().size()); // Now assert we do have the 4 keys