Author: jbellis Date: Tue Sep 20 13:19:22 2011 New Revision: 1173134 URL: http://svn.apache.org/viewvc?rev=1173134&view=rev Log: merge #3224 from 1.0.0
Modified: cassandra/branches/cassandra-1.0/ (props changed) cassandra/branches/cassandra-1.0/CHANGES.txt cassandra/branches/cassandra-1.0/build.xml cassandra/branches/cassandra-1.0/contrib/ (props changed) cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed) cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed) cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed) cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed) cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed) cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/TokenMetadata.java Propchange: cassandra/branches/cassandra-1.0/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Sep 20 13:19:22 2011 @@ -5,7 +5,7 @@ /cassandra/branches/cassandra-0.8.0:1125021-1130369 /cassandra/branches/cassandra-0.8.1:1101014-1125018 /cassandra/branches/cassandra-1.0:1167106,1167185 -/cassandra/branches/cassandra-1.0.0:1167104-1172718 +/cassandra/branches/cassandra-1.0.0:1167104-1173133 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020 /cassandra/trunk:1167085-1167102,1169870 Modified: cassandra/branches/cassandra-1.0/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1173134&r1=1173133&r2=1173134&view=diff 
============================================================================== --- cassandra/branches/cassandra-1.0/CHANGES.txt (original) +++ cassandra/branches/cassandra-1.0/CHANGES.txt Tue Sep 20 13:19:22 2011 @@ -14,6 +14,8 @@ true) and set default badness threshold to 0.1 (CASSANDRA-3229) * Base choice of random or "balanced" token on bootstrap on whether schema definitions were found (CASSANDRA-3219) + * Fixes for LeveledCompactionStrategy score computation, prioritization, + and scheduling (CASSANDRA-3224) 1.0.0-beta1 Modified: cassandra/branches/cassandra-1.0/build.xml URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/build.xml?rev=1173134&r1=1173133&r2=1173134&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/build.xml (original) +++ cassandra/branches/cassandra-1.0/build.xml Tue Sep 20 13:19:22 2011 @@ -839,6 +839,7 @@ url=${svn.entry.url}?pathrev=${svn.entry <fileset dir="${build.dir}"> <include name="${final.name}.jar" /> <include name="${ant.project.name}-thrift-${version}.jar" /> + <include name="${ant.project.name}-clientutil-${version}.jar" /> </fileset> </copy> <copy todir="${dist.dir}/javadoc"> Propchange: cassandra/branches/cassandra-1.0/contrib/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Sep 20 13:19:22 2011 @@ -5,7 +5,7 @@ /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018 /cassandra/branches/cassandra-1.0/contrib:1167106,1167185 -/cassandra/branches/cassandra-1.0.0/contrib:1167104-1172718 +/cassandra/branches/cassandra-1.0.0/contrib:1167104-1173133 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020 /cassandra/trunk/contrib:1167085-1167102,1169870 Propchange: 
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Sep 20 13:19:22 2011 @@ -5,7 +5,7 @@ /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167106,1167185 -/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1172718 +/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1173133 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1167102,1169870 Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Sep 20 13:19:22 2011 @@ -5,7 +5,7 @@ /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167106,1167185 -/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1172718 
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1173133 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1167102,1169870 Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Sep 20 13:19:22 2011 @@ -5,7 +5,7 @@ /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167106,1167185 -/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1172718 +/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1173133 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1167102,1169870 Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java 
------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Sep 20 13:19:22 2011 @@ -5,7 +5,7 @@ /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167106,1167185 -/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1172718 +/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1173133 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1167102,1169870 Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Sep 20 13:19:22 2011 @@ -5,7 +5,7 @@ /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167106,1167185 -/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1172718 
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1173133 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1167102,1169870 Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1173134&r1=1173133&r2=1173134&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Sep 20 13:19:22 2011 @@ -328,7 +328,7 @@ public class ColumnFamilyStore implement */ public static void scrubDataDirectories(String table, String columnFamily) { - logger.info("Removing compacted SSTable files (see http://wiki.apache.org/cassandra/MemtableSSTable)"); + logger.info("Removing compacted SSTable files from " + columnFamily + " (see http://wiki.apache.org/cassandra/MemtableSSTable)"); for (Map.Entry<Descriptor,Set<Component>> sstableFiles : files(table, columnFamily, true, true).entrySet()) { Descriptor desc = sstableFiles.getKey(); Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java?rev=1173134&r1=1173133&r2=1173134&view=diff ============================================================================== --- 
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java Tue Sep 20 13:19:22 2011 @@ -76,9 +76,20 @@ public class LeveledCompactionStrategy e logger.info(this + " subscribed to the data tracker."); manifest = LeveledManifest.create(cfs, this.maxSSTableSize); + logger.debug("Created {}", manifest); // override min/max for this strategy cfs.setMaximumCompactionThreshold(Integer.MAX_VALUE); cfs.setMinimumCompactionThreshold(1); + + // TODO this is redundant wrt the kickoff in AbstractCompactionStrategy, once CASSANDRA-X is done + Runnable runnable = new Runnable() + { + public void run() + { + CompactionManager.instance.submitBackground(LeveledCompactionStrategy.this.cfs); + } + }; + StorageService.optionalTasks.scheduleAtFixedRate(runnable, 5 * 60, 5, TimeUnit.SECONDS); } public void shutdown() @@ -96,12 +107,17 @@ public class LeveledCompactionStrategy e { LeveledCompactionTask currentTask = task.get(); if (currentTask != null && !currentTask.isDone()) + { + logger.debug("Compaction still in progress for {}", this); return Collections.emptyList(); + } Collection<SSTableReader> sstables = manifest.getCompactionCandidates(); - logger.debug("CompactionManager candidates are {}", StringUtils.join(sstables, ",")); if (sstables.isEmpty()) + { + logger.debug("No compaction necessary for {}", this); return Collections.emptyList(); + } LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore, this.maxSSTableSize); return task.compareAndSet(currentTask, newTask) @@ -139,4 +155,10 @@ public class LeveledCompactionStrategy e manifest.logDistribution(); } } + + @Override + public String toString() + { + return String.format("LCS@%d(%s)", hashCode(), cfs.columnFamily); + } } Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java 
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java?rev=1173134&r1=1173133&r2=1173134&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java Tue Sep 20 13:19:22 2011 @@ -26,6 +26,7 @@ import java.io.IOError; import java.io.IOException; import java.util.*; +import com.google.common.collect.Iterables; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -37,6 +38,7 @@ import org.apache.cassandra.db.Decorated import org.apache.cassandra.dht.Range; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.util.FileUtils; import org.codehaus.jackson.JsonEncoding; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonGenerator; @@ -47,6 +49,13 @@ public class LeveledManifest { private static final Logger logger = LoggerFactory.getLogger(LeveledCompactionStrategy.class); + /** + * limit the number of L0 sstables we do at once, because compaction bloom filter creation + * uses a pessimistic estimate of how many keys overlap (none), so we risk wasting memory + * or even OOMing when compacting highly overlapping sstables + */ + private static int MAX_COMPACTING_L0 = 32; + private final ColumnFamilyStore cfs; private final List<SSTableReader>[] generations; private final DecoratedKey[] lastCompactedKeys; @@ -85,37 +94,37 @@ public class LeveledManifest private static void load(ColumnFamilyStore cfs, LeveledManifest manifest) { + File manifestFile = tryGetManifest(cfs); + if (manifestFile == null) + return; + ObjectMapper m = new ObjectMapper(); try { - File manifestFile = tryGetManifest(cfs); - - if (manifestFile != null && 
manifestFile.exists()) + JsonNode rootNode = m.readValue(manifestFile, JsonNode.class); + JsonNode generations = rootNode.get("generations"); + assert generations.isArray(); + for (JsonNode generation : generations) { - JsonNode rootNode = m.readValue(manifestFile, JsonNode.class); - JsonNode generations = rootNode.get("generations"); - assert generations.isArray(); - for (JsonNode generation : generations) + int level = generation.get("generation").getIntValue(); + JsonNode generationValues = generation.get("members"); + for (JsonNode generationValue : generationValues) { - int level = generation.get("generation").getIntValue(); - JsonNode generationValues = generation.get("members"); - for (JsonNode generationValue : generationValues) + for (SSTableReader ssTableReader : cfs.getSSTables()) { - for (SSTableReader ssTableReader : cfs.getSSTables()) + if (ssTableReader.descriptor.generation == generationValue.getIntValue()) { - if (ssTableReader.descriptor.generation == generationValue.getIntValue()) - { - logger.debug("Loading {} at L{}", ssTableReader, level); - manifest.add(ssTableReader, level); - } + logger.debug("Loading {} at L{}", ssTableReader, level); + manifest.add(ssTableReader, level); } } } } } - catch (IOException e) + catch (Exception e) { - throw new IOError(e); + // TODO try to recover -old first + logger.error("Manifest present but corrupt. Cassandra will compact levels from scratch", e); } } @@ -140,8 +149,9 @@ public class LeveledManifest public synchronized void promote(Iterable<SSTableReader> removed, Iterable<SSTableReader> added) { - logger.debug("Replacing [{}] with [{}]", StringUtils.join(removed.iterator(), ", "), StringUtils.join(added.iterator(), ", ")); - + if (logger.isDebugEnabled()) + logger.debug((Iterables.isEmpty(added) ? 
"Removing [" : "Replacing [") + toString(removed) + "]"); + // the level for the added sstables is the max of the removed ones, // plus one if the removed were all on the same level int minimumLevel = Integer.MAX_VALUE; @@ -160,16 +170,25 @@ public class LeveledManifest int newLevel = minimumLevel == maximumLevel ? maximumLevel + 1 : maximumLevel; newLevel = skipLevels(newLevel, added); + assert newLevel > 0; + logger.debug("Adding [{}] at L{}", StringUtils.join(added.iterator(), ", "), newLevel); lastCompactedKeys[minimumLevel] = SSTable.sstableOrdering.max(added).last; - logger.debug("Adding [{}] to L{}", - StringUtils.join(added.iterator(), ", "), newLevel); for (SSTableReader ssTableReader : added) add(ssTableReader, newLevel); serialize(); } + private String toString(Iterable<SSTableReader> sstables) + { + StringBuilder builder = new StringBuilder(); + for (SSTableReader sstable : sstables) + { + builder.append(sstable.toString()).append(" (L").append(levelOf(sstable)).append("), "); + } + return builder.toString(); + } private double maxBytesForLevel (int level) { @@ -182,28 +201,47 @@ public class LeveledManifest { logDistribution(); - double bestScore = -1; - int bestLevel = -1; - for (int level = 0; level < generations.length; level++) + // LevelDB gives each level a score of how much data it contains vs its ideal amount, and + // compacts the level with the highest score. But this falls apart spectacularly once you + // get behind. Consider this set of levels: + // L0: 988 [ideal: 4] + // L1: 117 [ideal: 10] + // L2: 12 [ideal: 100] + // + // The problem is that L0 has a much higher score (almost 250) than L1 (11), so what we'll + // do is compact a batch of MAX_COMPACTING_L0 sstables with all 117 L1 sstables, and put the + // result (say, 120 sstables) in L1. Then we'll compact the next batch of MAX_COMPACTING_L0, + // and so forth. So we spend most of our i/o rewriting the L1 data with each batch. 
+ // + // If we could just do *all* L0 a single time with L1, that would be ideal. But we can't + // -- see the javadoc for MAX_COMPACTING_L0. + // + // LevelDB's way around this is to simply block writes if L0 compaction falls behind. + // We don't have that luxury. + // + // So instead, we force compacting higher levels first. This may not minimize the number + // of reads done as quickly in the short term, but it minimizes the i/o needed to compact + // optimally which gives us a long term win. + for (int i = generations.length - 1; i >= 0; i--) { - List<SSTableReader> sstables = generations[level]; + List<SSTableReader> sstables = generations[i]; if (sstables.isEmpty()) - continue; - - double score = SSTableReader.getTotalBytes(sstables) / maxBytesForLevel(level); - //if we're idle and we don't have anything better to do schedule a compaction for L0 - //by setting its threshold to some very low value - score = (level == 0 && score < 1) ? 1.001 : 0; - logger.debug("Compaction score for level {} is {}", level, score); - if (score > bestScore) + continue; // mostly this just avoids polluting the debug log with zero scores + double score = SSTableReader.getTotalBytes(sstables) / maxBytesForLevel(i); + logger.debug("Compaction score for level {} is {}", i, score); + + // L0 gets a special case that if we don't have anything more important to do, + // we'll go ahead and compact even just one sstable + if (score > 1 || i == 0) { - bestScore = score; - bestLevel = level; + Collection<SSTableReader> candidates = getCandidatesFor(i); + if (logger.isDebugEnabled()) + logger.debug("Compaction candidates for L{} are {}", i, toString(candidates)); + return candidates; } } - // if we have met at least one of our thresholds then trigger a compaction - return bestScore > 1 ? 
getCandidatesFor(bestLevel) : Collections.<SSTableReader>emptyList(); + return Collections.emptyList(); } public int getLevelSize(int i) @@ -215,7 +253,13 @@ public class LeveledManifest public void logDistribution() { for (int i = 0; i < generations.length; i++) - logger.debug("Level {} contains {} SSTables", i, generations[i].size()); + { + if (!generations[i].isEmpty()) + { + logger.debug("L{} contains {} SSTables ({} bytes) in {}", + new Object[] {i, generations[i].size(), SSTableReader.getTotalBytes(generations[i]), this}); + } + } } private int levelOf(SSTableReader sstable) @@ -258,25 +302,21 @@ public class LeveledManifest private Collection<SSTableReader> getCandidatesFor(int level) { assert !generations[level].isEmpty(); + logger.debug("Choosing candidates for L{}", level); if (level == 0) { // because L0 files may overlap each other, we treat compactions there specially: // a L0 compaction also checks other L0 files for overlap. Set<SSTableReader> candidates = new HashSet<SSTableReader>(); - Set<SSTableReader> remaining = new HashSet<SSTableReader>(generations[0]); - - while (!remaining.isEmpty()) - { - // pick a random sstable from L0, and any that overlap with it - List<SSTableReader> L0 = overlapping(remaining.iterator().next(), remaining); - // add the overlapping ones from L1 - for (SSTableReader sstable : L0) - { - candidates.addAll(overlapping(sstable, generations[1])); - remaining.remove(sstable); - } - } + // pick the oldest sstable from L0, and any that overlap with it + List<SSTableReader> ageSortedSSTables = new ArrayList<SSTableReader>(generations[0]); + Collections.sort(ageSortedSSTables, SSTable.maxTimestampComparator); + List<SSTableReader> L0 = overlapping(ageSortedSSTables.get(0), generations[0]); + L0 = L0.size() > MAX_COMPACTING_L0 ? 
L0.subList(0, MAX_COMPACTING_L0) : L0; + // add the overlapping ones from L1 + for (SSTableReader sstable : L0) + candidates.addAll(overlapping(sstable, generations[1])); return candidates; } @@ -294,18 +334,16 @@ public class LeveledManifest public synchronized void serialize() { - String dataFileLocation = getDataFilePrefix(cfs); - String tempManifestFileName = dataFileLocation + cfs.getColumnFamilyName() + "-" + "tmp.json"; - String manifestFileName = dataFileLocation + cfs.getColumnFamilyName() + ".json"; - String oldManifestFileName = dataFileLocation + cfs.getColumnFamilyName() + "-" + "old.json"; - - File tmpManifest = new File(tempManifestFileName); + File manifestFile = tryGetManifest(cfs); + if (manifestFile == null) + manifestFile = new File(new File(DatabaseDescriptor.getAllDataFileLocations()[0], cfs.table.name), cfs.columnFamily + ".json"); + File oldFile = new File(manifestFile.getPath().replace(".json", "-old.json")); + File tmpFile = new File(manifestFile.getPath().replace(".json", "-tmp.json")); JsonFactory f = new JsonFactory(); - try { - JsonGenerator g = f.createJsonGenerator(tmpManifest, JsonEncoding.UTF8); + JsonGenerator g = f.createJsonGenerator(tmpFile, JsonEncoding.UTF8); g.useDefaultPrettyPrinter(); g.writeStartObject(); g.writeArrayFieldStart("generations"); @@ -323,36 +361,39 @@ public class LeveledManifest g.writeEndArray(); // for field generations g.writeEndObject(); // write global object g.close(); + + if (oldFile.exists() && manifestFile.exists()) + FileUtils.deleteWithConfirm(oldFile); + if (manifestFile.exists()) + FileUtils.renameWithConfirm(manifestFile, oldFile); + assert tmpFile.exists(); + FileUtils.renameWithConfirm(tmpFile, manifestFile); + logger.debug("Saved manifest {}", manifestFile); } catch (IOException e) { - e.printStackTrace(); + throw new IOError(e); } - File oldFile = new File(oldManifestFileName); - if (oldFile.exists()) - oldFile.delete(); - File currentManifest = new File(manifestFileName); - if 
(currentManifest.exists()) - currentManifest.renameTo(new File(oldManifestFileName)); - if (tmpManifest.exists()) - tmpManifest.renameTo(new File(manifestFileName)); } public static File tryGetManifest(ColumnFamilyStore cfs) { - for (String dataFileLocation : DatabaseDescriptor.getAllDataFileLocations()) + for (String dir : DatabaseDescriptor.getAllDataFileLocations()) { - dataFileLocation = getDataFilePrefix(cfs); - String manifestFileName = dataFileLocation + System.getProperty("file.separator") + cfs.table.name + ".json"; - File manifestFile = new File(manifestFileName); + File manifestFile = new File(new File(dir, cfs.table.name), cfs.columnFamily + ".json"); if (manifestFile.exists()) + { + logger.debug("Loading manifest from {}", manifestFile); return manifestFile; + } } + logger.debug("No level manifest found"); return null; } - public static String getDataFilePrefix(ColumnFamilyStore cfs) + @Override + public String toString() { - return DatabaseDescriptor.getAllDataFileLocations()[0] + System.getProperty("file.separator") + cfs.table.name + System.getProperty("file.separator"); + return "Manifest@" + hashCode(); } } Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/TokenMetadata.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=1173134&r1=1173133&r2=1173134&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/TokenMetadata.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/TokenMetadata.java Tue Sep 20 13:19:22 2011 @@ -43,22 +43,34 @@ public class TokenMetadata /* Maintains token to endpoint map of every node in the cluster. */ private BiMap<Token, InetAddress> tokenToEndpointMap; - // Suppose that there is a ring of nodes A, C and E, with replication factor 3. 
+ // Prior to CASSANDRA-603, we just had <tt>Map<Range, InetAddress> pendingRanges</tt>, + // which was added to when a node began bootstrap and removed from when it finished. + // + // This is inadequate when multiple changes are allowed simultaneously. For example, + // suppose that there is a ring of nodes A, C and E, with replication factor 3. // Node D bootstraps between C and E, so its pending ranges will be E-A, A-C and C-D. - // Now suppose node B bootstraps between A and C at the same time. Its pending ranges would be C-E, E-A and A-B. - // Now both nodes have pending range E-A in their list, which will cause pending range collision - // even though we're only talking about replica range, not even primary range. The same thing happens - // for any nodes that boot simultaneously between same two nodes. For this we cannot simply make pending ranges a <tt>Multimap</tt>, - // since that would make us unable to notice the real problem of two nodes trying to boot using the same token. - // In order to do this properly, we need to know what tokens are booting at any time. + // Now suppose node B bootstraps between A and C at the same time. Its pending ranges + // would be C-E, E-A and A-B. Now both nodes need to be assigned pending range E-A, + // which we would be unable to represent with the old Map. The same thing happens + // even more obviously for any nodes that boot simultaneously between the same two nodes. + // + // So, we made two changes: + // + // First, we changed pendingRanges to a <tt>Multimap<Range, InetAddress></tt> (now + // <tt>Map<String, Multimap<Range, InetAddress>></tt>, because replication strategy + // and options are per-KeySpace). + // + // Second, we added the bootstrapTokens and leavingEndpoints collections, so we can + // rebuild pendingRanges from the complete information of what is going on, when + // additional changes are made mid-operation. 
+ // + // Finally, note that recording the tokens of joining nodes in bootstrapTokens also + // means we can detect and reject the addition of multiple nodes at the same token + // before one becomes part of the ring. private BiMap<Token, InetAddress> bootstrapTokens = HashBiMap.create(); - - // we will need to know at all times what nodes are leaving and calculate ranges accordingly. - // An anonymous pending ranges list is not enough, as that does not tell which node is leaving - // and/or if the ranges are there because of bootstrap or leave operation. - // (See CASSANDRA-603 for more detail + examples). + // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving) private Set<InetAddress> leavingEndpoints = new HashSet<InetAddress>(); - + // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints} private ConcurrentMap<String, Multimap<Range, InetAddress>> pendingRanges = new ConcurrentHashMap<String, Multimap<Range, InetAddress>>(); // nodes which are migrating to the new tokens in the ring