Author: jbellis Date: Wed Aug 31 16:32:37 2011 New Revision: 1163688 URL: http://svn.apache.org/viewvc?rev=1163688&view=rev Log: fix race that allowed multiple simultaneous leveled compaction tasks patch by jbellis; reviewed by Ben Coverston for CASSANDRA-3087
Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1163688&r1=1163687&r2=1163688&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Wed Aug 31 16:32:37 2011 @@ -44,7 +44,7 @@ Thrift<->Avro conversion methods (CASSANDRA-3032) * Add timeouts to client request schedulers (CASSANDRA-3079) * Cli to use hashes rather than array of hashes for strategy options (CASSANDRA-3081) - * LeveledCompactionStrategy (CASSANDRA-1608, 3085, 3110) + * LeveledCompactionStrategy (CASSANDRA-1608, 3085, 3110, 3087) * Improvements of the CLI `describe` command (CASSANDRA-2630) * reduce window where dropped CF sstables may not be deleted (CASSANDRA-2942) * Expose gossip/FD info to JMX (CASSANDRA-2806) Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1163688&r1=1163687&r2=1163688&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Wed Aug 31 16:32:37 2011 @@ -66,7 +66,17 @@ public class CompactionManager implement public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.db:type=CompactionManager"; private static final Logger logger = LoggerFactory.getLogger(CompactionManager.class); public static final CompactionManager instance; - // acquire as 
read to perform a compaction, and as write to prevent compactions + + /** + * compactionLock has two purposes: + * - Compaction acquires its readLock so that multiple compactions can happen simultaneously, + * but the KS/CF migrations acquire its writeLock, so they can be sure no new SSTables will + * be created for a dropped CF posthumously. (Thus, compaction checks CFS.isValid while the + * lock is acquired.) + * - "Special" compactions will acquire writelock instead of readlock to make sure that all + * other compaction activity is quiesced and they can grab ALL the sstables to do something. + * TODO this is too big a hammer -- we should only care about quiescing all for the given CFS. + */ private final ReentrantReadWriteLock compactionLock = new ReentrantReadWriteLock(); static @@ -143,7 +153,6 @@ public class CompactionManager implement { public Object call() throws IOException { - // acquire the write lock to schedule all sstables compactionLock.writeLock().lock(); try { Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java?rev=1163688&r1=1163687&r2=1163688&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java Wed Aug 31 16:32:37 2011 @@ -5,6 +5,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -12,7 +13,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.db.ColumnFamilyStore; 
-import org.apache.cassandra.db.DataTracker; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.notifications.INotification; import org.apache.cassandra.notifications.INotificationConsumer; @@ -27,6 +27,7 @@ public class LeveledCompactionStrategy e private LeveledManifest manifest; private final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb"; private final int maxSSTableSize; + private final AtomicReference<LeveledCompactionTask> task = new AtomicReference<LeveledCompactionTask>(); public class ScheduledBackgroundCompaction implements Runnable { @@ -90,14 +91,21 @@ public class LeveledCompactionStrategy e return manifest.getLevelSize(i); } - public synchronized List<AbstractCompactionTask> getBackgroundTasks(int gcBefore) + public List<AbstractCompactionTask> getBackgroundTasks(int gcBefore) { + LeveledCompactionTask currentTask = task.get(); + if (currentTask != null && !currentTask.isDone()) + return Collections.emptyList(); + Collection<SSTableReader> sstables = manifest.getCompactionCandidates(); logger.debug("CompactionManager candidates are {}", StringUtils.join(sstables, ",")); if (sstables.isEmpty()) return Collections.emptyList(); - LeveledCompactionTask task = new LeveledCompactionTask(cfs, sstables, gcBefore, this.maxSSTableSize); - return Collections.<AbstractCompactionTask>singletonList(task); + + LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore, this.maxSSTableSize); + return task.compareAndSet(currentTask, newTask) + ? 
Collections.<AbstractCompactionTask>singletonList(newTask) + : Collections.<AbstractCompactionTask>emptyList(); } public List<AbstractCompactionTask> getMaximalTasks(int gcBefore) Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java?rev=1163688&r1=1163687&r2=1163688&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java Wed Aug 31 16:32:37 2011 @@ -1,9 +1,10 @@ package org.apache.cassandra.db.compaction; +import java.io.IOException; import java.util.Collection; +import java.util.concurrent.CountDownLatch; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.compaction.CompactionTask; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableWriter; @@ -11,6 +12,8 @@ public class LeveledCompactionTask exten { private final int sstableSizeInMB; + private final CountDownLatch latch = new CountDownLatch(1); + public LeveledCompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, final int gcBefore, int sstableSizeInMB) { super(cfs, sstables, gcBefore); @@ -18,6 +21,19 @@ public class LeveledCompactionTask exten } @Override + public int execute(CompactionManager.CompactionExecutorStatsCollector collector) throws IOException + { + int n = super.execute(collector); + latch.countDown(); + return n; + } + + public boolean isDone() + { + return latch.getCount() == 0; + } + + @Override protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer, long position) { return position > sstableSizeInMB * 1024 * 1024;