Author: jbellis Date: Tue Sep 20 13:13:04 2011 New Revision: 1173131 URL: http://svn.apache.org/viewvc?rev=1173131&view=rev Log: LeveledCompactionStrategy fixes patch by jbellis; tested by brandonwilliams for CASSANDRA-3224
Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/CHANGES.txt?rev=1173131&r1=1173130&r2=1173131&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/CHANGES.txt (original) +++ cassandra/branches/cassandra-1.0.0/CHANGES.txt Tue Sep 20 13:13:04 2011 @@ -10,6 +10,8 @@ true) and set default badness threshold to 0.1 (CASSANDRA-3229) * Base choice of random or "balanced" token on bootstrap on whether schema definitions were found (CASSANDRA-3219) + * Fixes for LeveledCompactionStrategy score computation, prioritization, + and scheduling (CASSANDRA-3224) 1.0.0-beta1 Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1173131&r1=1173130&r2=1173131&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Sep 20 13:13:04 2011 @@ -328,7 +328,7 @@ public class ColumnFamilyStore implement */ public static void scrubDataDirectories(String table, String columnFamily) { - logger.info("Removing compacted SSTable files (see http://wiki.apache.org/cassandra/MemtableSSTable)"); + logger.info("Removing compacted SSTable files from " + columnFamily + " (see 
http://wiki.apache.org/cassandra/MemtableSSTable)"); for (Map.Entry<Descriptor,Set<Component>> sstableFiles : files(table, columnFamily, true, true).entrySet()) { Descriptor desc = sstableFiles.getKey(); Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java?rev=1173131&r1=1173130&r2=1173131&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java (original) +++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java Tue Sep 20 13:13:04 2011 @@ -76,9 +76,20 @@ public class LeveledCompactionStrategy e logger.info(this + " subscribed to the data tracker."); manifest = LeveledManifest.create(cfs, this.maxSSTableSize); + logger.debug("Created {}", manifest); // override min/max for this strategy cfs.setMaximumCompactionThreshold(Integer.MAX_VALUE); cfs.setMinimumCompactionThreshold(1); + + // TODO this is redundant wrt the kickoff in AbstractCompactionStrategy, once CASSANDRA-X is done + Runnable runnable = new Runnable() + { + public void run() + { + CompactionManager.instance.submitBackground(LeveledCompactionStrategy.this.cfs); + } + }; + StorageService.optionalTasks.scheduleAtFixedRate(runnable, 5 * 60, 5, TimeUnit.SECONDS); } public void shutdown() @@ -96,12 +107,17 @@ public class LeveledCompactionStrategy e { LeveledCompactionTask currentTask = task.get(); if (currentTask != null && !currentTask.isDone()) + { + logger.debug("Compaction still in progress for {}", this); return Collections.emptyList(); + } Collection<SSTableReader> sstables = manifest.getCompactionCandidates(); - logger.debug("CompactionManager candidates are {}", StringUtils.join(sstables, 
",")); if (sstables.isEmpty()) + { + logger.debug("No compaction necessary for {}", this); return Collections.emptyList(); + } LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore, this.maxSSTableSize); return task.compareAndSet(currentTask, newTask) @@ -139,4 +155,10 @@ public class LeveledCompactionStrategy e manifest.logDistribution(); } } + + @Override + public String toString() + { + return String.format("LCS@%d(%s)", hashCode(), cfs.columnFamily); + } } Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java?rev=1173131&r1=1173130&r2=1173131&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java (original) +++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java Tue Sep 20 13:13:04 2011 @@ -26,6 +26,7 @@ import java.io.IOError; import java.io.IOException; import java.util.*; +import com.google.common.collect.Iterables; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -37,6 +38,7 @@ import org.apache.cassandra.db.Decorated import org.apache.cassandra.dht.Range; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.util.FileUtils; import org.codehaus.jackson.JsonEncoding; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonGenerator; @@ -47,6 +49,13 @@ public class LeveledManifest { private static final Logger logger = LoggerFactory.getLogger(LeveledCompactionStrategy.class); + /** + * limit the number of L0 sstables we do at once, because compaction bloom filter creation + * uses a pessimistic estimate of how many keys overlap (none), so we 
risk wasting memory + * or even OOMing when compacting highly overlapping sstables + */ + private static int MAX_COMPACTING_L0 = 32; + private final ColumnFamilyStore cfs; private final List<SSTableReader>[] generations; private final DecoratedKey[] lastCompactedKeys; @@ -85,37 +94,37 @@ public class LeveledManifest private static void load(ColumnFamilyStore cfs, LeveledManifest manifest) { + File manifestFile = tryGetManifest(cfs); + if (manifestFile == null) + return; + ObjectMapper m = new ObjectMapper(); try { - File manifestFile = tryGetManifest(cfs); - - if (manifestFile != null && manifestFile.exists()) + JsonNode rootNode = m.readValue(manifestFile, JsonNode.class); + JsonNode generations = rootNode.get("generations"); + assert generations.isArray(); + for (JsonNode generation : generations) { - JsonNode rootNode = m.readValue(manifestFile, JsonNode.class); - JsonNode generations = rootNode.get("generations"); - assert generations.isArray(); - for (JsonNode generation : generations) + int level = generation.get("generation").getIntValue(); + JsonNode generationValues = generation.get("members"); + for (JsonNode generationValue : generationValues) { - int level = generation.get("generation").getIntValue(); - JsonNode generationValues = generation.get("members"); - for (JsonNode generationValue : generationValues) + for (SSTableReader ssTableReader : cfs.getSSTables()) { - for (SSTableReader ssTableReader : cfs.getSSTables()) + if (ssTableReader.descriptor.generation == generationValue.getIntValue()) { - if (ssTableReader.descriptor.generation == generationValue.getIntValue()) - { - logger.debug("Loading {} at L{}", ssTableReader, level); - manifest.add(ssTableReader, level); - } + logger.debug("Loading {} at L{}", ssTableReader, level); + manifest.add(ssTableReader, level); } } } } } - catch (IOException e) + catch (Exception e) { - throw new IOError(e); + // TODO try to recover -old first + logger.error("Manifest present but corrupt. 
Cassandra will compact levels from scratch", e); } } @@ -140,8 +149,9 @@ public class LeveledManifest public synchronized void promote(Iterable<SSTableReader> removed, Iterable<SSTableReader> added) { - logger.debug("Replacing [{}] with [{}]", StringUtils.join(removed.iterator(), ", "), StringUtils.join(added.iterator(), ", ")); - + if (logger.isDebugEnabled()) + logger.debug((Iterables.isEmpty(added) ? "Removing [" : "Replacing [") + toString(removed) + "]"); + // the level for the added sstables is the max of the removed ones, // plus one if the removed were all on the same level int minimumLevel = Integer.MAX_VALUE; @@ -160,16 +170,25 @@ public class LeveledManifest int newLevel = minimumLevel == maximumLevel ? maximumLevel + 1 : maximumLevel; newLevel = skipLevels(newLevel, added); + assert newLevel > 0; + logger.debug("Adding [{}] at L{}", StringUtils.join(added.iterator(), ", "), newLevel); lastCompactedKeys[minimumLevel] = SSTable.sstableOrdering.max(added).last; - logger.debug("Adding [{}] to L{}", - StringUtils.join(added.iterator(), ", "), newLevel); for (SSTableReader ssTableReader : added) add(ssTableReader, newLevel); serialize(); } + private String toString(Iterable<SSTableReader> sstables) + { + StringBuilder builder = new StringBuilder(); + for (SSTableReader sstable : sstables) + { + builder.append(sstable.toString()).append(" (L").append(levelOf(sstable)).append("), "); + } + return builder.toString(); + } private double maxBytesForLevel (int level) { @@ -182,28 +201,47 @@ public class LeveledManifest { logDistribution(); - double bestScore = -1; - int bestLevel = -1; - for (int level = 0; level < generations.length; level++) + // LevelDB gives each level a score of how much data it contains vs its ideal amount, and + // compacts the level with the highest score. But this falls apart spectacularly once you + // get behind. 
Consider this set of levels: + // L0: 988 [ideal: 4] + // L1: 117 [ideal: 10] + // L2: 12 [ideal: 100] + // + // The problem is that L0 has a much higher score (almost 250) than L1 (11), so what we'll + // do is compact a batch of MAX_COMPACTING_L0 sstables with all 117 L1 sstables, and put the + // result (say, 120 sstables) in L1. Then we'll compact the next batch of MAX_COMPACTING_L0, + // and so forth. So we spend most of our i/o rewriting the L1 data with each batch. + // + // If we could just do *all* L0 a single time with L1, that would be ideal. But we can't + // -- see the javadoc for MAX_COMPACTING_L0. + // + // LevelDB's way around this is to simply block writes if L0 compaction falls behind. + // We don't have that luxury. + // + // So instead, we force compacting higher levels first. This may not minimize the number + // of reads done as quickly in the short term, but it minimizes the i/o needed to compact + // optimally which gives us a long term win. + for (int i = generations.length - 1; i >= 0; i--) { - List<SSTableReader> sstables = generations[level]; + List<SSTableReader> sstables = generations[i]; if (sstables.isEmpty()) - continue; - - double score = SSTableReader.getTotalBytes(sstables) / maxBytesForLevel(level); - //if we're idle and we don't have anything better to do schedule a compaction for L0 - //by setting its threshold to some very low value - score = (level == 0 && score < 1) ? 
1.001 : 0; - logger.debug("Compaction score for level {} is {}", level, score); - if (score > bestScore) + continue; // mostly this just avoids polluting the debug log with zero scores + double score = SSTableReader.getTotalBytes(sstables) / maxBytesForLevel(i); + logger.debug("Compaction score for level {} is {}", i, score); + + // L0 gets a special case that if we don't have anything more important to do, + // we'll go ahead and compact even just one sstable + if (score > 1 || i == 0) { - bestScore = score; - bestLevel = level; + Collection<SSTableReader> candidates = getCandidatesFor(i); + if (logger.isDebugEnabled()) + logger.debug("Compaction candidates for L{} are {}", i, toString(candidates)); + return candidates; } } - // if we have met at least one of our thresholds then trigger a compaction - return bestScore > 1 ? getCandidatesFor(bestLevel) : Collections.<SSTableReader>emptyList(); + return Collections.emptyList(); } public int getLevelSize(int i) @@ -215,7 +253,13 @@ public class LeveledManifest public void logDistribution() { for (int i = 0; i < generations.length; i++) - logger.debug("Level {} contains {} SSTables", i, generations[i].size()); + { + if (!generations[i].isEmpty()) + { + logger.debug("L{} contains {} SSTables ({} bytes) in {}", + new Object[] {i, generations[i].size(), SSTableReader.getTotalBytes(generations[i]), this}); + } + } } private int levelOf(SSTableReader sstable) @@ -258,25 +302,21 @@ public class LeveledManifest private Collection<SSTableReader> getCandidatesFor(int level) { assert !generations[level].isEmpty(); + logger.debug("Choosing candidates for L{}", level); if (level == 0) { // because L0 files may overlap each other, we treat compactions there specially: // a L0 compaction also checks other L0 files for overlap. 
Set<SSTableReader> candidates = new HashSet<SSTableReader>(); - Set<SSTableReader> remaining = new HashSet<SSTableReader>(generations[0]); - - while (!remaining.isEmpty()) - { - // pick a random sstable from L0, and any that overlap with it - List<SSTableReader> L0 = overlapping(remaining.iterator().next(), remaining); - // add the overlapping ones from L1 - for (SSTableReader sstable : L0) - { - candidates.addAll(overlapping(sstable, generations[1])); - remaining.remove(sstable); - } - } + // pick the oldest sstable from L0, and any that overlap with it + List<SSTableReader> ageSortedSSTables = new ArrayList<SSTableReader>(generations[0]); + Collections.sort(ageSortedSSTables, SSTable.maxTimestampComparator); + List<SSTableReader> L0 = overlapping(ageSortedSSTables.get(0), generations[0]); + L0 = L0.size() > MAX_COMPACTING_L0 ? L0.subList(0, MAX_COMPACTING_L0) : L0; + // add the overlapping ones from L1 + for (SSTableReader sstable : L0) + candidates.addAll(overlapping(sstable, generations[1])); return candidates; } @@ -294,18 +334,16 @@ public class LeveledManifest public synchronized void serialize() { - String dataFileLocation = getDataFilePrefix(cfs); - String tempManifestFileName = dataFileLocation + cfs.getColumnFamilyName() + "-" + "tmp.json"; - String manifestFileName = dataFileLocation + cfs.getColumnFamilyName() + ".json"; - String oldManifestFileName = dataFileLocation + cfs.getColumnFamilyName() + "-" + "old.json"; - - File tmpManifest = new File(tempManifestFileName); + File manifestFile = tryGetManifest(cfs); + if (manifestFile == null) + manifestFile = new File(new File(DatabaseDescriptor.getAllDataFileLocations()[0], cfs.table.name), cfs.columnFamily + ".json"); + File oldFile = new File(manifestFile.getPath().replace(".json", "-old.json")); + File tmpFile = new File(manifestFile.getPath().replace(".json", "-tmp.json")); JsonFactory f = new JsonFactory(); - try { - JsonGenerator g = f.createJsonGenerator(tmpManifest, JsonEncoding.UTF8); + 
JsonGenerator g = f.createJsonGenerator(tmpFile, JsonEncoding.UTF8); g.useDefaultPrettyPrinter(); g.writeStartObject(); g.writeArrayFieldStart("generations"); @@ -323,36 +361,39 @@ public class LeveledManifest g.writeEndArray(); // for field generations g.writeEndObject(); // write global object g.close(); + + if (oldFile.exists() && manifestFile.exists()) + FileUtils.deleteWithConfirm(oldFile); + if (manifestFile.exists()) + FileUtils.renameWithConfirm(manifestFile, oldFile); + assert tmpFile.exists(); + FileUtils.renameWithConfirm(tmpFile, manifestFile); + logger.debug("Saved manifest {}", manifestFile); } catch (IOException e) { - e.printStackTrace(); + throw new IOError(e); } - File oldFile = new File(oldManifestFileName); - if (oldFile.exists()) - oldFile.delete(); - File currentManifest = new File(manifestFileName); - if (currentManifest.exists()) - currentManifest.renameTo(new File(oldManifestFileName)); - if (tmpManifest.exists()) - tmpManifest.renameTo(new File(manifestFileName)); } public static File tryGetManifest(ColumnFamilyStore cfs) { - for (String dataFileLocation : DatabaseDescriptor.getAllDataFileLocations()) + for (String dir : DatabaseDescriptor.getAllDataFileLocations()) { - dataFileLocation = getDataFilePrefix(cfs); - String manifestFileName = dataFileLocation + System.getProperty("file.separator") + cfs.table.name + ".json"; - File manifestFile = new File(manifestFileName); + File manifestFile = new File(new File(dir, cfs.table.name), cfs.columnFamily + ".json"); if (manifestFile.exists()) + { + logger.debug("Loading manifest from {}", manifestFile); return manifestFile; + } } + logger.debug("No level manifest found"); return null; } - public static String getDataFilePrefix(ColumnFamilyStore cfs) + @Override + public String toString() { - return DatabaseDescriptor.getAllDataFileLocations()[0] + System.getProperty("file.separator") + cfs.table.name + System.getProperty("file.separator"); + return "Manifest@" + hashCode(); } }