Author: jbellis
Date: Tue Sep 20 13:13:04 2011
New Revision: 1173131

URL: http://svn.apache.org/viewvc?rev=1173131&view=rev
Log:
LeveledCompactionStrategy fixes
patch by jbellis; tested by brandonwilliams for CASSANDRA-3224

Modified:
    cassandra/branches/cassandra-1.0.0/CHANGES.txt
    
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
    
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java

Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/CHANGES.txt?rev=1173131&r1=1173130&r2=1173131&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0.0/CHANGES.txt Tue Sep 20 13:13:04 2011
@@ -10,6 +10,8 @@
    true) and set default badness threshold to 0.1 (CASSANDRA-3229)
  * Base choice of random or "balanced" token on bootstrap on whether
    schema definitions were found (CASSANDRA-3219)
+ * Fixes for LeveledCompactionStrategy score computation, prioritization,
+   and scheduling (CASSANDRA-3224)
 
 
 1.0.0-beta1

Modified: 
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1173131&r1=1173130&r2=1173131&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 (original)
+++ 
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 Tue Sep 20 13:13:04 2011
@@ -328,7 +328,7 @@ public class ColumnFamilyStore implement
      */
     public static void scrubDataDirectories(String table, String columnFamily)
     {
-        logger.info("Removing compacted SSTable files (see 
http://wiki.apache.org/cassandra/MemtableSSTable)");
+        logger.info("Removing compacted SSTable files from " + columnFamily + 
" (see http://wiki.apache.org/cassandra/MemtableSSTable)");
         for (Map.Entry<Descriptor,Set<Component>> sstableFiles : files(table, 
columnFamily, true, true).entrySet())
         {
             Descriptor desc = sstableFiles.getKey();

Modified: 
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java?rev=1173131&r1=1173130&r2=1173131&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
 (original)
+++ 
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
 Tue Sep 20 13:13:04 2011
@@ -76,9 +76,20 @@ public class LeveledCompactionStrategy e
         logger.info(this + " subscribed to the data tracker.");
 
         manifest = LeveledManifest.create(cfs, this.maxSSTableSize);
+        logger.debug("Created {}", manifest);
         // override min/max for this strategy
         cfs.setMaximumCompactionThreshold(Integer.MAX_VALUE);
         cfs.setMinimumCompactionThreshold(1);
+
+        // TODO this is redundant wrt the kickoff in 
AbstractCompactionStrategy, once CASSANDRA-X is done
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                
CompactionManager.instance.submitBackground(LeveledCompactionStrategy.this.cfs);
+            }
+        };
+        StorageService.optionalTasks.scheduleAtFixedRate(runnable, 5 * 60, 5, 
TimeUnit.SECONDS);
     }
 
     public void shutdown()
@@ -96,12 +107,17 @@ public class LeveledCompactionStrategy e
     {
         LeveledCompactionTask currentTask = task.get();
         if (currentTask != null && !currentTask.isDone())
+        {
+            logger.debug("Compaction still in progress for {}", this);
             return Collections.emptyList();
+        }
 
         Collection<SSTableReader> sstables = 
manifest.getCompactionCandidates();
-        logger.debug("CompactionManager candidates are {}", 
StringUtils.join(sstables, ","));
         if (sstables.isEmpty())
+        {
+            logger.debug("No compaction necessary for {}", this);
             return Collections.emptyList();
+        }
 
         LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, 
sstables, gcBefore, this.maxSSTableSize);
         return task.compareAndSet(currentTask, newTask)
@@ -139,4 +155,10 @@ public class LeveledCompactionStrategy e
             manifest.logDistribution();
         }
     }
+
+    @Override
+    public String toString()
+    {
+        return String.format("LCS@%d(%s)", hashCode(), cfs.columnFamily);
+    }
 }

Modified: 
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java?rev=1173131&r1=1173130&r2=1173131&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
 (original)
+++ 
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
 Tue Sep 20 13:13:04 2011
@@ -26,6 +26,7 @@ import java.io.IOError;
 import java.io.IOException;
 import java.util.*;
 
+import com.google.common.collect.Iterables;
 import org.apache.commons.lang.StringUtils;
 
 import org.slf4j.Logger;
@@ -37,6 +38,7 @@ import org.apache.cassandra.db.Decorated
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
 import org.codehaus.jackson.JsonEncoding;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
@@ -47,6 +49,13 @@ public class LeveledManifest
 {
     private static final Logger logger = 
LoggerFactory.getLogger(LeveledCompactionStrategy.class);
 
+    /**
+     * limit the number of L0 sstables we do at once, because compaction bloom 
filter creation
+     * uses a pessimistic estimate of how many keys overlap (none), so we risk 
wasting memory
+     * or even OOMing when compacting highly overlapping sstables
+     */
+    private static int MAX_COMPACTING_L0 = 32;
+
     private final ColumnFamilyStore cfs;
     private final List<SSTableReader>[] generations;
     private final DecoratedKey[] lastCompactedKeys;
@@ -85,37 +94,37 @@ public class LeveledManifest
 
     private static void load(ColumnFamilyStore cfs, LeveledManifest manifest)
     {
+        File manifestFile = tryGetManifest(cfs);
+        if (manifestFile == null)
+            return;
+
         ObjectMapper m = new ObjectMapper();
         try
         {
-            File manifestFile = tryGetManifest(cfs);
-
-            if (manifestFile != null && manifestFile.exists())
+            JsonNode rootNode = m.readValue(manifestFile, JsonNode.class);
+            JsonNode generations = rootNode.get("generations");
+            assert generations.isArray();
+            for (JsonNode generation : generations)
             {
-                JsonNode rootNode = m.readValue(manifestFile, JsonNode.class);
-                JsonNode generations = rootNode.get("generations");
-                assert generations.isArray();
-                for (JsonNode generation : generations)
+                int level = generation.get("generation").getIntValue();
+                JsonNode generationValues = generation.get("members");
+                for (JsonNode generationValue : generationValues)
                 {
-                    int level = generation.get("generation").getIntValue();
-                    JsonNode generationValues = generation.get("members");
-                    for (JsonNode generationValue : generationValues)
+                    for (SSTableReader ssTableReader : cfs.getSSTables())
                     {
-                        for (SSTableReader ssTableReader : cfs.getSSTables())
+                        if (ssTableReader.descriptor.generation == 
generationValue.getIntValue())
                         {
-                            if (ssTableReader.descriptor.generation == 
generationValue.getIntValue())
-                            {
-                                logger.debug("Loading {} at L{}", 
ssTableReader, level);
-                                manifest.add(ssTableReader, level);
-                            }
+                            logger.debug("Loading {} at L{}", ssTableReader, 
level);
+                            manifest.add(ssTableReader, level);
                         }
                     }
                 }
             }
         }
-        catch (IOException e)
+        catch (Exception e)
         {
-            throw new IOError(e);
+            // TODO try to recover -old first
+            logger.error("Manifest present but corrupt. Cassandra will compact 
levels from scratch", e);
         }
     }
 
@@ -140,8 +149,9 @@ public class LeveledManifest
 
     public synchronized void promote(Iterable<SSTableReader> removed, 
Iterable<SSTableReader> added)
     {
-        logger.debug("Replacing [{}] with [{}]", 
StringUtils.join(removed.iterator(), ", "), StringUtils.join(added.iterator(), 
", "));
-        
+        if (logger.isDebugEnabled())
+            logger.debug((Iterables.isEmpty(added) ? "Removing [" : "Replacing 
[") + toString(removed) + "]");
+
         // the level for the added sstables is the max of the removed ones,
         // plus one if the removed were all on the same level
         int minimumLevel = Integer.MAX_VALUE;
@@ -160,16 +170,25 @@ public class LeveledManifest
 
         int newLevel = minimumLevel == maximumLevel ? maximumLevel + 1 : 
maximumLevel;
         newLevel = skipLevels(newLevel, added);
+        assert newLevel > 0;
+        logger.debug("Adding [{}] at L{}", StringUtils.join(added.iterator(), 
", "), newLevel);
 
         lastCompactedKeys[minimumLevel] = 
SSTable.sstableOrdering.max(added).last;
-        logger.debug("Adding [{}] to L{}",
-                     StringUtils.join(added.iterator(), ", "), newLevel);
         for (SSTableReader ssTableReader : added)
             add(ssTableReader, newLevel);
 
         serialize();
     }
 
+    private String toString(Iterable<SSTableReader> sstables)
+    {
+        StringBuilder builder = new StringBuilder();
+        for (SSTableReader sstable : sstables)
+        {
+            builder.append(sstable.toString()).append(" 
(L").append(levelOf(sstable)).append("), ");
+        }
+        return builder.toString();
+    }
 
     private double maxBytesForLevel (int level)
     {
@@ -182,28 +201,47 @@ public class LeveledManifest
     {
         logDistribution();
 
-        double bestScore = -1;
-        int bestLevel = -1;
-        for (int level = 0; level < generations.length; level++)
+        // LevelDB gives each level a score of how much data it contains vs 
its ideal amount, and
+        // compacts the level with the highest score. But this falls apart 
spectacularly once you
+        // get behind.  Consider this set of levels:
+        // L0: 988 [ideal: 4]
+        // L1: 117 [ideal: 10]
+        // L2: 12  [ideal: 100]
+        //
+        // The problem is that L0 has a much higher score (almost 250) than L1 
(11), so what we'll
+        // do is compact a batch of MAX_COMPACTING_L0 sstables with all 117 L1 
sstables, and put the
+        // result (say, 120 sstables) in L1. Then we'll compact the next batch 
of MAX_COMPACTING_L0,
+        // and so forth.  So we spend most of our i/o rewriting the L1 data 
with each batch.
+        //
+        // If we could just do *all* L0 a single time with L1, that would be 
ideal.  But we can't
+        // -- see the javadoc for MAX_COMPACTING_L0.
+        //
+        // LevelDB's way around this is to simply block writes if L0 
compaction falls behind.
+        // We don't have that luxury.
+        //
+        // So instead, we force compacting higher levels first.  This may not 
minimize the number
+        // of reads done as quickly in the short term, but it minimizes the 
i/o needed to compact
+        // optimally, which gives us a long term win.
+        for (int i = generations.length - 1; i >= 0; i--)
         {
-            List<SSTableReader> sstables = generations[level];
+            List<SSTableReader> sstables = generations[i];
             if (sstables.isEmpty())
-                continue;
-
-            double score = SSTableReader.getTotalBytes(sstables) / 
maxBytesForLevel(level);
-            //if we're idle and we don't have anything better to do schedule a 
compaction for L0
-            //by setting its threshold to some very low value
-            score = (level == 0 && score < 1) ? 1.001 : 0;
-            logger.debug("Compaction score for level {} is {}", level, score);
-            if (score > bestScore)
+                continue; // mostly this just avoids polluting the debug log 
with zero scores
+            double score = SSTableReader.getTotalBytes(sstables) / 
maxBytesForLevel(i);
+            logger.debug("Compaction score for level {} is {}", i, score);
+
+            // L0 gets a special case that if we don't have anything more 
important to do,
+            // we'll go ahead and compact even just one sstable
+            if (score > 1 || i == 0)
             {
-                bestScore = score;
-                bestLevel = level;
+                Collection<SSTableReader> candidates = getCandidatesFor(i);
+                if (logger.isDebugEnabled())
+                    logger.debug("Compaction candidates for L{} are {}", i, 
toString(candidates));
+                return candidates;
             }
         }
 
-        // if we have met at least one of our thresholds then trigger a 
compaction
-        return bestScore > 1 ? getCandidatesFor(bestLevel) : 
Collections.<SSTableReader>emptyList();
+        return Collections.emptyList();
     }
 
     public int getLevelSize(int i)
@@ -215,7 +253,13 @@ public class LeveledManifest
     public void logDistribution()
     {
         for (int i = 0; i < generations.length; i++)
-            logger.debug("Level {} contains {} SSTables", i, 
generations[i].size());
+        {
+            if (!generations[i].isEmpty())
+            {
+                logger.debug("L{} contains {} SSTables ({} bytes) in {}",
+                             new Object[] {i, generations[i].size(), 
SSTableReader.getTotalBytes(generations[i]), this});
+            }
+        }
     }
 
     private int levelOf(SSTableReader sstable)
@@ -258,25 +302,21 @@ public class LeveledManifest
     private Collection<SSTableReader> getCandidatesFor(int level)
     {
         assert !generations[level].isEmpty();
+        logger.debug("Choosing candidates for L{}", level);
 
         if (level == 0)
         {
             // because L0 files may overlap each other, we treat compactions 
there specially:
             // a L0 compaction also checks other L0 files for overlap.
             Set<SSTableReader> candidates = new HashSet<SSTableReader>();
-            Set<SSTableReader> remaining = new 
HashSet<SSTableReader>(generations[0]);
-
-            while (!remaining.isEmpty())
-            {
-                // pick a random sstable from L0, and any that overlap with it
-                List<SSTableReader> L0 = 
overlapping(remaining.iterator().next(), remaining);
-                // add the overlapping ones from L1
-                for (SSTableReader sstable : L0)
-                {
-                    candidates.addAll(overlapping(sstable, generations[1]));
-                    remaining.remove(sstable);
-                }
-            }
+            // pick the oldest sstable from L0, and any that overlap with it
+            List<SSTableReader> ageSortedSSTables = new 
ArrayList<SSTableReader>(generations[0]);
+            Collections.sort(ageSortedSSTables, 
SSTable.maxTimestampComparator);
+            List<SSTableReader> L0 = overlapping(ageSortedSSTables.get(0), 
generations[0]);
+            L0 = L0.size() > MAX_COMPACTING_L0 ? L0.subList(0, 
MAX_COMPACTING_L0) : L0;
+            // add the overlapping ones from L1
+            for (SSTableReader sstable : L0)
+                candidates.addAll(overlapping(sstable, generations[1]));
             return candidates;
         }
 
@@ -294,18 +334,16 @@ public class LeveledManifest
 
     public synchronized void serialize()
     {
-        String dataFileLocation = getDataFilePrefix(cfs);
-        String tempManifestFileName = dataFileLocation + 
cfs.getColumnFamilyName() + "-" + "tmp.json";
-        String manifestFileName = dataFileLocation + cfs.getColumnFamilyName() 
+ ".json";
-        String oldManifestFileName = dataFileLocation + 
cfs.getColumnFamilyName() + "-" + "old.json";
-
-        File tmpManifest = new File(tempManifestFileName);
+        File manifestFile = tryGetManifest(cfs);
+        if (manifestFile == null)
+            manifestFile = new File(new 
File(DatabaseDescriptor.getAllDataFileLocations()[0], cfs.table.name), 
cfs.columnFamily + ".json");
+        File oldFile = new File(manifestFile.getPath().replace(".json", 
"-old.json"));
+        File tmpFile = new File(manifestFile.getPath().replace(".json", 
"-tmp.json"));
 
         JsonFactory f = new JsonFactory();
-
         try
         {
-            JsonGenerator g = f.createJsonGenerator(tmpManifest, 
JsonEncoding.UTF8);
+            JsonGenerator g = f.createJsonGenerator(tmpFile, 
JsonEncoding.UTF8);
             g.useDefaultPrettyPrinter();
             g.writeStartObject();
             g.writeArrayFieldStart("generations");
@@ -323,36 +361,39 @@ public class LeveledManifest
             g.writeEndArray(); // for field generations
             g.writeEndObject(); // write global object
             g.close();
+
+            if (oldFile.exists() && manifestFile.exists())
+                FileUtils.deleteWithConfirm(oldFile);
+            if (manifestFile.exists())
+                FileUtils.renameWithConfirm(manifestFile, oldFile);
+            assert tmpFile.exists();
+            FileUtils.renameWithConfirm(tmpFile, manifestFile);
+            logger.debug("Saved manifest {}", manifestFile);
         }
         catch (IOException e)
         {
-            e.printStackTrace();
+            throw new IOError(e);
         }
-        File oldFile = new File(oldManifestFileName);
-        if (oldFile.exists())
-            oldFile.delete();
-        File currentManifest = new File(manifestFileName);
-        if (currentManifest.exists())
-            currentManifest.renameTo(new File(oldManifestFileName));
-        if (tmpManifest.exists())
-            tmpManifest.renameTo(new File(manifestFileName));
     }
 
     public static File tryGetManifest(ColumnFamilyStore cfs)
     {
-        for (String dataFileLocation : 
DatabaseDescriptor.getAllDataFileLocations())
+        for (String dir : DatabaseDescriptor.getAllDataFileLocations())
         {
-            dataFileLocation = getDataFilePrefix(cfs);
-            String manifestFileName = dataFileLocation + 
System.getProperty("file.separator") + cfs.table.name + ".json";
-            File manifestFile = new File(manifestFileName);
+            File manifestFile = new File(new File(dir, cfs.table.name), 
cfs.columnFamily + ".json");
             if (manifestFile.exists())
+            {
+                logger.debug("Loading manifest from {}", manifestFile);
                 return manifestFile;
+            }
         }
+        logger.debug("No level manifest found");
         return null;
     }
 
-    public static String getDataFilePrefix(ColumnFamilyStore cfs)
+    @Override
+    public String toString()
     {
-        return DatabaseDescriptor.getAllDataFileLocations()[0] + 
System.getProperty("file.separator") + cfs.table.name + 
System.getProperty("file.separator");
+        return "Manifest@" + hashCode();
     }
 }


Reply via email to