Author: jbellis
Date: Fri Dec 17 02:59:26 2010
New Revision: 1050270
URL: http://svn.apache.org/viewvc?rev=1050270&view=rev
Log:
make compaction buckets deterministic
patch by thobbs; reviewed by jbellis for CASSANDRA-1265
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsTest.java Modified: cassandra/branches/cassandra-0.6/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1050270&r1=1050269&r2=1050270&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.6/CHANGES.txt Fri Dec 17 02:59:26 2010 @@ -21,6 +21,7 @@ * ReadResponseResolver check digests against each other (CASSANDRA-1830) * change exception for read requests during bootstrap from InvalidRequest to Unavailable (CASSANDRA-1862) + * make compaction buckets deterministic (CASSANDRA-1265) 0.6.8 Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1050270&r1=1050269&r2=1050270&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java Fri Dec 17 02:59:26 2010 @@ -42,6 +42,7 @@ import org.apache.cassandra.io.util.File import org.apache.cassandra.service.AntiEntropyService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; import org.apache.log4j.Logger; import org.cliffc.high_scale_lib.NonBlockingHashMap; @@ -88,7 +89,9 @@ public class CompactionManager implement return 0; } logger.debug("Checking to see if compaction of " + cfs.columnFamily_ + " would be useful"); - Set<List<SSTableReader>> buckets = 
getBuckets(cfs.getSSTables(), 50L * 1024L * 1024L); + + Set<List<SSTableReader>> buckets = getBuckets( + convertSSTablesToPairs(cfs.getSSTables()), 50L * 1024L * 1024L); updateEstimateFor(cfs, buckets); for (List<SSTableReader> sstables : buckets) @@ -467,29 +470,43 @@ public class CompactionManager implement /* * Group files of similar size into buckets. */ - static Set<List<SSTableReader>> getBuckets(Iterable<SSTableReader> files, long min) + static <T> Set<List<T>> getBuckets(Iterable<Pair<T, Long>> files, long min) { - Map<List<SSTableReader>, Long> buckets = new HashMap<List<SSTableReader>, Long>(); - for (SSTableReader sstable : files) + // Sort the list in order to get deterministic results during the grouping below + List<Pair<T, Long>> sortedFiles = new ArrayList<Pair<T, Long>>(); + for (Pair<T, Long> pair: files) + sortedFiles.add(pair); + + Collections.sort(sortedFiles, new Comparator<Pair<T, Long>>() { - long size = sstable.length(); + public int compare(Pair<T, Long> p1, Pair<T, Long> p2) + { + return p1.right.compareTo(p2.right); + } + }); + + Map<List<T>, Long> buckets = new HashMap<List<T>, Long>(); + + for (Pair<T, Long> pair: sortedFiles) + { + long size = pair.right; boolean bFound = false; // look for a bucket containing similar-sized files: // group in the same bucket if it's w/in 50% of the average for this bucket, // or this file and the bucket are all considered "small" (less than `min`) - for (Entry<List<SSTableReader>, Long> entry : buckets.entrySet()) + for (Entry<List<T>, Long> entry : buckets.entrySet()) { - List<SSTableReader> bucket = entry.getKey(); + List<T> bucket = entry.getKey(); long averageSize = entry.getValue(); - if ((size > averageSize / 2 && size < 3 * averageSize / 2) + if ((size > (averageSize / 2) && size < (3 * averageSize) / 2) || (size < min && averageSize < min)) { // remove and re-add because adding changes the hash buckets.remove(bucket); long totalSize = bucket.size() * averageSize; averageSize = (totalSize + 
size) / (bucket.size() + 1); - bucket.add(sstable); + bucket.add(pair.left); buckets.put(bucket, averageSize); bFound = true; break; @@ -498,8 +515,8 @@ public class CompactionManager implement // no similar bucket found; put it in a new one if (!bFound) { - ArrayList<SSTableReader> bucket = new ArrayList<SSTableReader>(); - bucket.add(sstable); + ArrayList<T> bucket = new ArrayList<T>(); + bucket.add(pair.left); buckets.put(bucket, size); } } @@ -507,6 +524,14 @@ public class CompactionManager implement return buckets.keySet(); } + private static Collection<Pair<SSTableReader, Long>> convertSSTablesToPairs(Collection<SSTableReader> collection) + { + Collection<Pair<SSTableReader, Long>> tablePairs = new HashSet<Pair<SSTableReader, Long>>(); + for(SSTableReader table: collection) + tablePairs.add(new Pair<SSTableReader, Long>(table, table.length())); + return tablePairs; + } + public static int getDefaultGCBefore() { return (int)(System.currentTimeMillis() / 1000) - DatabaseDescriptor.getGcGraceInSeconds(); @@ -565,7 +590,9 @@ public class CompactionManager implement public void run () { logger.debug("Estimating compactions for " + cfs.columnFamily_); - final Set<List<SSTableReader>> buckets = getBuckets(cfs.getSSTables(), 50L * 1024L * 1024L); + + final Set<List<SSTableReader>> buckets = + getBuckets(convertSSTablesToPairs(cfs.getSSTables()), 50L * 1024L * 1024L); updateEstimateFor(cfs, buckets); } }; Modified: cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=1050270&r1=1050269&r2=1050270&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsTest.java (original) +++ cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsTest.java Fri Dec 17 02:59:26 2010 @@ -24,6 
+24,8 @@ import java.util.concurrent.ExecutionExc import java.util.concurrent.Future; import java.util.Set; import java.util.HashSet; +import java.util.List; +import java.util.ArrayList; import org.apache.cassandra.Util; @@ -33,6 +35,7 @@ import org.apache.cassandra.config.Datab import org.apache.cassandra.CleanupHelper; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; import static junit.framework.Assert.assertEquals; public class CompactionsTest extends CleanupHelper @@ -75,4 +78,57 @@ public class CompactionsTest extends Cle } assertEquals(inserted.size(), Util.getRangeSlice(store).rows.size()); } + + @Test + public void testGetBuckets() + { + List<Pair<String, Long>> pairs = new ArrayList<Pair<String,Long>>(); + String[] strings = { "a", "bbbb", "cccccccc", "cccccccc", "bbbb", "a" }; + for (int i = 0; i < strings.length; i++) { + Pair<String, Long> pair = new Pair<String, Long>(strings[i], new Long(strings[i].length())); + pairs.add(pair); + } + + Set<List<String>> buckets = CompactionManager.getBuckets(pairs, 2); + assertEquals(3, buckets.size()); + + for(List<String> bucket: buckets) + { + assertEquals(2, bucket.size()); + assertEquals(bucket.get(0).length(), bucket.get(1).length()); + assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0)); + } + + pairs.clear(); + buckets.clear(); + + String[] strings2 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" }; + for (int i = 0; i < strings2.length; i++) { + Pair<String, Long> pair = new Pair<String, Long>(strings2[i], new Long(strings2[i].length())); + pairs.add(pair); + } + + buckets = CompactionManager.getBuckets(pairs, 2); + assertEquals(2, buckets.size()); + + for(List<String> bucket: buckets) + { + assertEquals(3, bucket.size()); + assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0)); + assertEquals(bucket.get(1).charAt(0), bucket.get(2).charAt(0)); + } + + // Test the "min" functionality + 
pairs.clear(); + buckets.clear(); + + String[] strings3 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" }; + for (int i = 0; i < strings3.length; i++) { + Pair<String, Long> pair = new Pair<String, Long>(strings3[i], new Long(strings3[i].length())); + pairs.add(pair); + } + + buckets = CompactionManager.getBuckets(pairs, 10); // notice the min is 10 + assertEquals(1, buckets.size()); + } }