Allocate merkletrees with the correct size

Patch by marcuse; reviewed by Marcus Olsson for CASSANDRA-11390
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d1bfae5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d1bfae5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d1bfae5

Branch: refs/heads/trunk
Commit: 1d1bfae580d44d3b8a4678c5af5767ff17102128
Parents: d479b8d
Author: Marcus Eriksson <marc...@apache.org>
Authored: Mon Mar 21 15:41:32 2016 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Mar 23 14:54:39 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 49 ++++++++++++++------
 2 files changed, 35 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d1bfae5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 904206b..cf36047 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.5
+ * Allocate merkletrees with the correct size (CASSANDRA-11390)
  * Support streaming pre-3.0 sstables (CASSANDRA-10990)
  * Add backpressure to compressed commit log (CASSANDRA-10971)
  * SSTableExport supports secondary index tables (CASSANDRA-11330)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d1bfae5/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 891f976..7c46fcb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1078,16 +1078,8 @@ public class CompactionManager implements CompactionManagerMBean
 
         // Create Merkle trees suitable to hold estimated partitions for the given ranges.
         // We blindly assume that a partition is evenly distributed on all sstables for now.
-        long numPartitions = 0;
-        for (SSTableReader sstable : sstables)
-        {
-            numPartitions += sstable.estimatedKeysForRanges(validator.desc.ranges);
-        }
         // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
-        int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
-        MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
-        tree.addMerkleTrees((int) Math.pow(2, depth), validator.desc.ranges);
-
+        MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs);
         long start = System.nanoTime();
 
         try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
              ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
@@ -1114,15 +1106,11 @@ public class CompactionManager implements CompactionManagerMBean
             }
         }
 
-        if (logger.isTraceEnabled())
+        if (logger.isDebugEnabled())
         {
-            // MT serialize may take time
             long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-            logger.trace("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
+            logger.debug("Validation finished in {} msec, for {}",
                          duration,
-                         depth,
-                         numPartitions,
-                         MerkleTrees.serializer.serializedSize(tree, 0),
                          validator.desc);
         }
     }
@@ -1133,6 +1121,37 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
+    private static MerkleTrees createMerkleTrees(Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, ColumnFamilyStore cfs)
+    {
+        MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
+        long allPartitions = 0;
+        Map<Range<Token>, Long> rangePartitionCounts = new HashMap<>();
+        for (Range<Token> range : ranges)
+        {
+            long numPartitions = 0;
+            for (SSTableReader sstable : sstables)
+                numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(range));
+            rangePartitionCounts.put(range, numPartitions);
+            allPartitions += numPartitions;
+        }
+
+        for (Range<Token> range : ranges)
+        {
+            long numPartitions = rangePartitionCounts.get(range);
+            double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0;
+            int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0;
+            int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), maxDepth) : 0;
+            tree.addMerkleTree((int) Math.pow(2, depth), range);
+        }
+        if (logger.isDebugEnabled())
+        {
+            // MT serialize may take time
+            logger.debug("Created {} merkle trees with merkle trees size {}, {} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, MerkleTrees.serializer.serializedSize(tree, 0));
+        }
+
+        return tree;
+    }
+
     private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
     {
         Refs<SSTableReader> sstables;
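
Note on the sizing math in createMerkleTrees() above: each range's tree depth is now capped both by the range's own estimated partition count and by the share of all partitions the range owns, so the combined trees stay near 2^20 leaves in total rather than 2^20 per range. The standalone sketch below is not part of the patch; the class name, range labels and partition counts are made-up examples used only to reproduce that arithmetic.

import java.util.LinkedHashMap;
import java.util.Map;

// Standalone illustration of the per-range depth sizing used by createMerkleTrees().
// The range labels and partition counts are hypothetical.
public class MerkleDepthSketch
{
    public static void main(String[] args)
    {
        Map<String, Long> estimatedPartitionsPerRange = new LinkedHashMap<>();
        estimatedPartitionsPerRange.put("range-a", 50_000_000L); // range owning most of the data
        estimatedPartitionsPerRange.put("range-b", 10_000L);     // small range
        estimatedPartitionsPerRange.put("range-c", 0L);          // empty range

        long allPartitions = estimatedPartitionsPerRange.values().stream().mapToLong(Long::longValue).sum();

        for (Map.Entry<String, Long> e : estimatedPartitionsPerRange.entrySet())
        {
            long numPartitions = e.getValue();
            // Share of all partitions owned by this range.
            double rangeOwningRatio = allPartitions > 0 ? (double) numPartitions / allPartitions : 0;
            // Cap the depth so 2^depth leaves summed over all ranges stay near 2^20:
            // a range owning 1/2^k of the data gets at most depth 20 - k.
            int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0;
            // Also never build a deeper tree than the range's own partition count justifies.
            int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), maxDepth) : 0;
            System.out.printf("%s: partitions=%d maxDepth=%d depth=%d leaves=%d%n",
                              e.getKey(), numPartitions, maxDepth, depth, (int) Math.pow(2, depth));
        }
    }
}

With these example numbers the large range gets depth 17 (its own partition count is the binding cap), the small range is capped at depth 7 by its ownership share rather than at floor(ln(10000)) = 9, and the empty range gets a single-leaf tree.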