Allocate merkletrees with the correct size

Patch by marcuse; reviewed by Marcus Olsson for CASSANDRA-11390


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d1bfae5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d1bfae5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d1bfae5

Branch: refs/heads/trunk
Commit: 1d1bfae580d44d3b8a4678c5af5767ff17102128
Parents: d479b8d
Author: Marcus Eriksson <marc...@apache.org>
Authored: Mon Mar 21 15:41:32 2016 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Mar 23 14:54:39 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 49 ++++++++++++++------
 2 files changed, 35 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d1bfae5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 904206b..cf36047 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.5
+ * Allocate merkletrees with the correct size (CASSANDRA-11390)
  * Support streaming pre-3.0 sstables (CASSANDRA-10990)
  * Add backpressure to compressed commit log (CASSANDRA-10971)
  * SSTableExport supports secondary index tables (CASSANDRA-11330)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d1bfae5/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 891f976..7c46fcb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1078,16 +1078,8 @@ public class CompactionManager implements 
CompactionManagerMBean
 
             // Create Merkle trees suitable to hold estimated partitions for 
the given ranges.
             // We blindly assume that a partition is evenly distributed on all 
sstables for now.
-            long numPartitions = 0;
-            for (SSTableReader sstable : sstables)
-            {
-                numPartitions += 
sstable.estimatedKeysForRanges(validator.desc.ranges);
-            }
             // determine tree depth from number of partitions, but cap at 20 
to prevent large tree.
-            int depth = numPartitions > 0 ? (int) 
Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
-            MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
-            tree.addMerkleTrees((int) Math.pow(2, depth), 
validator.desc.ranges);
-
+            MerkleTrees tree = createMerkleTrees(sstables, 
validator.desc.ranges, cfs);
             long start = System.nanoTime();
             try (AbstractCompactionStrategy.ScannerList scanners = 
cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
                  ValidationCompactionController controller = new 
ValidationCompactionController(cfs, gcBefore);
@@ -1114,15 +1106,11 @@ public class CompactionManager implements 
CompactionManagerMBean
                 }
             }
 
-            if (logger.isTraceEnabled())
+            if (logger.isDebugEnabled())
             {
-                // MT serialize may take time
                 long duration = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                logger.trace("Validation finished in {} msec, depth {} for {} 
keys, serialized size {} bytes for {}",
+                logger.debug("Validation finished in {} msec, for {}",
                              duration,
-                             depth,
-                             numPartitions,
-                             MerkleTrees.serializer.serializedSize(tree, 0),
                              validator.desc);
             }
         }
@@ -1133,6 +1121,37 @@ public class CompactionManager implements 
CompactionManagerMBean
         }
     }
 
+    private static MerkleTrees createMerkleTrees(Iterable<SSTableReader> 
sstables, Collection<Range<Token>> ranges, ColumnFamilyStore cfs)
+    {
+        MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
+        long allPartitions = 0;
+        Map<Range<Token>, Long> rangePartitionCounts = new HashMap<>();
+        for (Range<Token> range : ranges)
+        {
+            long numPartitions = 0;
+            for (SSTableReader sstable : sstables)
+                numPartitions += 
sstable.estimatedKeysForRanges(Collections.singleton(range));
+            rangePartitionCounts.put(range, numPartitions);
+            allPartitions += numPartitions;
+        }
+
+        for (Range<Token> range : ranges)
+        {
+            long numPartitions = rangePartitionCounts.get(range);
+            double rangeOwningRatio = allPartitions > 0 ? 
(double)numPartitions / allPartitions : 0;
+            int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - 
Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0;
+            int depth = numPartitions > 0 ? (int) 
Math.min(Math.floor(Math.log(numPartitions)), maxDepth) : 0;
+            tree.addMerkleTree((int) Math.pow(2, depth), range);
+        }
+        if (logger.isDebugEnabled())
+        {
+            // MT serialize may take time
+            logger.debug("Created {} merkle trees with merkle trees size {}, 
{} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, 
MerkleTrees.serializer.serializedSize(tree, 0));
+        }
+
+        return tree;
+    }
+
     private synchronized Refs<SSTableReader> 
getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
     {
         Refs<SSTableReader> sstables;

Reply via email to