Fix potential for multiple concurrent compactions of the same sstables. Patch by jbellis and yukim; reviewed by slebresne for CASSANDRA-5256.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/686f516c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/686f516c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/686f516c Branch: refs/heads/cassandra-1.2 Commit: 686f516ccb887fe977238b53b9be307b56432b8c Parents: 457b546 Author: Jonathan Ellis <jbel...@apache.org> Authored: Mon Feb 18 15:31:49 2013 -0600 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Wed Feb 20 05:38:57 2013 -0800 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 1 - .../db/compaction/AbstractCompactionStrategy.java | 8 ++ .../db/compaction/AbstractCompactionTask.java | 34 ++++++-- .../cassandra/db/compaction/CompactionManager.java | 67 +++----------- .../cassandra/db/compaction/CompactionTask.java | 8 +-- .../db/compaction/LeveledCompactionStrategy.java | 43 +++++----- .../compaction/SizeTieredCompactionStrategy.java | 42 +++++++--- .../LongLeveledCompactionStrategyTest.java | 9 +-- test/unit/org/apache/cassandra/Util.java | 4 +- .../db/compaction/CompactionsPurgeTest.java | 4 +- 11 files changed, 114 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a543ac1..0489968 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 1.2.2 + * fix potential for multiple concurrent compactions of the same sstables + (CASSANDRA-5256) * avoid no-op caching of byte[] on commitlog append (CASSANDRA-5199) * fix symlinks under data dir not working (CASSANDRA-5185) * fix bug in compact storage metadata handling (CASSANDRA-5189) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index c08224e..84ed1a1 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -879,7 +879,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean for (SSTableReader sstable : sstables) { Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last))); - assert overlaps.contains(sstable); results = results == null ? overlaps : Sets.union(results, overlaps).immutableCopy(); } results = Sets.difference(results, ImmutableSet.copyOf(sstables)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 356289c..cb15109 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -85,23 +85,31 @@ public abstract class AbstractCompactionStrategy /** * @param gcBefore throw away tombstones older than this + * * @return the next background/minor compaction task to run; null if nothing to do. + * * Is responsible for marking its sstables as compaction-pending. 
*/ public abstract AbstractCompactionTask getNextBackgroundTask(final int gcBefore); /** * @param gcBefore throw away tombstones older than this + * * @return a compaction task that should be run to compact this columnfamilystore * as much as possible. Null if nothing to do. + * + * Is responsible for marking its sstables as compaction-pending. */ public abstract AbstractCompactionTask getMaximalTask(final int gcBefore); /** * @param sstables SSTables to compact. Must be marked as compacting. * @param gcBefore throw away tombstones older than this + * * @return a compaction task corresponding to the requested sstables. * Will not be null. (Will throw if user requests an invalid compaction.) + * + * Is responsible for marking its sstables as compaction-pending. */ public abstract AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore); http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java index 0913765..70521dd 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java @@ -18,6 +18,9 @@ package org.apache.cassandra.db.compaction; import java.util.Collection; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.db.Directories; import org.apache.cassandra.io.sstable.SSTableReader; @@ -32,24 +35,43 @@ public abstract class AbstractCompactionTask extends DiskAwareRunnable protected boolean isUserDefined; protected OperationType compactionType; + /** + * @param cfs + * @param sstables must be marked compacting + */ public AbstractCompactionTask(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables) { this.cfs = cfs; this.sstables = sstables; this.isUserDefined = false; this.compactionType = OperationType.COMPACTION; - } - public abstract int execute(CompactionExecutorStatsCollector collector); + // enforce contract that caller should mark sstables compacting + Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting(); + for (SSTableReader sstable : sstables) + assert compacting.contains(sstable) : sstable.getFilename() + " is not correctly marked compacting"; + } - protected Directories getDirectories() + /** + * executes the task and unmarks sstables compacting + */ + public int execute(CompactionExecutorStatsCollector collector) { - return cfs.directories; + try + { + return executeInternal(collector); + } + finally + { + cfs.getDataTracker().unmarkCompacting(sstables); + } } - public void unmarkSSTables() + protected abstract int executeInternal(CompactionExecutorStatsCollector collector); + + protected Directories getDirectories() { - cfs.getDataTracker().unmarkCompacting(sstables); + return cfs.directories; } public AbstractCompactionTask setUserDefined(boolean isUserDefined) http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 1d9af16..14e1b13 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -188,14 +188,7 @@ public class CompactionManager implements CompactionManagerMBean logger.debug("No tasks available"); return; } - try - { - task.execute(metrics); - } - finally - { - task.unmarkSSTables(); - } + task.execute(metrics); } finally { @@ -331,23 +324,16 @@ public class CompactionManager implements 
CompactionManagerMBean AbstractCompactionTask task = cfStore.getCompactionStrategy().getMaximalTask(gcBefore); if (task == null) return; + // downgrade the lock acquisition + compactionLock.readLock().lock(); + compactionLock.writeLock().unlock(); try { - // downgrade the lock acquisition - compactionLock.readLock().lock(); - compactionLock.writeLock().unlock(); - try - { - task.execute(metrics); - } - finally - { - compactionLock.readLock().unlock(); - } + task.execute(metrics); } finally { - task.unmarkSSTables(); + compactionLock.readLock().unlock(); } } finally @@ -425,35 +411,15 @@ public class CompactionManager implements CompactionManagerMBean } } - try + if (sstables.isEmpty()) { - if (sstables.isEmpty()) - { - logger.info("No file to compact for user defined compaction"); - } - // attempt to schedule the set - else if (cfs.getDataTracker().markCompacting(sstables)) - { - // success: perform the compaction - try - { - AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); - AbstractCompactionTask task = strategy.getUserDefinedTask(sstables, gcBefore); - task.execute(metrics); - } - finally - { - cfs.getDataTracker().unmarkCompacting(sstables); - } - } - else - { - logger.info("SSTables for user defined compaction are already being compacted."); - } + logger.info("No files to compact for user defined compaction"); } - finally + else { - SSTableReader.releaseReferences(sstables); + AbstractCompactionTask task = cfs.getCompactionStrategy().getUserDefinedTask(sstables, gcBefore); + if (task != null) + task.execute(metrics); } } finally @@ -469,19 +435,16 @@ public class CompactionManager implements CompactionManagerMBean // This is not efficent, do not use in any critical path private SSTableReader lookupSSTable(final ColumnFamilyStore cfs, Descriptor descriptor) { - SSTableReader found = null; - for (SSTableReader sstable : cfs.markCurrentSSTablesReferenced()) + for (SSTableReader sstable : cfs.getSSTables()) { // .equals() with no other changes 
won't work because in sstable.descriptor, the directory is an absolute path. // We could construct descriptor with an absolute path too but I haven't found any satisfying way to do that // (DB.getDataFileLocationForTable() may not return the right path if you have multiple volumes). Hence the // endsWith. if (sstable.descriptor.toString().endsWith(descriptor.toString())) - found = sstable; - else - sstable.releaseReference(); + return sstable; } - return found; + return null; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 3d5aebf..75ea1cb 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.util.*; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -52,12 +53,7 @@ public class CompactionTask extends AbstractCompactionTask return totalBytesCompacted += bytesCompacted; } - /** - * For internal use and testing only. The rest of the system should go through the submit* methods, - * which are properly serialized. - * Caller is in charge of marking/unmarking the sstables as compacting. 
- */ - public int execute(CompactionExecutorStatsCollector collector) + protected int executeInternal(CompactionExecutorStatsCollector collector) { this.collector = collector; run(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index fe5daf5..5b29bfc 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -95,32 +95,32 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem return getMaximalTask(gcBefore); } - public synchronized AbstractCompactionTask getMaximalTask(int gcBefore) + public AbstractCompactionTask getMaximalTask(int gcBefore) { - Collection<SSTableReader> sstables = manifest.getCompactionCandidates(); - OperationType op = OperationType.COMPACTION; - if (sstables.isEmpty()) + while (true) { - // if there is no sstable to compact in standard way, try compacting based on droppable tombstone ratio - SSTableReader sstable = findDroppableSSTable(gcBefore); - if (sstable == null) + Collection<SSTableReader> sstables = manifest.getCompactionCandidates(); + OperationType op = OperationType.COMPACTION; + if (sstables.isEmpty()) { - logger.debug("No compaction necessary for {}", this); - return null; + // if there is no sstable to compact in standard way, try compacting based on droppable tombstone ratio + SSTableReader sstable = findDroppableSSTable(gcBefore); + if (sstable == null) + { + logger.debug("No compaction necessary for {}", this); + return null; + } + sstables = Collections.singleton(sstable); + op = OperationType.TOMBSTONE_COMPACTION; } - sstables = Collections.singleton(sstable); - 
op = OperationType.TOMBSTONE_COMPACTION; - } - if (!cfs.getDataTracker().markCompacting(sstables)) - { - logger.debug("Unable to mark {} for compaction; probably a user-defined compaction got in the way", sstables); - return null; + if (cfs.getDataTracker().markCompacting(sstables)) + { + LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore, maxSSTableSizeInMB); + newTask.setCompactionType(op); + return newTask; + } } - - LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore, maxSSTableSizeInMB); - newTask.setCompactionType(op); - return newTask; } public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore) @@ -289,11 +289,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem if (sstables.isEmpty()) continue; + Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting(); for (SSTableReader sstable : sstables) { if (sstable.getEstimatedDroppableTombstoneRatio(gcBefore) <= tombstoneThreshold) continue level; - else if (!sstable.isMarkedSuspect() && worthDroppingTombstones(sstable, gcBefore)) + else if (!compacting.contains(sstable) && !sstable.isMarkedSuspect() && worthDroppingTombstones(sstable, gcBefore)) return sstable; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index fab087e..7957c8d 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -58,8 +58,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy 
cfs.setCompactionThresholds(cfs.metadata.getMinCompactionThreshold(), cfs.metadata.getMaxCompactionThreshold()); } - // synchronized so that multiple callers as in CompactionManager.submitBackground will compute different candidates - public synchronized AbstractCompactionTask getNextBackgroundTask(final int gcBefore) + private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore) { // make local copies so they can't be changed out from under us mid-method int minThreshold = cfs.getMinimumCompactionThreshold(); @@ -67,7 +66,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy if (minThreshold == 0 || maxThreshold == 0) { logger.debug("Compaction is currently disabled."); - return null; + return Collections.emptyList(); } Set<SSTableReader> candidates = cfs.getUncompactingSSTables(); @@ -106,10 +105,10 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy } if (prunedBuckets.isEmpty()) - return null; + return Collections.emptyList(); } - List<SSTableReader> smallestBucket = Collections.min(prunedBuckets, new Comparator<List<SSTableReader>>() + return Collections.min(prunedBuckets, new Comparator<List<SSTableReader>>() { public int compare(List<SSTableReader> o1, List<SSTableReader> o2) { @@ -129,23 +128,44 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy return n / sstables.size(); } }); + } - if (!cfs.getDataTracker().markCompacting(smallestBucket)) + public AbstractCompactionTask getNextBackgroundTask(int gcBefore) + { + while (true) { - logger.debug("Unable to mark {} for compaction; probably a user-defined compaction got in the way", smallestBucket); - return null; - } + List<SSTableReader> smallestBucket = getNextBackgroundSSTables(gcBefore); + + if (smallestBucket.isEmpty()) + return null; - return new CompactionTask(cfs, smallestBucket, gcBefore); + if (cfs.getDataTracker().markCompacting(smallestBucket)) + return new CompactionTask(cfs, smallestBucket, 
gcBefore); + } } public AbstractCompactionTask getMaximalTask(final int gcBefore) { - return cfs.getSSTables().isEmpty() ? null : new CompactionTask(cfs, filterSuspectSSTables(cfs.getSSTables()), gcBefore); + while (true) + { + List<SSTableReader> sstables = filterSuspectSSTables(cfs.getUncompactingSSTables()); + if (sstables.isEmpty()) + return null; + if (cfs.getDataTracker().markCompacting(sstables)) + return new CompactionTask(cfs, sstables, gcBefore); + } } public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore) { + assert !sstables.isEmpty(); // checked for by CM.submitUserDefined + + if (!cfs.getDataTracker().markCompacting(sstables)) + { + logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables); + return null; + } + return new CompactionTask(cfs, sstables, gcBefore).setUserDefined(true); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java index 0ba9d7c..1fbdc23 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java @@ -85,14 +85,7 @@ public class LongLeveledCompactionStrategyTest extends SchemaLoader { public void run() { - try - { - t.execute(null); - } - finally - { - t.unmarkSSTables(); - } + t.execute(null); } }); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index 1e9031e..a6ecbca 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.compaction.AbstractCompactionTask; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.CompactionTask; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; @@ -258,7 +259,8 @@ public class Util public static void compact(ColumnFamilyStore cfs, Collection<SSTableReader> sstables) { - CompactionTask task = new CompactionTask(cfs, sstables, (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds()); + int gcBefore = (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds(); + AbstractCompactionTask task = cfs.getCompactionStrategy().getUserDefinedTask(sstables, gcBefore); task.execute(null); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java index deac172..827257f 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java @@ -137,7 +137,7 @@ public class CompactionsPurgeTest extends SchemaLoader rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(5))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 2); rm.apply(); cfs.forceBlockingFlush(); - new CompactionTask(cfs, sstablesIncomplete, Integer.MAX_VALUE).execute(null); + 
cfs.getCompactionStrategy().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null); // verify that minor compaction does not GC when key is present // in a non-compacted sstable @@ -178,7 +178,7 @@ public class CompactionsPurgeTest extends SchemaLoader rm.delete(new QueryPath(cfName, null, ByteBufferUtil.bytes("c2")), 9); rm.apply(); cfs.forceBlockingFlush(); - new CompactionTask(cfs, sstablesIncomplete, Integer.MAX_VALUE).execute(null); + cfs.getCompactionStrategy().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null); ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, new QueryPath(cfName))); Assert.assertTrue(!cf.getColumn(ByteBufferUtil.bytes("c2")).isLive());