fix potential for multiple concurrent compactions of the same sstables
patch by jbellis and yukim; reviewed by slebresne for CASSANDRA-5256


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/686f516c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/686f516c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/686f516c

Branch: refs/heads/cassandra-1.2
Commit: 686f516ccb887fe977238b53b9be307b56432b8c
Parents: 457b546
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Mon Feb 18 15:31:49 2013 -0600
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Wed Feb 20 05:38:57 2013 -0800

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |    1 -
 .../db/compaction/AbstractCompactionStrategy.java  |    8 ++
 .../db/compaction/AbstractCompactionTask.java      |   34 ++++++--
 .../cassandra/db/compaction/CompactionManager.java |   67 +++-----------
 .../cassandra/db/compaction/CompactionTask.java    |    8 +--
 .../db/compaction/LeveledCompactionStrategy.java   |   43 +++++-----
 .../compaction/SizeTieredCompactionStrategy.java   |   42 +++++++---
 .../LongLeveledCompactionStrategyTest.java         |    9 +--
 test/unit/org/apache/cassandra/Util.java           |    4 +-
 .../db/compaction/CompactionsPurgeTest.java        |    4 +-
 11 files changed, 114 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a543ac1..0489968 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 1.2.2
+ * fix potential for multiple concurrent compactions of the same sstables
+   (CASSANDRA-5256)
  * avoid no-op caching of byte[] on commitlog append (CASSANDRA-5199)
  * fix symlinks under data dir not working (CASSANDRA-5185)
  * fix bug in compact storage metadata handling (CASSANDRA-5189)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c08224e..84ed1a1 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -879,7 +879,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         for (SSTableReader sstable : sstables)
         {
             Set<SSTableReader> overlaps = 
ImmutableSet.copyOf(tree.search(Interval.<RowPosition, 
SSTableReader>create(sstable.first, sstable.last)));
-            assert overlaps.contains(sstable);
             results = results == null ? overlaps : Sets.union(results, 
overlaps).immutableCopy();
         }
         results = Sets.difference(results, ImmutableSet.copyOf(sstables));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 356289c..cb15109 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -85,23 +85,31 @@ public abstract class AbstractCompactionStrategy
 
     /**
      * @param gcBefore throw away tombstones older than this
+     *
      * @return the next background/minor compaction task to run; null if 
nothing to do.
+     *
      * Is responsible for marking its sstables as compaction-pending.
      */
     public abstract AbstractCompactionTask getNextBackgroundTask(final int 
gcBefore);
 
     /**
      * @param gcBefore throw away tombstones older than this
+     *
      * @return a compaction task that should be run to compact this 
columnfamilystore
      * as much as possible.  Null if nothing to do.
+     *
+     * Is responsible for marking its sstables as compaction-pending.
      */
     public abstract AbstractCompactionTask getMaximalTask(final int gcBefore);
 
     /**
      * @param sstables SSTables to compact. Must be marked as compacting.
      * @param gcBefore throw away tombstones older than this
+     *
      * @return a compaction task corresponding to the requested sstables.
      * Will not be null. (Will throw if user requests an invalid compaction.)
+     *
+     * Is responsible for marking its sstables as compaction-pending.
      */
     public abstract AbstractCompactionTask 
getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index 0913765..70521dd 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -18,6 +18,9 @@
 package org.apache.cassandra.db.compaction;
 
 import java.util.Collection;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.io.sstable.SSTableReader;
@@ -32,24 +35,43 @@ public abstract class AbstractCompactionTask extends DiskAwareRunnable
     protected boolean isUserDefined;
     protected OperationType compactionType;
 
+    /**
+     * @param cfs
+     * @param sstables must be marked compacting
+     */
     public AbstractCompactionTask(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables)
     {
         this.cfs = cfs;
         this.sstables = sstables;
         this.isUserDefined = false;
         this.compactionType = OperationType.COMPACTION;
-    }
 
-    public abstract int execute(CompactionExecutorStatsCollector collector);
+        // enforce contract that caller should mark sstables compacting
+        Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
+        for (SSTableReader sstable : sstables)
+            assert compacting.contains(sstable) : sstable.getFilename() + " is 
not correctly marked compacting";
+    }
 
-    protected Directories getDirectories()
+    /**
+     * executes the task and unmarks sstables compacting
+     */
+    public int execute(CompactionExecutorStatsCollector collector)
     {
-        return cfs.directories;
+        try
+        {
+            return executeInternal(collector);
+        }
+        finally
+        {
+            cfs.getDataTracker().unmarkCompacting(sstables);
+        }
     }
 
-    public void unmarkSSTables()
+    protected abstract int executeInternal(CompactionExecutorStatsCollector 
collector);
+
+    protected Directories getDirectories()
     {
-        cfs.getDataTracker().unmarkCompacting(sstables);
+        return cfs.directories;
     }
 
     public AbstractCompactionTask setUserDefined(boolean isUserDefined)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 1d9af16..14e1b13 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -188,14 +188,7 @@ public class CompactionManager implements CompactionManagerMBean
                     logger.debug("No tasks available");
                     return;
                 }
-                try
-                {
-                    task.execute(metrics);
-                }
-                finally
-                {
-                    task.unmarkSSTables();
-                }
+                task.execute(metrics);
             }
             finally
             {
@@ -331,23 +324,16 @@ public class CompactionManager implements CompactionManagerMBean
                     AbstractCompactionTask task = 
cfStore.getCompactionStrategy().getMaximalTask(gcBefore);
                     if (task == null)
                         return;
+                    // downgrade the lock acquisition
+                    compactionLock.readLock().lock();
+                    compactionLock.writeLock().unlock();
                     try
                     {
-                        // downgrade the lock acquisition
-                        compactionLock.readLock().lock();
-                        compactionLock.writeLock().unlock();
-                        try
-                        {
-                            task.execute(metrics);
-                        }
-                        finally
-                        {
-                            compactionLock.readLock().unlock();
-                        }
+                        task.execute(metrics);
                     }
                     finally
                     {
-                        task.unmarkSSTables();
+                        compactionLock.readLock().unlock();
                     }
                 }
                 finally
@@ -425,35 +411,15 @@ public class CompactionManager implements CompactionManagerMBean
                         }
                     }
 
-                    try
+                    if (sstables.isEmpty())
                     {
-                        if (sstables.isEmpty())
-                        {
-                            logger.info("No file to compact for user defined 
compaction");
-                        }
-                        // attempt to schedule the set
-                        else if (cfs.getDataTracker().markCompacting(sstables))
-                        {
-                            // success: perform the compaction
-                            try
-                            {
-                                AbstractCompactionStrategy strategy = 
cfs.getCompactionStrategy();
-                                AbstractCompactionTask task = 
strategy.getUserDefinedTask(sstables, gcBefore);
-                                task.execute(metrics);
-                            }
-                            finally
-                            {
-                                
cfs.getDataTracker().unmarkCompacting(sstables);
-                            }
-                        }
-                        else
-                        {
-                            logger.info("SSTables for user defined compaction 
are already being compacted.");
-                        }
+                        logger.info("No files to compact for user defined 
compaction");
                     }
-                    finally
+                    else
                     {
-                        SSTableReader.releaseReferences(sstables);
+                        AbstractCompactionTask task = 
cfs.getCompactionStrategy().getUserDefinedTask(sstables, gcBefore);
+                        if (task != null)
+                            task.execute(metrics);
                     }
                 }
                 finally
@@ -469,19 +435,16 @@ public class CompactionManager implements CompactionManagerMBean
     // This is not efficent, do not use in any critical path
     private SSTableReader lookupSSTable(final ColumnFamilyStore cfs, 
Descriptor descriptor)
     {
-        SSTableReader found = null;
-        for (SSTableReader sstable : cfs.markCurrentSSTablesReferenced())
+        for (SSTableReader sstable : cfs.getSSTables())
         {
             // .equals() with no other changes won't work because in 
sstable.descriptor, the directory is an absolute path.
             // We could construct descriptor with an absolute path too but I 
haven't found any satisfying way to do that
             // (DB.getDataFileLocationForTable() may not return the right path 
if you have multiple volumes). Hence the
             // endsWith.
             if (sstable.descriptor.toString().endsWith(descriptor.toString()))
-                found = sstable;
-            else
-                sstable.releaseReference();
+                return sstable;
         }
-        return found;
+        return null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 3d5aebf..75ea1cb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -52,12 +53,7 @@ public class CompactionTask extends AbstractCompactionTask
         return totalBytesCompacted += bytesCompacted;
     }
 
-    /**
-     * For internal use and testing only.  The rest of the system should go 
through the submit* methods,
-     * which are properly serialized.
-     * Caller is in charge of marking/unmarking the sstables as compacting.
-     */
-    public int execute(CompactionExecutorStatsCollector collector)
+    protected int executeInternal(CompactionExecutorStatsCollector collector)
     {
         this.collector = collector;
         run();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index fe5daf5..5b29bfc 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -95,32 +95,32 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         return getMaximalTask(gcBefore);
     }
 
-    public synchronized AbstractCompactionTask getMaximalTask(int gcBefore)
+    public AbstractCompactionTask getMaximalTask(int gcBefore)
     {
-        Collection<SSTableReader> sstables = 
manifest.getCompactionCandidates();
-        OperationType op = OperationType.COMPACTION;
-        if (sstables.isEmpty())
+        while (true)
         {
-            // if there is no sstable to compact in standard way, try 
compacting based on droppable tombstone ratio
-            SSTableReader sstable = findDroppableSSTable(gcBefore);
-            if (sstable == null)
+            Collection<SSTableReader> sstables = 
manifest.getCompactionCandidates();
+            OperationType op = OperationType.COMPACTION;
+            if (sstables.isEmpty())
             {
-                logger.debug("No compaction necessary for {}", this);
-                return null;
+                // if there is no sstable to compact in standard way, try 
compacting based on droppable tombstone ratio
+                SSTableReader sstable = findDroppableSSTable(gcBefore);
+                if (sstable == null)
+                {
+                    logger.debug("No compaction necessary for {}", this);
+                    return null;
+                }
+                sstables = Collections.singleton(sstable);
+                op = OperationType.TOMBSTONE_COMPACTION;
             }
-            sstables = Collections.singleton(sstable);
-            op = OperationType.TOMBSTONE_COMPACTION;
-        }
 
-        if (!cfs.getDataTracker().markCompacting(sstables))
-        {
-            logger.debug("Unable to mark {} for compaction; probably a 
user-defined compaction got in the way", sstables);
-            return null;
+            if (cfs.getDataTracker().markCompacting(sstables))
+            {
+                LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, 
sstables, gcBefore, maxSSTableSizeInMB);
+                newTask.setCompactionType(op);
+                return newTask;
+            }
         }
-
-        LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, 
sstables, gcBefore, maxSSTableSizeInMB);
-        newTask.setCompactionType(op);
-        return newTask;
     }
 
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> 
sstables, int gcBefore)
@@ -289,11 +289,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
             if (sstables.isEmpty())
                 continue;
 
+            Set<SSTableReader> compacting = 
cfs.getDataTracker().getCompacting();
             for (SSTableReader sstable : sstables)
             {
                 if (sstable.getEstimatedDroppableTombstoneRatio(gcBefore) <= 
tombstoneThreshold)
                     continue level;
-                else if (!sstable.isMarkedSuspect() && 
worthDroppingTombstones(sstable, gcBefore))
+                else if (!compacting.contains(sstable) && 
!sstable.isMarkedSuspect() && worthDroppingTombstones(sstable, gcBefore))
                     return sstable;
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index fab087e..7957c8d 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -58,8 +58,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         cfs.setCompactionThresholds(cfs.metadata.getMinCompactionThreshold(), 
cfs.metadata.getMaxCompactionThreshold());
     }
 
-    // synchronized so that multiple callers as in 
CompactionManager.submitBackground will compute different candidates
-    public synchronized AbstractCompactionTask getNextBackgroundTask(final int 
gcBefore)
+    private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
     {
         // make local copies so they can't be changed out from under us 
mid-method
         int minThreshold = cfs.getMinimumCompactionThreshold();
@@ -67,7 +66,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         if (minThreshold == 0 || maxThreshold == 0)
         {
             logger.debug("Compaction is currently disabled.");
-            return null;
+            return Collections.emptyList();
         }
 
         Set<SSTableReader> candidates = cfs.getUncompactingSSTables();
@@ -106,10 +105,10 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
             }
 
             if (prunedBuckets.isEmpty())
-                return null;
+                return Collections.emptyList();
         }
 
-        List<SSTableReader> smallestBucket = Collections.min(prunedBuckets, 
new Comparator<List<SSTableReader>>()
+        return Collections.min(prunedBuckets, new 
Comparator<List<SSTableReader>>()
         {
             public int compare(List<SSTableReader> o1, List<SSTableReader> o2)
             {
@@ -129,23 +128,44 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
                 return n / sstables.size();
             }
         });
+    }
 
-        if (!cfs.getDataTracker().markCompacting(smallestBucket))
+    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+    {
+        while (true)
         {
-            logger.debug("Unable to mark {} for compaction; probably a 
user-defined compaction got in the way", smallestBucket);
-            return null;
-        }
+            List<SSTableReader> smallestBucket = 
getNextBackgroundSSTables(gcBefore);
+
+            if (smallestBucket.isEmpty())
+                return null;
 
-        return new CompactionTask(cfs, smallestBucket, gcBefore);
+            if (cfs.getDataTracker().markCompacting(smallestBucket))
+                return new CompactionTask(cfs, smallestBucket, gcBefore);
+        }
     }
 
     public AbstractCompactionTask getMaximalTask(final int gcBefore)
     {
-        return cfs.getSSTables().isEmpty() ? null : new CompactionTask(cfs, 
filterSuspectSSTables(cfs.getSSTables()), gcBefore);
+        while (true)
+        {
+            List<SSTableReader> sstables = 
filterSuspectSSTables(cfs.getUncompactingSSTables());
+            if (sstables.isEmpty())
+                return null;
+            if (cfs.getDataTracker().markCompacting(sstables))
+                return new CompactionTask(cfs, sstables, gcBefore);
+        }
     }
 
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> 
sstables, final int gcBefore)
     {
+        assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+        if (!cfs.getDataTracker().markCompacting(sstables))
+        {
+            logger.debug("Unable to mark {} for compaction; probably a 
background compaction got to it first.  You can disable background compactions 
temporarily if this is a problem", sstables);
+            return null;
+        }
+
         return new CompactionTask(cfs, sstables, 
gcBefore).setUserDefined(true);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index 0ba9d7c..1fbdc23 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -85,14 +85,7 @@ public class LongLeveledCompactionStrategyTest extends SchemaLoader
                 {
                     public void run()
                     {
-                        try
-                        {
-                            t.execute(null);
-                        }
-                        finally
-                        {
-                            t.unmarkSSTables();
-                        }
+                        t.execute(null);
                     }
                 });
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 1e9031e..a6ecbca 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.AbstractCompactionTask;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.CompactionTask;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
@@ -258,7 +259,8 @@ public class Util
 
     public static void compact(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables)
     {
-        CompactionTask task = new CompactionTask(cfs, sstables, (int) 
(System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds());
+        int gcBefore = (int) (System.currentTimeMillis() / 1000) - 
cfs.metadata.getGcGraceSeconds();
+        AbstractCompactionTask task = 
cfs.getCompactionStrategy().getUserDefinedTask(sstables, gcBefore);
         task.execute(null);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/686f516c/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index deac172..827257f 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -137,7 +137,7 @@ public class CompactionsPurgeTest extends SchemaLoader
         rm.add(new QueryPath(cfName, null, 
ByteBufferUtil.bytes(String.valueOf(5))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 2);
         rm.apply();
         cfs.forceBlockingFlush();
-        new CompactionTask(cfs, sstablesIncomplete, 
Integer.MAX_VALUE).execute(null);
+        cfs.getCompactionStrategy().getUserDefinedTask(sstablesIncomplete, 
Integer.MAX_VALUE).execute(null);
 
         // verify that minor compaction does not GC when key is present
         // in a non-compacted sstable
@@ -178,7 +178,7 @@ public class CompactionsPurgeTest extends SchemaLoader
         rm.delete(new QueryPath(cfName, null, ByteBufferUtil.bytes("c2")), 9);
         rm.apply();
         cfs.forceBlockingFlush();
-        new CompactionTask(cfs, sstablesIncomplete, 
Integer.MAX_VALUE).execute(null);
+        cfs.getCompactionStrategy().getUserDefinedTask(sstablesIncomplete, 
Integer.MAX_VALUE).execute(null);
 
         ColumnFamily cf = 
cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, new QueryPath(cfName)));
         Assert.assertTrue(!cf.getColumn(ByteBufferUtil.bytes("c2")).isLive());

Reply via email to