Don't promote sstables for cleanup, scrub and upgradesstables; patch by slebresne, reviewed by jbellis for CASSANDRA-3989
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65059cf4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65059cf4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65059cf4 Branch: refs/heads/trunk Commit: 65059cf48e0794e0459b1882961616f55382c756 Parents: 09f091a Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Wed Mar 7 18:35:15 2012 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Thu Mar 8 09:46:09 2012 +0100 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 11 +++-- src/java/org/apache/cassandra/db/DataTracker.java | 15 +++--- .../db/compaction/AbstractCompactionTask.java | 8 +++ .../cassandra/db/compaction/CompactionManager.java | 8 ++- .../cassandra/db/compaction/CompactionTask.java | 8 ++-- .../db/compaction/LeveledCompactionStrategy.java | 13 +++++- .../cassandra/db/compaction/LeveledManifest.java | 35 +++++++++++--- .../cassandra/db/compaction/OperationType.java | 1 + .../SSTableListChangedNotification.java | 7 +++- .../cassandra/db/compaction/CompactionsTest.java | 3 +- 11 files changed, 81 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 16f8f0c..bf8d9fc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -45,6 +45,8 @@ Merged from 1.0: * Pig: Composite column support (CASSANDRA-384) * Avoid NPE during repair when a keyspace has no CFs (CASSANDRA-3988) * Fix division-by-zero error on get_slice (CASSANDRA-4000) + * don't change manifest level for cleanup, scrub, and upgradesstables + operations under LeveledCompactionStrategy (CASSANDRA-3989) 1.1-beta1 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 2872d4f..48cdb8c 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -51,6 +51,7 @@ import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.LeveledCompactionStrategy; import org.apache.cassandra.db.filter.ExtendedFilter; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.IFilter; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; @@ -937,15 +938,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this); } - public void markCompacted(Collection<SSTableReader> sstables) + public void markCompacted(Collection<SSTableReader> sstables, OperationType compactionType) { assert !sstables.isEmpty(); - data.markCompacted(sstables); + data.markCompacted(sstables, compactionType); } - public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements) + public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements, OperationType compactionType) { - data.replaceCompactedSSTables(sstables, replacements); + data.replaceCompactedSSTables(sstables, replacements, compactionType); } void replaceFlushed(Memtable memtable, SSTableReader sstable) @@ -1966,6 +1967,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } if (!truncatedSSTables.isEmpty()) - markCompacted(truncatedSSTables); + 
markCompacted(truncatedSSTables, OperationType.UNKNOWN); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/DataTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java index d346eab..13c3369 100644 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ b/src/java/org/apache/cassandra/db/DataTracker.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.notifications.INotification; import org.apache.cassandra.notifications.INotificationConsumer; @@ -237,16 +238,16 @@ public class DataTracker while (!view.compareAndSet(currentView, newView)); } - public void markCompacted(Collection<SSTableReader> sstables) + public void markCompacted(Collection<SSTableReader> sstables, OperationType compactionType) { replace(sstables, Collections.<SSTableReader>emptyList()); - notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList()); + notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), compactionType); } - public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements) + public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements, OperationType compactionType) { replace(sstables, replacements); - notifySSTablesChanged(sstables, replacements); + notifySSTablesChanged(sstables, replacements, compactionType); } public void addInitialSSTables(Collection<SSTableReader> sstables) @@ -286,7 +287,7 @@ public class DataTracker // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion" return; } - 
notifySSTablesChanged(notCompacting, Collections.<SSTableReader>emptySet()); + notifySSTablesChanged(notCompacting, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN); postReplace(notCompacting, Collections.<SSTableReader>emptySet()); } @@ -518,11 +519,11 @@ public class DataTracker return (double) falseCount / (trueCount + falseCount); } - public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added) + public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added, OperationType compactionType) { for (INotificationConsumer subscriber : subscribers) { - INotification notification = new SSTableListChangedNotification(added, removed); + INotification notification = new SSTableListChangedNotification(added, removed, compactionType); subscriber.handleNotification(notification, this); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java index e031e07..9826941 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java @@ -30,12 +30,14 @@ public abstract class AbstractCompactionTask protected final ColumnFamilyStore cfs; protected Collection<SSTableReader> sstables; protected boolean isUserDefined; + protected OperationType compactionType; public AbstractCompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables) { this.cfs = cfs; this.sstables = sstables; this.isUserDefined = false; + this.compactionType = OperationType.COMPACTION; } public abstract int execute(CompactionExecutorStatsCollector collector) throws IOException; @@ -93,4 +95,10 @@ public abstract class 
AbstractCompactionTask this.isUserDefined = isUserDefined; return this; } + + public AbstractCompactionTask setCompactionType(OperationType compactionType) + { + this.compactionType = compactionType; + return this; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 7a36625..590c3d7 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -211,8 +211,10 @@ public class CompactionManager implements CompactionManagerMBean for (final SSTableReader sstable : sstables) { // SSTables are marked by the caller + // NOTE: it is important that the task create one and only one sstable, even for Leveled compaction (see LeveledManifest.replace()) CompactionTask task = new CompactionTask(cfs, Collections.singletonList(sstable), Integer.MAX_VALUE); task.isUserDefined(true); + task.setCompactionType(OperationType.UPGRADE_SSTABLES); task.execute(executor); } } @@ -624,7 +626,7 @@ public class CompactionManager implements CompactionManagerMBean if (newSstable == null) { - cfs.markCompacted(Arrays.asList(sstable)); + cfs.markCompacted(Arrays.asList(sstable), OperationType.SCRUB); if (badRows > 0) logger.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. 
If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot"); else @@ -632,7 +634,7 @@ public class CompactionManager implements CompactionManagerMBean } else { - cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable)); + cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable), OperationType.SCRUB); logger.info("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped"); if (badRows > 0) logger.warn("Unable to recover " + badRows + " rows that were skipped. You can attempt manual recovery from the pre-scrub snapshot. You can also run nodetool repair to transfer the data from a healthy replica, if any"); @@ -777,7 +779,7 @@ public class CompactionManager implements CompactionManagerMBean // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd cfs.indexManager.flushIndexesBlocking(); - cfs.replaceCompactedSSTables(Arrays.asList(sstable), results); + cfs.replaceCompactedSSTables(Arrays.asList(sstable), results, OperationType.CLEANUP); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 4d77af0..6961490 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -121,8 +121,8 @@ public class CompactionTask extends AbstractCompactionTask logger.debug("Expected bloom filter size : " + keysPerSSTable); AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction() - ? 
new ParallelCompactionIterable(OperationType.COMPACTION, toCompact, controller) - : new CompactionIterable(OperationType.COMPACTION, toCompact, controller); + ? new ParallelCompactionIterable(compactionType, toCompact, controller) + : new CompactionIterable(compactionType, toCompact, controller); CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull()); Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>(); @@ -143,7 +143,7 @@ public class CompactionTask extends AbstractCompactionTask // don't mark compacted in the finally block, since if there _is_ nondeleted data, // we need to sync it (via closeAndOpen) first, so there is no period during which // a crash could cause data loss. - cfs.markCompacted(toCompact); + cfs.markCompacted(toCompact, compactionType); return 0; } @@ -199,7 +199,7 @@ public class CompactionTask extends AbstractCompactionTask collector.finishCompaction(ci); } - cfs.replaceCompactedSSTables(toCompact, sstables); + cfs.replaceCompactedSSTables(toCompact, sstables, compactionType); // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 3bbbeb6..0d5bdc7 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -132,7 +132,18 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem else 
if (notification instanceof SSTableListChangedNotification) { SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification; - manifest.promote(listChangedNotification.removed, listChangedNotification.added); + switch (listChangedNotification.compactionType) + { + // Cleanup, scrub and upgradesstables rewrite a single sstable in place and shouldn't promote (see #3989) + case CLEANUP: + case SCRUB: + case UPGRADE_SSTABLES: + manifest.replace(listChangedNotification.removed, listChangedNotification.added); + break; + default: + manifest.promote(listChangedNotification.removed, listChangedNotification.added); + break; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 3c02cda..bbb41a5 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -60,7 +60,6 @@ public class LeveledManifest private final Map<SSTableReader, Integer> sstableGenerations; private final RowPosition[] lastCompactedKeys; private final int maxSSTableSizeInMB; - private int levelCount; private LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB) { @@ -165,11 +164,10 @@ public class LeveledManifest int maximumLevel = 0; for (SSTableReader sstable : removed) { - int thisLevel = levelOf(sstable); + int thisLevel = remove(sstable); assert thisLevel >= 0; maximumLevel = Math.max(maximumLevel, thisLevel); minimumLevel = Math.min(minimumLevel, thisLevel); - remove(sstable); } // it's valid to do a remove w/o an add (e.g.
on truncate) @@ -189,6 +187,22 @@ public class LeveledManifest serialize(); } + public synchronized void replace(Iterable<SSTableReader> removed, Iterable<SSTableReader> added) + { + // replace is for compaction operations that rewrite a single sstable in place (cleanup, scrub, upgradesstables): + // exactly one sstable is removed and replaced, at the same level, by at most one (none if e.g. scrub or cleanup kept no rows) + assert Iterables.size(removed) == 1 : Iterables.size(removed); + assert Iterables.size(added) <= 1 : Iterables.size(added); + logDistribution(); + if (logger.isDebugEnabled()) + logger.debug("Replacing " + removed + " by " + added); + + int level = remove(removed.iterator().next()); + if (!Iterables.isEmpty(added)) + add(added.iterator().next(), level); + serialize(); + } + private String toString(Iterable<SSTableReader> sstables) { StringBuilder builder = new StringBuilder(); @@ -291,12 +305,15 @@ public class LeveledManifest private void logDistribution() { - for (int i = 0; i < generations.length; i++) + if (logger.isDebugEnabled()) { - if (!generations[i].isEmpty()) + for (int i = 0; i < generations.length; i++) { - logger.debug("L{} contains {} SSTables ({} bytes) in {}", - new Object[] {i, generations[i].size(), SSTableReader.getTotalBytes(generations[i]), this}); + if (!generations[i].isEmpty()) + { + logger.debug("L{} contains {} SSTables ({} bytes) in {}", + new Object[] {i, generations[i].size(), SSTableReader.getTotalBytes(generations[i]), this}); + } } } } @@ -310,16 +327,18 @@ public class LeveledManifest return level.intValue(); } - private void remove(SSTableReader reader) + private int remove(SSTableReader reader) { int level = levelOf(reader); assert level >= 0 : reader + " not present in manifest"; generations[level].remove(reader); sstableGenerations.remove(reader); + return level; } private void add(SSTableReader sstable, int level) { + assert level < generations.length : "Invalid level " + level + " out of " + (generations.length - 1); 
generations[level].add(sstable); sstableGenerations.put(sstable, Integer.valueOf(level)); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/OperationType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java index 75e3d06..79f6c5e 100644 --- a/src/java/org/apache/cassandra/db/compaction/OperationType.java +++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java @@ -25,6 +25,7 @@ public enum OperationType ROW_CACHE_SAVE("Row cache save"), CLEANUP("Cleanup"), SCRUB("Scrub"), + UPGRADE_SSTABLES("Upgrade sstables"), INDEX_BUILD("Secondary index build"), UNKNOWN("Unkown compaction type"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java index 770fd48..ca7ead9 100644 --- a/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java +++ b/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java @@ -19,13 +19,18 @@ package org.apache.cassandra.notifications; import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.db.compaction.OperationType; + public class SSTableListChangedNotification implements INotification { public final Iterable<SSTableReader> removed; public final Iterable<SSTableReader> added; - public SSTableListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed) + public final OperationType compactionType; + + public SSTableListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed, OperationType compactionType) { this.removed = removed; this.added = added; + 
this.compactionType = compactionType; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index a9c66bf..bd49c23 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -34,6 +34,7 @@ import org.apache.cassandra.CleanupHelper; import org.apache.cassandra.Util; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.io.sstable.*; @@ -273,7 +274,7 @@ public class CompactionsTest extends CleanupHelper assertEquals(2, store.getSSTables().size()); // Now, we remove the sstable that was just created to force the use of EchoedRow (so that it doesn't hide the problem) - store.markCompacted(Collections.singleton(tmpSSTable)); + store.markCompacted(Collections.singleton(tmpSSTable), OperationType.UNKNOWN); assertEquals(1, store.getSSTables().size()); // Now assert we do have the 4 keys