Author: jbellis
Date: Wed Aug 31 16:32:37 2011
New Revision: 1163688

URL: http://svn.apache.org/viewvc?rev=1163688&view=rev
Log:
fix race that allowed multiple simultaneous leveled compaction tasks
patch by jbellis; reviewed by Ben Coverston for CASSANDRA-3087

Modified:
    cassandra/trunk/CHANGES.txt
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1163688&r1=1163687&r2=1163688&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Aug 31 16:32:37 2011
@@ -44,7 +44,7 @@
    Thrift<->Avro conversion methods (CASSANDRA-3032)
  * Add timeouts to client request schedulers (CASSANDRA-3079)
  * Cli to use hashes rather than array of hashes for strategy options 
(CASSANDRA-3081)
- * LeveledCompactionStrategy (CASSANDRA-1608, 3085, 3110)
+ * LeveledCompactionStrategy (CASSANDRA-1608, 3085, 3110, 3087)
  * Improvements of the CLI `describe` command (CASSANDRA-2630)
  * reduce window where dropped CF sstables may not be deleted (CASSANDRA-2942)
  * Expose gossip/FD info to JMX (CASSANDRA-2806)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1163688&r1=1163687&r2=1163688&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
 Wed Aug 31 16:32:37 2011
@@ -66,7 +66,17 @@ public class CompactionManager implement
     public static final String MBEAN_OBJECT_NAME = 
"org.apache.cassandra.db:type=CompactionManager";
     private static final Logger logger = 
LoggerFactory.getLogger(CompactionManager.class);
     public static final CompactionManager instance;
-    // acquire as read to perform a compaction, and as write to prevent 
compactions
+
+    /**
+     * compactionLock has two purposes:
+     * - Compaction acquires its readLock so that multiple compactions can 
happen simultaneously,
+     *   but the KS/CF migrations acquire its writeLock, so they can be sure 
no new SSTables will
+     *   be created for a dropped CF posthumously.  (Thus, compaction checks 
CFS.isValid while the
+     *   lock is acquired.)
+     * - "Special" compactions will acquire writeLock instead of readLock to 
make sure that all
+     *   other compaction activity is quiesced and they can grab ALL the 
sstables to do something.
+     *   TODO this is too big a hammer -- we should only care about quiescing 
all for the given CFS.
+     */
     private final ReentrantReadWriteLock compactionLock = new 
ReentrantReadWriteLock();
 
     static
@@ -143,7 +153,6 @@ public class CompactionManager implement
         {
             public Object call() throws IOException
             {
-                // acquire the write lock to schedule all sstables
                 compactionLock.writeLock().lock();
                 try 
                 {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java?rev=1163688&r1=1163687&r2=1163688&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
 Wed Aug 31 16:32:37 2011
@@ -5,6 +5,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -12,7 +13,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.notifications.INotification;
 import org.apache.cassandra.notifications.INotificationConsumer;
@@ -27,6 +27,7 @@ public class LeveledCompactionStrategy e
     private LeveledManifest manifest;
     private final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
     private final int maxSSTableSize;
+    private final AtomicReference<LeveledCompactionTask> task = new 
AtomicReference<LeveledCompactionTask>();
 
     public class ScheduledBackgroundCompaction implements Runnable
     {
@@ -90,14 +91,21 @@ public class LeveledCompactionStrategy e
         return manifest.getLevelSize(i);
     }
 
-    public synchronized List<AbstractCompactionTask> getBackgroundTasks(int 
gcBefore)
+    public List<AbstractCompactionTask> getBackgroundTasks(int gcBefore)  // hands out at most one LeveledCompactionTask until that task is done
     {
+        LeveledCompactionTask currentTask = task.get();  // task most recently handed out via the CAS below, or null
+        if (currentTask != null && !currentTask.isDone())  // previously returned task is not done yet (LeveledCompactionTask.isDone)
+            return Collections.emptyList();
+
         Collection<SSTableReader> sstables = 
manifest.getCompactionCandidates();
         logger.debug("CompactionManager candidates are {}", 
StringUtils.join(sstables, ","));
         if (sstables.isEmpty())
             return Collections.emptyList();
-        LeveledCompactionTask task = new LeveledCompactionTask(cfs, sstables, 
gcBefore, this.maxSSTableSize);
-        return Collections.<AbstractCompactionTask>singletonList(task);
+
+        LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, 
sstables, gcBefore, this.maxSSTableSize);
+        return task.compareAndSet(currentTask, newTask)  // if a concurrent caller registered a task first, this caller returns nothing
+               ? Collections.<AbstractCompactionTask>singletonList(newTask)
+               : Collections.<AbstractCompactionTask>emptyList();  // NOTE(review): isDone() only turns true from inside the task's execute(), so a returned task that is never executed blocks all later scheduling -- confirm callers always execute it
     }
 
     public List<AbstractCompactionTask> getMaximalTasks(int gcBefore)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java?rev=1163688&r1=1163687&r2=1163688&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
 Wed Aug 31 16:32:37 2011
@@ -1,9 +1,10 @@
 package org.apache.cassandra.db.compaction;
 
+import java.io.IOException;
 import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.CompactionTask;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 
@@ -11,6 +12,8 @@ public class LeveledCompactionTask exten
 {
     private final int sstableSizeInMB;
 
+    private final CountDownLatch latch = new CountDownLatch(1);
+
     public LeveledCompactionTask(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, final int gcBefore, int sstableSizeInMB)
     {
         super(cfs, sstables, gcBefore);
@@ -18,6 +21,29 @@ public class LeveledCompactionTask exten
     }
 
     @Override
+    public int execute(CompactionManager.CompactionExecutorStatsCollector collector) throws IOException
+    {
+        // Count down in finally so isDone() turns true even when the compaction throws;
+        // otherwise callers that gate on isDone() would wait on this task forever.
+        try
+        {
+            return super.execute(collector);
+        }
+        finally
+        {
+            latch.countDown();
+        }
+    }
+
+    /**
+     * @return true once execute() has returned normally or thrown
+     */
+    public boolean isDone()
+    {
+        return latch.getCount() == 0;
+    }
+
+    @Override
     protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer, long position)
     {
         return position > sstableSizeInMB * 1024 * 1024;


Reply via email to